import asyncio
import logging
import traceback
from concurrent.futures.process import ProcessPoolExecutor
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union

from chia.consensus.block_header_validation import validate_finished_header_block
from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain_interface import BlockchainInterface
from chia.consensus.constants import ConsensusConstants
from chia.consensus.cost_calculator import NPCResult
from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty
from chia.consensus.full_block_to_block_record import block_to_block_record
from chia.consensus.get_block_challenge import get_block_challenge
from chia.consensus.pot_iterations import calculate_iterations_quality, is_overflow_block
from chia.full_node.mempool_check_conditions import get_name_puzzle_conditions
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.full_block import FullBlock
from chia.types.generator_types import BlockGenerator
from chia.types.header_block import HeaderBlock
from chia.util.block_cache import BlockCache
from chia.util.errors import Err
from chia.util.generator_tools import get_block_header, tx_removals_and_additions
from chia.util.ints import uint16, uint32, uint64
from chia.util.streamable import Streamable, dataclass_from_dict, streamable

log = logging.getLogger(__name__)

@dataclass(frozen=True)
@streamable
class PreValidationResult(Streamable):
    error: Optional[uint16]
    required_iters: Optional[uint64]  # Iff error is None
    npc_result: Optional[NPCResult]  # Iff error is None and block is a transaction block
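

# Note: batch_pre_validate_blocks runs inside ProcessPoolExecutor worker processes, so every input
# and output crosses the process boundary as picklable bytes and plain dicts rather than rich objects.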
def batch_pre_validate_blocks(
    constants_dict: Dict,
    blocks_pickled: Dict[bytes, bytes],
    full_blocks_pickled: Optional[List[bytes]],
    header_blocks_pickled: Optional[List[bytes]],
    prev_transaction_generators: List[Optional[bytes]],
    npc_results: Dict[uint32, bytes],
    check_filter: bool,
    expected_difficulty: List[uint64],
    expected_sub_slot_iters: List[uint64],
) -> List[bytes]:
    blocks = {}
    for k, v in blocks_pickled.items():
        blocks[k] = BlockRecord.from_bytes(v)
    results: List[PreValidationResult] = []
    constants: ConsensusConstants = dataclass_from_dict(ConsensusConstants, constants_dict)
    if full_blocks_pickled is not None and header_blocks_pickled is not None:
        raise ValueError("Only one should be passed here")
    if full_blocks_pickled is not None:
        for i in range(len(full_blocks_pickled)):
            try:
                block: FullBlock = FullBlock.from_bytes(full_blocks_pickled[i])
                tx_additions: List[Coin] = []
                removals: List[bytes32] = []
                npc_result: Optional[NPCResult] = None
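                # Reuse a pre-computed NPC result for this height when one was supplied, so the
                # worker does not have to run the transactions generator again.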
                if block.height in npc_results:
                    npc_result = NPCResult.from_bytes(npc_results[block.height])
                    assert npc_result is not None
                    if npc_result.npc_list is not None:
                        removals, tx_additions = tx_removals_and_additions(npc_result.npc_list)
                    else:
                        removals, tx_additions = [], []

                if block.transactions_generator is not None and npc_result is None:
                    prev_generator_bytes = prev_transaction_generators[i]
                    assert prev_generator_bytes is not None
                    assert block.transactions_info is not None
                    block_generator: BlockGenerator = BlockGenerator.from_bytes(prev_generator_bytes)
                    assert block_generator.program == block.transactions_generator
                    npc_result = get_name_puzzle_conditions(
                        block_generator, min(constants.MAX_BLOCK_COST_CLVM, block.transactions_info.cost), True
                    )
                    removals, tx_additions = tx_removals_and_additions(npc_result.npc_list)

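                # The additions and removals are only needed here to rebuild the header's
                # transactions filter, which validate_finished_header_block checks below.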
                header_block = get_block_header(block, tx_additions, removals)
                required_iters, error = validate_finished_header_block(
                    constants,
                    BlockCache(blocks),
                    header_block,
                    check_filter,
                    expected_difficulty[i],
                    expected_sub_slot_iters[i],
                )
                error_int: Optional[uint16] = None
                if error is not None:
                    error_int = uint16(error.code.value)

                results.append(PreValidationResult(error_int, required_iters, npc_result))
            except Exception:
                error_stack = traceback.format_exc()
                log.error(f"Exception: {error_stack}")
                results.append(PreValidationResult(uint16(Err.UNKNOWN.value), None, None))
    elif header_blocks_pickled is not None:
        for i in range(len(header_blocks_pickled)):
            try:
                header_block = HeaderBlock.from_bytes(header_blocks_pickled[i])
                required_iters, error = validate_finished_header_block(
                    constants,
                    BlockCache(blocks),
                    header_block,
                    check_filter,
                    expected_difficulty[i],
                    expected_sub_slot_iters[i],
                )
                error_int = None
                if error is not None:
                    error_int = uint16(error.code.value)
                results.append(PreValidationResult(error_int, required_iters, None))
            except Exception:
                error_stack = traceback.format_exc()
                log.error(f"Exception: {error_stack}")
                results.append(PreValidationResult(uint16(Err.UNKNOWN.value), None, None))
    return [bytes(r) for r in results]


async def pre_validate_blocks_multiprocessing(
    constants: ConsensusConstants,
    constants_json: Dict,
    block_records: BlockchainInterface,
    blocks: Sequence[Union[FullBlock, HeaderBlock]],
    pool: ProcessPoolExecutor,
    check_filter: bool,
    npc_results: Dict[uint32, NPCResult],
    get_block_generator: Optional[Callable],
    batch_size: int,
) -> Optional[List[PreValidationResult]]:
"""
|
|
This method must be called under the blockchain lock
|
|
If all the full blocks pass pre-validation, (only validates header), returns the list of required iters.
|
|
if any validation issue occurs, returns False.
|
|
|
|
Args:
|
|
check_filter:
|
|
constants_json:
|
|
pool:
|
|
constants:
|
|
block_records:
|
|
blocks: list of full blocks to validate (must be connected to current chain)
|
|
npc_results
|
|
get_block_generator
|
|
"""
|
|
    prev_b: Optional[BlockRecord] = None
    # Collects all the recent blocks (up to the previous sub-epoch)
    recent_blocks: Dict[bytes32, BlockRecord] = {}
    recent_blocks_compressed: Dict[bytes32, BlockRecord] = {}
    num_sub_slots_found = 0
    num_blocks_seen = 0
    if blocks[0].height > 0:
        if not block_records.contains_block(blocks[0].prev_header_hash):
            return [PreValidationResult(uint16(Err.INVALID_PREV_BLOCK_HASH.value), None, None)]
        curr = block_records.block_record(blocks[0].prev_header_hash)
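        # Overflow blocks take their challenge from the previous sub-slot, so look one sub-slot
        # further back for them.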
        num_sub_slots_to_look_for = 3 if curr.overflow else 2
        while (
            curr.sub_epoch_summary_included is None
            or num_blocks_seen < constants.NUMBER_OF_TIMESTAMPS
            or num_sub_slots_found < num_sub_slots_to_look_for
        ) and curr.height > 0:
            if num_blocks_seen < constants.NUMBER_OF_TIMESTAMPS or num_sub_slots_found < num_sub_slots_to_look_for:
                recent_blocks_compressed[curr.header_hash] = curr

            if curr.first_in_sub_slot:
                assert curr.finished_challenge_slot_hashes is not None
                num_sub_slots_found += len(curr.finished_challenge_slot_hashes)
            recent_blocks[curr.header_hash] = curr
            if curr.is_transaction_block:
                num_blocks_seen += 1
            curr = block_records.block_record(curr.prev_hash)
        recent_blocks[curr.header_hash] = curr
        recent_blocks_compressed[curr.header_hash] = curr
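    # Remember which blocks were already present, so the records temporarily added below can be
    # removed afterwards without dropping pre-existing ones.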
    block_record_was_present = []
    for block in blocks:
        block_record_was_present.append(block_records.contains_block(block.header_hash))

    diff_ssis: List[Tuple[uint64, uint64]] = []
    for block in blocks:
        if block.height != 0:
            assert block_records.contains_block(block.prev_header_hash)
            if prev_b is None:
                prev_b = block_records.block_record(block.prev_header_hash)

        sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty(
            constants, len(block.finished_sub_slots) > 0, prev_b, block_records
        )

        overflow = is_overflow_block(constants, block.reward_chain_block.signage_point_index)
        challenge = get_block_challenge(constants, block, BlockCache(recent_blocks), prev_b is None, overflow, False)
        if block.reward_chain_block.challenge_chain_sp_vdf is None:
            cc_sp_hash: bytes32 = challenge
        else:
            cc_sp_hash = block.reward_chain_block.challenge_chain_sp_vdf.output.get_hash()
        q_str: Optional[bytes32] = block.reward_chain_block.proof_of_space.verify_and_get_quality_string(
            constants, challenge, cc_sp_hash
        )
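        # An invalid proof of space yields no quality string: roll back the temporary block
        # records and abort the whole pre-validation.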
        if q_str is None:
            for i, block_i in enumerate(blocks):
                if not block_record_was_present[i] and block_records.contains_block(block_i.header_hash):
                    block_records.remove_block_record(block_i.header_hash)
            return None

        required_iters: uint64 = calculate_iterations_quality(
            constants.DIFFICULTY_CONSTANT_FACTOR,
            q_str,
            block.reward_chain_block.proof_of_space.size,
            difficulty,
            cc_sp_hash,
        )
        block_rec = block_to_block_record(
            constants,
            block_records,
            required_iters,
            block,
            None,
        )
        # Makes sure to not override the valid blocks already in block_records
        if not block_records.contains_block(block_rec.header_hash):
            block_records.add_block_record(block_rec)  # Temporarily add block to dict
            recent_blocks[block_rec.header_hash] = block_rec
            recent_blocks_compressed[block_rec.header_hash] = block_rec
        else:
            recent_blocks[block_rec.header_hash] = block_records.block_record(block_rec.header_hash)
            recent_blocks_compressed[block_rec.header_hash] = block_records.block_record(block_rec.header_hash)
        prev_b = block_rec
        diff_ssis.append((difficulty, sub_slot_iters))

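    # Undo the temporary additions above: the workers receive all required context explicitly, and
    # block_records should only keep the blocks that were present before pre-validation.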
    block_dict: Dict[bytes32, Union[FullBlock, HeaderBlock]] = {}
    for i, block in enumerate(blocks):
        block_dict[block.header_hash] = block
        if not block_record_was_present[i]:
            block_records.remove_block_record(block.header_hash)

    recent_sb_compressed_pickled = {bytes(k): bytes(v) for k, v in recent_blocks_compressed.items()}
    npc_results_pickled = {}
    for k, v in npc_results.items():
        npc_results_pickled[k] = bytes(v)
    futures = []
    # Pool of workers to validate blocks concurrently
    for i in range(0, len(blocks), batch_size):
        end_i = min(i + batch_size, len(blocks))
        blocks_to_validate = blocks[i:end_i]
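        # Batches that cross a sub-slot boundary need the full recent-blocks cache; all other
        # batches can get by with the smaller compressed one.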
        if any(len(block.finished_sub_slots) > 0 for block in blocks_to_validate):
            final_pickled = {bytes(k): bytes(v) for k, v in recent_blocks.items()}
        else:
            final_pickled = recent_sb_compressed_pickled
        b_pickled: Optional[List[bytes]] = None
        hb_pickled: Optional[List[bytes]] = None
        previous_generators: List[Optional[bytes]] = []
        for block in blocks_to_validate:
            # We ONLY add blocks which are in the past, based on header hashes (which are validated later), to the
            # prev blocks dict. This is important since these blocks are assumed to be valid and are used as
            # previous generator references
            prev_blocks_dict: Dict[bytes32, Union[FullBlock, HeaderBlock]] = {}
            curr_b: Union[FullBlock, HeaderBlock] = block
            while curr_b.prev_header_hash in block_dict:
                curr_b = block_dict[curr_b.prev_header_hash]
                prev_blocks_dict[curr_b.header_hash] = curr_b

            if isinstance(block, FullBlock):
                assert get_block_generator is not None
                if b_pickled is None:
                    b_pickled = []
                b_pickled.append(bytes(block))
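                # get_block_generator can raise ValueError when a referenced previous generator
                # cannot be found; treat that as a failed pre-validation.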
                try:
                    block_generator: Optional[BlockGenerator] = await get_block_generator(block, prev_blocks_dict)
                except ValueError:
                    return None
                if block_generator is not None:
                    previous_generators.append(bytes(block_generator))
                else:
                    previous_generators.append(None)
            else:
                if hb_pickled is None:
                    hb_pickled = []
                hb_pickled.append(bytes(block))

        futures.append(
            asyncio.get_running_loop().run_in_executor(
                pool,
                batch_pre_validate_blocks,
                constants_json,
                final_pickled,
                b_pickled,
                hb_pickled,
                previous_generators,
                npc_results_pickled,
                check_filter,
                [diff_ssis[j][0] for j in range(i, end_i)],
                [diff_ssis[j][1] for j in range(i, end_i)],
            )
        )
    # Collect all results into one flat list
    return [
        PreValidationResult.from_bytes(result)
        for batch_result in (await asyncio.gather(*futures))
        for result in batch_result
    ]
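

# Example usage (a sketch only): how a caller might drive pre_validate_blocks_multiprocessing.
# The surrounding objects (blockchain, constants, constants_json, pool, blocks) are assumed to come
# from a running full node, and batch_size=4 is an arbitrary illustrative value.
#
#   async with blockchain.lock:
#       results = await pre_validate_blocks_multiprocessing(
#           constants,
#           constants_json,
#           blockchain,
#           blocks,
#           pool,
#           check_filter=True,
#           npc_results={},
#           get_block_generator=blockchain.get_block_generator,
#           batch_size=4,
#       )
#   if results is None or any(r.error is not None for r in results):
#       ...  # reject the blocks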