# chia-blockchain/chia/full_node/sync_store.py (137 lines, 4.8 KiB, Python)

import asyncio
import logging
from typing import Dict, List, Optional, Set, Tuple
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint32, uint128
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class SyncStore:
    """
    In-memory bookkeeping for syncing: the sync flags, the current sync target,
    and which peers have reported which blocks and peaks.

    Instances are expected to be built with the async `create()` factory, which
    initializes every attribute declared below.
    """

    # Whether or not we are syncing
    sync_mode: bool
    long_sync: bool
    peak_to_peer: Dict[bytes32, Set[bytes32]]  # Header hash : peer node id
    peer_to_peak: Dict[bytes32, Tuple[bytes32, uint32, uint128]]  # peer node id : [header_hash, height, weight]
    sync_target_header_hash: Optional[bytes32]  # Peak hash we are syncing towards
    sync_target_height: Optional[uint32]  # Peak height we are syncing towards
    peers_changed: asyncio.Event
    batch_syncing: Set[bytes32]  # Set of nodes which we are batch syncing from
    backtrack_syncing: Dict[bytes32, int]  # Set of nodes which we are backtrack syncing from, and how many threads

    @classmethod
    async def create(cls) -> "SyncStore":
        """
        Builds a new store with sync flags off, no sync target, and empty peer records.
        """
        self = cls()
        self.sync_mode = False
        self.long_sync = False
        self.sync_target_header_hash = None
        self.sync_target_height = None
        # Not read anywhere in this class; kept so code that accesses the attribute keeps working.
        self.peak_fork_point = {}
        self.peak_to_peer = {}
        self.peer_to_peak = {}
        self.peers_changed = asyncio.Event()
        self.batch_syncing = set()
        self.backtrack_syncing = {}
        return self

    def set_peak_target(self, peak_hash: bytes32, target_height: uint32) -> None:
        """
        Records the header hash and height of the peak we are syncing towards.
        """
        self.sync_target_header_hash = peak_hash
        self.sync_target_height = target_height

    def get_sync_target_hash(self) -> Optional[bytes32]:
        """
        Returns: the header hash of the sync target, or None if no target is set.
        """
        return self.sync_target_header_hash

    def get_sync_target_height(self) -> Optional[uint32]:
        """
        Returns: the height of the sync target, or None if no target is set.
        """
        return self.sync_target_height

    def set_sync_mode(self, sync_mode: bool) -> None:
        """
        Sets the sync_mode flag.
        """
        self.sync_mode = sync_mode

    def get_sync_mode(self) -> bool:
        """
        Returns: the sync_mode flag.
        """
        return self.sync_mode

    def set_long_sync(self, long_sync: bool) -> None:
        """
        Sets the long_sync flag.
        """
        self.long_sync = long_sync

    def get_long_sync(self) -> bool:
        """
        Returns: the long_sync flag.
        """
        return self.long_sync

    def peer_has_block(
        self, header_hash: bytes32, peer_id: bytes32, weight: uint128, height: uint32, new_peak: bool
    ) -> None:
        """
        Adds a record that a certain peer has a block.

        If the block is the current sync target, the peers_changed event is set. When new_peak is True the
        block is additionally stored as that peer's peak, replacing any previous one.
        """
        if header_hash == self.sync_target_header_hash:
            self.peers_changed.set()

        self.peak_to_peer.setdefault(header_hash, set()).add(peer_id)

        if new_peak:
            self.peer_to_peak[peer_id] = (header_hash, height, weight)

    def get_peers_that_have_peak(self, header_hashes: List[bytes32]) -> Set[bytes32]:
        """
        Returns: peer ids of peers that have at least one of the header hashes.
        """
        node_ids: Set[bytes32] = set()
        for header_hash in header_hashes:
            # Unknown header hashes simply contribute no peers.
            node_ids |= self.peak_to_peer.get(header_hash, set())
        return node_ids

    def get_peak_of_each_peer(self) -> Dict[bytes32, Tuple[bytes32, uint32, uint128]]:
        """
        Returns: dictionary of peer id to peak information.

        Peers whose peak header hash is no longer present in peak_to_peer (for example after
        clear_sync_info) are left out.
        """
        return {peer_id: peak for peer_id, peak in self.peer_to_peak.items() if peak[0] in self.peak_to_peer}

    def get_heaviest_peak(self) -> Optional[Tuple[bytes32, uint32, uint128]]:
        """
        Returns: the header_hash, height, and weight of the heaviest block that one of our peers has notified
        us of, or None if there is no such block.

        Only peaks whose header hash is still present in peak_to_peer are considered. On equal weight the
        peak recorded first in peer_to_peak wins.
        """
        candidates = (peak for peak in self.peer_to_peak.values() if peak[0] in self.peak_to_peer)
        # default=None covers both "no peers at all" and "every peak was filtered out"; the latter used to
        # trip an assertion even though the return type already allows None.
        return max(candidates, key=lambda peak: peak[2], default=None)

    async def clear_sync_info(self) -> None:
        """
        Clears the peak_to_peer info which can get quite large.

        peer_to_peak is left untouched, so previously recorded peaks are not reported by
        get_peak_of_each_peer / get_heaviest_peak until their header hash is recorded again.
        """
        self.peak_to_peer = {}

    def peer_disconnected(self, node_id: bytes32) -> None:
        """
        Removes every record of the given peer and sets the peers_changed event.
        """
        self.peer_to_peak.pop(node_id, None)

        # Header hashes whose peer set becomes empty are intentionally kept as keys: other methods test
        # membership of the header hash itself, not whether any peer is still attached to it.
        for peers in self.peak_to_peer.values():
            peers.discard(node_id)
        self.peers_changed.set()