import asyncio
import dataclasses
import logging
import multiprocessing
from concurrent.futures.process import ProcessPoolExecutor
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

from chia.consensus.block_header_validation import validate_finished_header_block, validate_unfinished_header_block
from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain_interface import BlockchainInterface
from chia.consensus.constants import ConsensusConstants
from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty
from chia.consensus.find_fork_point import find_fork_point_in_chain
from chia.consensus.full_block_to_block_record import block_to_block_record
from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary
from chia.types.header_block import HeaderBlock
from chia.types.unfinished_header_block import UnfinishedHeaderBlock
from chia.util.errors import Err, ValidationError
from chia.util.ints import uint32, uint64
from chia.util.streamable import recurse_jsonify
from chia.wallet.block_record import HeaderBlockRecord
from chia.wallet.wallet_block_store import WalletBlockStore
from chia.wallet.wallet_coin_store import WalletCoinStore
from chia.wallet.wallet_transaction_store import WalletTransactionStore

log = logging.getLogger(__name__)


class ReceiveBlockResult(Enum):
    """
    When Blockchain.receive_block(b) is called, one of these results is returned,
    showing whether the block was added to the chain (extending the peak),
    and if not, why it was not added.
    """

    NEW_PEAK = 1  # Added to the peak of the blockchain
    ADDED_AS_ORPHAN = 2  # Added as an orphan/stale block (not a new peak of the chain)
    INVALID_BLOCK = 3  # Block was not added because it was invalid
    ALREADY_HAVE_BLOCK = 4  # Block is already present in this blockchain
    DISCONNECTED_BLOCK = 5  # Block's parent (previous pointer) is not in this blockchain


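# Illustrative only (not part of the original module): a typical caller branches on the
# result triple returned by WalletBlockchain.receive_block, e.g.
#
#     result, error, fork_height = await blockchain.receive_block(header_block_record)
#     if result == ReceiveBlockResult.NEW_PEAK:
#         ...  # chain advanced; fork_height says how far back it diverged
#     elif result == ReceiveBlockResult.DISCONNECTED_BLOCK:
#         ...  # fetch the missing parent block first, then retry

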
class WalletBlockchain(BlockchainInterface):
    constants: ConsensusConstants
    constants_json: Dict
    # Peak of the blockchain
    _peak_height: Optional[uint32]
    # All blocks in peak path are guaranteed to be included, can include orphan blocks
    __block_records: Dict[bytes32, BlockRecord]
    # Defines the path from genesis to the peak, no orphan blocks
    __height_to_hash: Dict[uint32, bytes32]
    # All hashes of blocks in block_record by height, used for garbage collection
    __heights_in_cache: Dict[uint32, Set[bytes32]]
    # All sub-epoch summaries that have been included in the blockchain from the beginning until and including the peak
    # (height_included, SubEpochSummary). Note: ONLY for the blocks in the path to the peak
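    # Editorial note: the class-level {} below is only a placeholder; every instance
    # overwrites it in _load_chain_from_store(), so the mutable default is not shared
    # across instances in practice.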
    __sub_epoch_summaries: Dict[uint32, SubEpochSummary] = {}
    # Unspent Store
    coin_store: WalletCoinStore
    tx_store: WalletTransactionStore
    # Store
    block_store: WalletBlockStore
    # Used to verify blocks in parallel
    pool: ProcessPoolExecutor

    coins_of_interest_received: Any
    reorg_rollback: Any
    wallet_state_manager_lock: asyncio.Lock

    # Whether blockchain is shut down or not
    _shut_down: bool

    # Lock to prevent simultaneous reads and writes
    lock: asyncio.Lock
    log: logging.Logger

    @staticmethod
    async def create(
        block_store: WalletBlockStore,
        coin_store: WalletCoinStore,
        tx_store: WalletTransactionStore,
        consensus_constants: ConsensusConstants,
        coins_of_interest_received: Callable,  # f(removals: List[Coin], additions: List[Coin], height: uint32)
        reorg_rollback: Callable,
        lock: asyncio.Lock,
    ):
        """
        Initializes a blockchain with the BlockRecords from disk, assuming they have all been
        validated. The genesis block is taken from the given consensus constants.
        """
        self = WalletBlockchain()
        self.lock = asyncio.Lock()
        self.coin_store = coin_store
        self.tx_store = tx_store
        cpu_count = multiprocessing.cpu_count()
        if cpu_count > 61:
            cpu_count = 61  # Windows Server 2016 has an issue https://bugs.python.org/issue26903
        num_workers = max(cpu_count - 2, 1)
        self.pool = ProcessPoolExecutor(max_workers=num_workers)
        log.info(f"Started {num_workers} processes for block validation")
        self.constants = consensus_constants
        self.constants_json = recurse_jsonify(dataclasses.asdict(self.constants))
        self.block_store = block_store
        self._shut_down = False
        self.coins_of_interest_received = coins_of_interest_received
        self.reorg_rollback = reorg_rollback
        self.log = logging.getLogger(__name__)
        self.wallet_state_manager_lock = lock
        await self._load_chain_from_store()
        return self

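    # Illustrative only (the surrounding store objects and callbacks are assumptions,
    # not part of this file):
    #
    #     blockchain = await WalletBlockchain.create(
    #         block_store, coin_store, tx_store, consensus_constants,
    #         coins_of_interest_received=on_coins, reorg_rollback=on_rollback,
    #         lock=asyncio.Lock(),
    #     )
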
    def shut_down(self):
        self._shut_down = True
        self.pool.shutdown(wait=True)

    async def _load_chain_from_store(self) -> None:
        """
        Initializes the state of the Blockchain class from the database.
        """
        height_to_hash, sub_epoch_summaries = await self.block_store.get_peak_heights_dicts()
        self.__height_to_hash = height_to_hash
        self.__sub_epoch_summaries = sub_epoch_summaries
        self.__block_records = {}
        self.__heights_in_cache = {}
        blocks, peak = await self.block_store.get_block_records_close_to_peak(self.constants.BLOCKS_CACHE_SIZE)
        for block_record in blocks.values():
            self.add_block_record(block_record)

        if len(blocks) == 0:
            assert peak is None
            self._peak_height = None
            return None

        assert peak is not None
        self._peak_height = self.block_record(peak).height
        assert len(self.__height_to_hash) == self._peak_height + 1

    def get_peak(self) -> Optional[BlockRecord]:
        """
        Return the peak of the blockchain
        """
        if self._peak_height is None:
            return None
        return self.height_to_block_record(self._peak_height)

    async def receive_block(
        self,
        header_block_record: HeaderBlockRecord,
        pre_validation_result: Optional[PreValidationResult] = None,
        trusted: bool = False,
        fork_point_with_peak: Optional[uint32] = None,
    ) -> Tuple[ReceiveBlockResult, Optional[Err], Optional[uint32]]:
        """
        Adds a new block into the blockchain, if it's valid and connected to the current
        blockchain, regardless of whether it extends the peak or another block.
        Returns a ReceiveBlockResult indicating what happened, an error code if the
        block was invalid, and the fork height in the case of a new peak.
        """

        block = header_block_record.header
        genesis: bool = block.height == 0

        if self.contains_block(block.header_hash):
            return ReceiveBlockResult.ALREADY_HAVE_BLOCK, None, None

        if not self.contains_block(block.prev_header_hash) and not genesis:
            return (
                ReceiveBlockResult.DISCONNECTED_BLOCK,
                Err.INVALID_PREV_BLOCK_HASH,
                None,
            )

        if block.height == 0:
            prev_b: Optional[BlockRecord] = None
        else:
            prev_b = self.block_record(block.prev_header_hash)
        sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty(
            self.constants, len(block.finished_sub_slots) > 0, prev_b, self
        )

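        # Editorial summary of the three validation paths below: (1) untrusted and not
        # pre-validated: fully validate the finished header block here; (2) trusted peer:
        # only the cheaper unfinished-header checks run; (3) untrusted but pre-validated:
        # reuse the result computed earlier in the process pool.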
        if trusted is False and pre_validation_result is None:
            required_iters, error = validate_finished_header_block(
                self.constants, self, block, False, difficulty, sub_slot_iters
            )
        elif trusted:
            unfinished_header_block = UnfinishedHeaderBlock(
                block.finished_sub_slots,
                block.reward_chain_block.get_unfinished(),
                block.challenge_chain_sp_proof,
                block.reward_chain_sp_proof,
                block.foliage,
                block.foliage_transaction_block,
                block.transactions_filter,
            )

            required_iters, val_error = validate_unfinished_header_block(
                self.constants, self, unfinished_header_block, False, difficulty, sub_slot_iters, False, True
            )
            error = ValidationError(Err(val_error)) if val_error is not None else None
        else:
            assert pre_validation_result is not None
            required_iters = pre_validation_result.required_iters
            error = (
                ValidationError(Err(pre_validation_result.error)) if pre_validation_result.error is not None else None
            )

        if error is not None:
            return ReceiveBlockResult.INVALID_BLOCK, error.code, None
        assert required_iters is not None

        block_record = block_to_block_record(
            self.constants,
            self,
            required_iters,
            None,
            block,
        )

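        # Editorial note: the block-record write and the peak update below are committed in
        # one db transaction; on failure the transaction is rolled back and the wallet's
        # coin and tx caches are rebuilt from the database before re-raising.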
        # Always add the block to the database
        async with self.wallet_state_manager_lock:
            async with self.block_store.db_wrapper.lock:
                try:
                    await self.block_store.db_wrapper.begin_transaction()
                    await self.block_store.add_block_record(header_block_record, block_record)
                    self.add_block_record(block_record)
                    self.clean_block_record(block_record.height - self.constants.BLOCKS_CACHE_SIZE)

                    fork_height: Optional[uint32] = await self._reconsider_peak(
                        block_record, genesis, fork_point_with_peak
                    )
                    await self.block_store.db_wrapper.commit_transaction()
                except BaseException as e:
                    self.log.error(f"Error during db transaction: {e}")
                    if self.block_store.db_wrapper.db._connection is not None:
                        await self.block_store.db_wrapper.rollback_transaction()
                        await self.coin_store.rebuild_wallet_cache()
                        await self.tx_store.rebuild_tx_cache()
                    raise
            if fork_height is not None:
                self.log.info(f"💰 Updated wallet peak to height {block_record.height}, weight {block_record.weight}")
                return ReceiveBlockResult.NEW_PEAK, None, fork_height
            else:
                return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None

    async def _reconsider_peak(
        self, block_record: BlockRecord, genesis: bool, fork_point_with_peak: Optional[uint32]
    ) -> Optional[uint32]:
        """
        When a new block is added, this is called, to check if the new block is the new peak of the chain.
        This also handles reorgs by reverting blocks which are not in the heaviest chain.
        It returns the height of the fork between the previous chain and the new chain, or returns
        None if there was no update to the heaviest chain.
        """
        peak = self.get_peak()
        if genesis:
            if peak is None:
                block: Optional[HeaderBlockRecord] = await self.block_store.get_header_block_record(
                    block_record.header_hash
                )
                assert block is not None
                self.__height_to_hash[uint32(0)] = block.header_hash
                for removed in block.removals:
                    self.log.debug(f"Removed: {removed.name()}")
                await self.coins_of_interest_received(block.removals, block.additions, block.height)
                self._peak_height = uint32(0)
                return uint32(0)
            return None

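        # Editorial summary of the branch below: a reorg happens only when the new block's
        # total weight strictly exceeds the current peak's weight; equal-weight blocks
        # stay orphans.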
        assert peak is not None
        if block_record.weight > peak.weight:
            # Find the fork. If the block is just being appended, it will return the peak
            # If no blocks in common, returns -1, and reverts all blocks
            if fork_point_with_peak is not None:
                fork_h: int = fork_point_with_peak
            else:
                fork_h = find_fork_point_in_chain(self, block_record, peak)

            # Rollback to fork
            self.log.debug(f"fork_h: {fork_h}, SB: {block_record.height}, peak: {peak.height}")
            if block_record.prev_hash != peak.header_hash:
                await self.reorg_rollback(fork_h)

            # Rollback sub_epoch_summaries
            heights_to_delete = []
            for ses_included_height in self.__sub_epoch_summaries.keys():
                if ses_included_height > fork_h:
                    heights_to_delete.append(ses_included_height)
            for height in heights_to_delete:
                del self.__sub_epoch_summaries[height]

            # Collect all blocks from fork point to new peak
            blocks_to_add: List[Tuple[HeaderBlockRecord, BlockRecord]] = []
            curr = block_record.header_hash
            while fork_h < 0 or curr != self.height_to_hash(uint32(fork_h)):
                fetched_header_block: Optional[HeaderBlockRecord] = await self.block_store.get_header_block_record(curr)
                fetched_block_record: Optional[BlockRecord] = await self.block_store.get_block_record(curr)
                assert fetched_header_block is not None
                assert fetched_block_record is not None
                blocks_to_add.append((fetched_header_block, fetched_block_record))
                if fetched_header_block.height == 0:
                    # Doing a full reorg, starting at height 0
                    break
                curr = fetched_block_record.prev_hash

            for fetched_header_block, fetched_block_record in reversed(blocks_to_add):
                self.__height_to_hash[fetched_block_record.height] = fetched_block_record.header_hash
                if fetched_block_record.is_transaction_block:
                    await self.coins_of_interest_received(
                        fetched_header_block.removals,
                        fetched_header_block.additions,
                        fetched_header_block.height,
                    )
                if fetched_block_record.sub_epoch_summary_included is not None:
                    self.__sub_epoch_summaries[
                        fetched_block_record.height
                    ] = fetched_block_record.sub_epoch_summary_included

            # Changes the peak to be the new peak
            await self.block_store.set_peak(block_record.header_hash)
            self._peak_height = block_record.height
            # fork_h is -1 when the chains share no blocks, so clamp to 0 for uint32
            return uint32(max(fork_h, 0))

        # This is not a heavier block than the heaviest we have seen, so we don't change the coin set
        return None

    def get_next_difficulty(self, header_hash: bytes32, new_slot: bool) -> uint64:
        assert self.contains_block(header_hash)
        curr = self.block_record(header_hash)
        if curr.height <= 2:
            return self.constants.DIFFICULTY_STARTING
        return get_next_sub_slot_iters_and_difficulty(self.constants, new_slot, curr, self)[1]

    def get_next_slot_iters(self, header_hash: bytes32, new_slot: bool) -> uint64:
        assert self.contains_block(header_hash)
        curr = self.block_record(header_hash)
        if curr.height <= 2:
            return self.constants.SUB_SLOT_ITERS_STARTING
        return get_next_sub_slot_iters_and_difficulty(self.constants, new_slot, curr, self)[0]

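    # Editorial note (an assumption from reading the call below, not documented here):
    # the positional flags passed to pre_validate_blocks_multiprocessing appear to be
    # check_filter=True, npc_results={} and get_block_generator=None, i.e. the wallet
    # validates transaction filters but never executes block generators.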
    async def pre_validate_blocks_multiprocessing(
        self, blocks: List[HeaderBlock], batch_size: int = 4
    ) -> Optional[List[PreValidationResult]]:
        return await pre_validate_blocks_multiprocessing(
            self.constants, self.constants_json, self, blocks, self.pool, True, {}, None, batch_size
        )

    def contains_block(self, header_hash: bytes32) -> bool:
        """
        True if we have already added this block to the chain. This may return false for orphan blocks
        that we have added but no longer keep in memory.
        """
        return header_hash in self.__block_records

    def block_record(self, header_hash: bytes32) -> BlockRecord:
        return self.__block_records[header_hash]

    def height_to_block_record(self, height: uint32, check_db=False) -> BlockRecord:
        header_hash = self.height_to_hash(height)
        return self.block_record(header_hash)

    def get_ses_heights(self) -> List[uint32]:
        return sorted(self.__sub_epoch_summaries.keys())

    def get_ses(self, height: uint32) -> SubEpochSummary:
        return self.__sub_epoch_summaries[height]

    def height_to_hash(self, height: uint32) -> Optional[bytes32]:
        return self.__height_to_hash[height]

    def contains_height(self, height: uint32) -> bool:
        return height in self.__height_to_hash

    def get_peak_height(self) -> Optional[uint32]:
        return self._peak_height

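    # Editorial note: warmup() below pre-fills the in-memory cache before processing a
    # fork so that reorg handling and difficulty lookups hit __block_records instead of
    # the database for every block. (Rationale inferred from the code, not stated in it.)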
    async def warmup(self, fork_point: uint32):
        """
        Loads blocks into the cache. The blocks loaded include all blocks from
        fork_point - BLOCKS_CACHE_SIZE up to and including the current peak height.

        Args:
            fork_point: anchor height; everything from fork_point - BLOCKS_CACHE_SIZE
                up to the peak is loaded into the cache
        """
        if self._peak_height is None:
            return None
        blocks = await self.block_store.get_block_records_in_range(
            fork_point - self.constants.BLOCKS_CACHE_SIZE, self._peak_height
        )
        for block_record in blocks.values():
            self.add_block_record(block_record)

    def clean_block_record(self, height: int):
        """
        Clears cached block records at the given height and below, stopping at the
        first height that has no cached blocks.

        Args:
            height: Maximum height to evict from the cache
        """
        if height < 0:
            return None
        blocks_to_remove = self.__heights_in_cache.get(uint32(height), None)
        while blocks_to_remove is not None and height >= 0:
            for header_hash in blocks_to_remove:
                del self.__block_records[header_hash]
            del self.__heights_in_cache[uint32(height)]  # remove height from heights in cache

            height -= 1
            blocks_to_remove = self.__heights_in_cache.get(uint32(height), None)

    def clean_block_records(self):
        """
        Cleans the cache so that we only maintain relevant blocks. This removes
        block records that have height < peak - BLOCKS_CACHE_SIZE. The retained
        blocks are needed for calculating future difficulty adjustments.
        """
        if len(self.__block_records) < self.constants.BLOCKS_CACHE_SIZE:
            return None

        peak = self.get_peak()
        assert peak is not None
        if peak.height - self.constants.BLOCKS_CACHE_SIZE < 0:
            return None
        self.clean_block_record(peak.height - self.constants.BLOCKS_CACHE_SIZE)

    async def get_block_records_in_range(self, start: int, stop: int) -> Dict[bytes32, BlockRecord]:
        return await self.block_store.get_block_records_in_range(start, stop)

    async def get_header_blocks_in_range(
        self, start: int, stop: int, tx_filter: bool = True
    ) -> Dict[bytes32, HeaderBlock]:
        return await self.block_store.get_header_blocks_in_range(start, stop)

    async def get_block_record_from_db(self, header_hash: bytes32) -> Optional[BlockRecord]:
        if header_hash in self.__block_records:
            return self.__block_records[header_hash]
        return await self.block_store.get_block_record(header_hash)

    def remove_block_record(self, header_hash: bytes32):
        sbr = self.block_record(header_hash)
        del self.__block_records[header_hash]
        self.__heights_in_cache[sbr.height].remove(header_hash)

    def add_block_record(self, block_record: BlockRecord):
        self.__block_records[block_record.header_hash] = block_record
        if block_record.height not in self.__heights_in_cache.keys():
            self.__heights_in_cache[block_record.height] = set()
        self.__heights_in_cache[block_record.height].add(block_record.header_hash)