import asyncio
import dataclasses
import logging
import random
import time
import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

import aiosqlite
from blspy import AugSchemeMPL

import chia.server.ws_connection as ws  # lgtm [py/import-and-import-from]
from chia.consensus.block_creation import unfinished_block_to_full_block
from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain import Blockchain, ReceiveBlockResult
from chia.consensus.constants import ConsensusConstants
from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty
from chia.consensus.make_sub_epoch_summary import next_sub_epoch_summary
from chia.consensus.multiprocess_validation import PreValidationResult
from chia.consensus.pot_iterations import calculate_sp_iters
from chia.full_node.block_store import BlockStore
from chia.full_node.bundle_tools import detect_potential_template_generator
from chia.full_node.coin_store import CoinStore
from chia.full_node.full_node_store import FullNodeStore
from chia.full_node.mempool_manager import MempoolManager
from chia.full_node.signage_point import SignagePoint
from chia.full_node.sync_store import SyncStore
from chia.full_node.weight_proof import WeightProofHandler
from chia.protocols import farmer_protocol, full_node_protocol, timelord_protocol, wallet_protocol
from chia.protocols.full_node_protocol import (
    RejectBlocks,
    RequestBlocks,
    RespondBlock,
    RespondBlocks,
    RespondSignagePoint,
)
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.server.node_discovery import FullNodePeers
from chia.server.outbound_message import Message, NodeType, make_msg
from chia.server.server import ChiaServer
from chia.types.blockchain_format.classgroup import ClassgroupElement
from chia.types.blockchain_format.pool_target import PoolTarget
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary
from chia.types.blockchain_format.vdf import CompressibleVDFField, VDFInfo, VDFProof
from chia.types.end_of_slot_bundle import EndOfSubSlotBundle
from chia.types.full_block import FullBlock
from chia.types.header_block import HeaderBlock
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.types.unfinished_block import UnfinishedBlock
from chia.util.bech32m import encode_puzzle_hash
from chia.util.db_wrapper import DBWrapper
from chia.util.errors import ConsensusError, Err
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.path import mkdir, path_from_root
from chia.util.profiler import profile_task
from chia.util.safe_cancel_task import cancel_task_safe

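# FullNode is the core service of a Chia full node: it owns the blockchain,
# block/coin stores, and mempool, tracks sync state for its peers, and handles
# protocol messages from other full nodes, farmers, timelords, and wallets.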
class FullNode:
    block_store: BlockStore
    full_node_store: FullNodeStore
    full_node_peers: Optional[FullNodePeers]
    sync_store: Any
    coin_store: CoinStore
    mempool_manager: MempoolManager
    connection: aiosqlite.Connection
    _sync_task: Optional[asyncio.Task]
    _init_weight_proof: Optional[asyncio.Task] = None
    blockchain: Blockchain
    config: Dict
    server: Any
    log: logging.Logger
    constants: ConsensusConstants
    _shut_down: bool
    root_path: Path
    state_changed_callback: Optional[Callable]
    timelord_lock: asyncio.Lock
    initialized: bool
    weight_proof_handler: Optional[WeightProofHandler]
    def __init__(
        self,
        config: Dict,
        root_path: Path,
        consensus_constants: ConsensusConstants,
        name: Optional[str] = None,
    ):
        self.initialized = False
        self.root_path = root_path
        self.config = config
        self.server = None
        self._shut_down = False  # Set to true to close all infinite loops
        self.constants = consensus_constants
        self.pow_creation: Dict[uint32, asyncio.Event] = {}
        self.state_changed_callback: Optional[Callable] = None
        self.full_node_peers = None
        self.sync_store = None
        self.signage_point_times = [time.time() for _ in range(self.constants.NUM_SPS_SUB_SLOT)]
        self.full_node_store = FullNodeStore(self.constants)

        if name:
            self.log = logging.getLogger(name)
        else:
            self.log = logging.getLogger(__name__)

        db_path_replaced: str = config["database_path"].replace("CHALLENGE", config["selected_network"])
        self.db_path = path_from_root(root_path, db_path_replaced)
        mkdir(self.db_path.parent)
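    # Example construction (a sketch, not the real service wiring; `load_config`
    # and `DEFAULT_CONSTANTS` are hypothetical stand-ins for whatever the
    # caller actually uses):
    #
    #     config = load_config(root_path, "config.yaml")["full_node"]
    #     node = FullNode(config, root_path, DEFAULT_CONSTANTS)
    #     node._set_state_changed_callback(on_change)
    #     await node._start()
    #     ...
    #     node._close()
    #     await node._await_closed()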
    def _set_state_changed_callback(self, callback: Callable):
        self.state_changed_callback = callback
    async def _start(self):
        self.timelord_lock = asyncio.Lock()
        self.compact_vdf_sem = asyncio.Semaphore(4)
        self.new_peak_sem = asyncio.Semaphore(8)
        # create the store (db) and full node instance
        self.connection = await aiosqlite.connect(self.db_path)
        self.db_wrapper = DBWrapper(self.connection)
        self.block_store = await BlockStore.create(self.db_wrapper)
        self.sync_store = await SyncStore.create()
        self.coin_store = await CoinStore.create(self.db_wrapper)
        self.log.info("Initializing blockchain from disk")
        start_time = time.time()
        self.blockchain = await Blockchain.create(self.coin_store, self.block_store, self.constants)
        self.mempool_manager = MempoolManager(self.coin_store, self.constants)
        self.weight_proof_handler = None
        self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof())

        if self.config.get("enable_profiler", False):
            asyncio.create_task(profile_task(self.root_path, "node", self.log))

        self._sync_task = None
        self._segment_task = None
        time_taken = time.time() - start_time
        if self.blockchain.get_peak() is None:
            self.log.info(f"Initialized with empty blockchain, time taken: {int(time_taken)}s")
        else:
            self.log.info(
                f"Blockchain initialized to peak {self.blockchain.get_peak().header_hash} height"
                f" {self.blockchain.get_peak().height}, "
                f"time taken: {int(time_taken)}s"
            )
            pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_peak())
            assert len(pending_tx) == 0  # no pending transactions when starting up

        peak: Optional[BlockRecord] = self.blockchain.get_peak()
        self.uncompact_task = None
        if peak is not None:
            full_peak = await self.blockchain.get_full_peak()
            await self.peak_post_processing(full_peak, peak, max(peak.height - 1, 0), None)
        if self.config["send_uncompact_interval"] != 0:
            sanitize_weight_proof_only = False
            if "sanitize_weight_proof_only" in self.config:
                sanitize_weight_proof_only = self.config["sanitize_weight_proof_only"]
            assert self.config["target_uncompact_proofs"] != 0
            self.uncompact_task = asyncio.create_task(
                self.broadcast_uncompact_blocks(
                    self.config["send_uncompact_interval"],
                    self.config["target_uncompact_proofs"],
                    sanitize_weight_proof_only,
                )
            )
        self.initialized = True
        if self.full_node_peers is not None:
            asyncio.create_task(self.full_node_peers.start())
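    # Note: handlers such as on_connect() return early until self.initialized
    # is set at the end of _start(), so peers that connect mid-startup are not
    # served from a half-initialized node.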
    async def initialize_weight_proof(self):
        self.weight_proof_handler = WeightProofHandler(self.constants, self.blockchain)
        peak = self.blockchain.get_peak()
        if peak is not None:
            await self.weight_proof_handler.create_sub_epoch_segments()
    def set_server(self, server: ChiaServer):
        self.server = server
        dns_servers = []
        if "dns_servers" in self.config:
            dns_servers = self.config["dns_servers"]
        elif self.config["port"] == 8444:
            # If `dns_servers` is missing from the config, hardcode it if we're running mainnet.
            dns_servers.append("dns-introducer.chia.net")
        try:
            self.full_node_peers = FullNodePeers(
                self.server,
                self.root_path,
                self.config["target_peer_count"] - self.config["target_outbound_peer_count"],
                self.config["target_outbound_peer_count"],
                self.config["peer_db_path"],
                self.config["introducer_peer"],
                dns_servers,
                self.config["peer_connect_interval"],
                self.config["selected_network"],
                self.log,
            )
        except Exception as e:
            error_stack = traceback.format_exc()
            self.log.error(f"Exception in peer discovery: {e}")
            self.log.error(f"Exception Stack: {error_stack}")
    def _state_changed(self, change: str):
        if self.state_changed_callback is not None:
            self.state_changed_callback(change)
    async def short_sync_batch(self, peer: ws.WSChiaConnection, start_height: uint32, target_height: uint32) -> bool:
        """
        Tries to sync to a chain which is not too far in the future, by downloading batches of blocks. If the first
        block that we download is not connected to our chain, we return False and do an expensive long sync instead.
        Long sync is not preferred because it requires downloading and validating a weight proof.

        Args:
            peer: peer to sync from
            start_height: height that we should start downloading at (our peak is higher)
            target_height: target height to sync to

        Returns:
            False if the fork point was not found and we need to do a long sync; True otherwise.
        """
        # Don't trigger multiple batch syncs to the same peer
        if (
            peer.peer_node_id in self.sync_store.backtrack_syncing
            and self.sync_store.backtrack_syncing[peer.peer_node_id] > 0
        ):
            return True  # Don't batch sync, a backtrack sync to this peer is already in progress
        if peer.peer_node_id in self.sync_store.batch_syncing:
            return True  # Don't trigger a long sync
        self.sync_store.batch_syncing.add(peer.peer_node_id)

        self.log.info(f"Starting batch short sync from {start_height} to height {target_height}")
        if start_height > 0:
            first = await peer.request_block(full_node_protocol.RequestBlock(uint32(start_height), False))
            if first is None or not isinstance(first, full_node_protocol.RespondBlock):
                self.sync_store.batch_syncing.remove(peer.peer_node_id)
                raise ValueError(f"Error short batch syncing, could not fetch block at height {start_height}")
            if not self.blockchain.contains_block(first.block.prev_header_hash):
                self.log.info("Batch syncing stopped, this is a deep chain")
                self.sync_store.batch_syncing.remove(peer.peer_node_id)
                # First block not connected to our blockchain, do a long sync instead
                return False

        batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS
        if self._segment_task is not None and (not self._segment_task.done()):
            try:
                self._segment_task.cancel()
            except Exception as e:
                self.log.warning(f"Failed to cancel segment task: {e}")
            self._segment_task = None

        try:
            for height in range(start_height, target_height, batch_size):
                end_height = min(target_height, height + batch_size)
                request = RequestBlocks(uint32(height), uint32(end_height), True)
                response = await peer.request_blocks(request)
                if not response:
                    raise ValueError(f"Error short batch syncing, invalid/no response for {height}-{end_height}")
                async with self.blockchain.lock:
                    success, advanced_peak, fork_height = await self.receive_block_batch(response.blocks, peer, None)
                    if not success:
                        raise ValueError(f"Error short batch syncing, failed to validate blocks {height}-{end_height}")
                    if advanced_peak:
                        peak = self.blockchain.get_peak()
                        peak_fb: Optional[FullBlock] = await self.blockchain.get_full_peak()
                        assert peak is not None and peak_fb is not None and fork_height is not None
                        await self.peak_post_processing(peak_fb, peak, fork_height, peer)
                self.log.info(f"Added blocks {height}-{end_height}")
        except Exception:
            self.sync_store.batch_syncing.remove(peer.peer_node_id)
            raise
        self.sync_store.batch_syncing.remove(peer.peer_node_id)
        return True
    async def short_sync_backtrack(
        self, peer: ws.WSChiaConnection, peak_height: uint32, target_height: uint32, target_unf_hash: bytes32
    ) -> bool:
        """
        Performs a backtrack sync, where blocks are downloaded one at a time from newest to oldest. If we do not
        find the fork point within 5 blocks of our peak, we return False and do a long sync instead.

        Args:
            peer: peer to sync from
            peak_height: height of our peak
            target_height: target height
            target_unf_hash: partial hash of the unfinished block of the target

        Returns:
            True iff we found the fork point, and we do not need to long sync.
        """
        try:
            if peer.peer_node_id not in self.sync_store.backtrack_syncing:
                self.sync_store.backtrack_syncing[peer.peer_node_id] = 0
            self.sync_store.backtrack_syncing[peer.peer_node_id] += 1

            unfinished_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block(target_unf_hash)
            curr_height: int = target_height
            found_fork_point = False
            responses = []
            while curr_height > peak_height - 5:
                # If we already have the unfinished block, don't fetch the transactions. In the normal case, we will
                # already have the unfinished block from when it was broadcast, so we just need to download the
                # header, but not the transactions
                fetch_tx: bool = unfinished_block is None or curr_height != target_height
                curr = await peer.request_block(full_node_protocol.RequestBlock(uint32(curr_height), fetch_tx))
                if curr is None:
                    raise ValueError(f"Failed to fetch block {curr_height} from {peer.get_peer_info()}, timed out")
                if not isinstance(curr, full_node_protocol.RespondBlock):
                    raise ValueError(
                        f"Failed to fetch block {curr_height} from {peer.get_peer_info()}, wrong type {type(curr)}"
                    )
                responses.append(curr)
                if self.blockchain.contains_block(curr.block.prev_header_hash) or curr_height == 0:
                    found_fork_point = True
                    break
                curr_height -= 1
            if found_fork_point:
                for response in reversed(responses):
                    await self.respond_block(response, peer)
        except Exception as e:
            self.sync_store.backtrack_syncing[peer.peer_node_id] -= 1
            raise e

        self.sync_store.backtrack_syncing[peer.peer_node_id] -= 1
        return found_fork_point
    async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaConnection):
        """
        We have received a notification of a new peak from a peer. This happens either when we have just connected,
        or when the peer has updated their peak.

        Args:
            request: information about the new peak
            peer: peer that sent the message
        """
        # Store this peak/peer combination in case we want to sync to it, and to keep track of peers
        self.sync_store.peer_has_block(request.header_hash, peer.peer_node_id, request.weight, request.height, True)

        if self.blockchain.contains_block(request.header_hash):
            return None

        # Not interested in less heavy peaks
        peak: Optional[BlockRecord] = self.blockchain.get_peak()
        curr_peak_height = uint32(0) if peak is None else peak.height
        if peak is not None and peak.weight > request.weight:
            return None

        if self.sync_store.get_sync_mode():
            # If peer connects while we are syncing, check if they have the block we are syncing towards
            peak_sync_hash = self.sync_store.get_sync_target_hash()
            peak_sync_height = self.sync_store.get_sync_target_height()
            if peak_sync_hash is not None and request.header_hash != peak_sync_hash and peak_sync_height is not None:
                peak_peers: Set[bytes32] = self.sync_store.get_peers_that_have_peak([peak_sync_hash])
                # Don't ask if we already know this peer has the peak
                if peer.peer_node_id not in peak_peers:
                    target_peak_response: Optional[RespondBlock] = await peer.request_block(
                        full_node_protocol.RequestBlock(uint32(peak_sync_height), False), timeout=10
                    )
                    if target_peak_response is not None and isinstance(target_peak_response, RespondBlock):
                        self.sync_store.peer_has_block(
                            peak_sync_hash,
                            peer.peer_node_id,
                            target_peak_response.block.weight,
                            peak_sync_height,
                            False,
                        )
        else:
            if request.height <= curr_peak_height + self.config["short_sync_blocks_behind_threshold"]:
                # This is the normal case of receiving the next block
                if await self.short_sync_backtrack(
                    peer, curr_peak_height, request.height, request.unfinished_reward_block_hash
                ):
                    return None

            if request.height < self.constants.WEIGHT_PROOF_RECENT_BLOCKS:
                # This is the case of syncing up more than a few blocks, at the start of the chain
                # TODO(almog): fix weight proofs so they work at the beginning as well
                self.log.debug("Doing batch sync, no backup")
                await self.short_sync_batch(peer, uint32(0), request.height)
                return None

            if request.height < curr_peak_height + self.config["sync_blocks_behind_threshold"]:
                # This is the case of being behind, but not by too much
                if await self.short_sync_batch(peer, uint32(max(curr_peak_height - 6, 0)), request.height):
                    return None

            # This is either the case where we were not able to sync successfully (for example, due to the fork
            # point being in the past), or we are very far behind. Performs a long sync.
            self._sync_task = asyncio.create_task(self._sync())
    async def send_peak_to_timelords(
        self, peak_block: Optional[FullBlock] = None, peer: Optional[ws.WSChiaConnection] = None
    ):
        """
        Sends the current peak to all connected timelords (or to a specific peer, if given).
        """
        if peak_block is None:
            peak_block = await self.blockchain.get_full_peak()
        if peak_block is not None:
            peak = self.blockchain.block_record(peak_block.header_hash)
            difficulty = self.blockchain.get_next_difficulty(peak.header_hash, False)
            ses: Optional[SubEpochSummary] = next_sub_epoch_summary(
                self.constants,
                self.blockchain,
                peak.required_iters,
                peak_block,
                True,
            )
            recent_rc = self.blockchain.get_recent_reward_challenges()

            curr = peak
            while not curr.is_challenge_block(self.constants) and not curr.first_in_sub_slot:
                curr = self.blockchain.block_record(curr.prev_hash)

            if curr.is_challenge_block(self.constants):
                last_csb_or_eos = curr.total_iters
            else:
                last_csb_or_eos = curr.ip_sub_slot_total_iters(self.constants)

            curr = peak
            passed_ses_height_but_not_yet_included = True
            while (curr.height % self.constants.SUB_EPOCH_BLOCKS) != 0:
                if curr.sub_epoch_summary_included:
                    passed_ses_height_but_not_yet_included = False
                curr = self.blockchain.block_record(curr.prev_hash)
            if curr.sub_epoch_summary_included or curr.height == 0:
                passed_ses_height_but_not_yet_included = False

            timelord_new_peak: timelord_protocol.NewPeakTimelord = timelord_protocol.NewPeakTimelord(
                peak_block.reward_chain_block,
                difficulty,
                peak.deficit,
                peak.sub_slot_iters,
                ses,
                recent_rc,
                last_csb_or_eos,
                passed_ses_height_but_not_yet_included,
            )

            msg = make_msg(ProtocolMessageTypes.new_peak_timelord, timelord_new_peak)
            if peer is None:
                await self.server.send_to_all([msg], NodeType.TIMELORD)
            else:
                await self.server.send_to_specific([msg], peer.peer_node_id)
    async def synced(self) -> bool:
        curr: Optional[BlockRecord] = self.blockchain.get_peak()
        if curr is None:
            return False

        while curr is not None and not curr.is_transaction_block:
            curr = self.blockchain.try_block_record(curr.prev_hash)

        now = time.time()
        if (
            curr is None
            or curr.timestamp is None
            or curr.timestamp < uint64(int(now - 60 * 7))
            or self.sync_store.get_sync_mode()
        ):
            return False
        else:
            return True
    async def on_connect(self, connection: ws.WSChiaConnection):
        """
        Whenever we connect to another node / wallet, send them our current heads. Also send heads to farmers
        and challenges to timelords.
        """
        self._state_changed("add_connection")
        self._state_changed("sync_mode")
        if self.full_node_peers is not None:
            asyncio.create_task(self.full_node_peers.on_connect(connection))

        if self.initialized is False:
            return None

        if connection.connection_type is NodeType.FULL_NODE:
            # Send filter to node and request mempool items that are not in it (only if we are currently synced)
            synced = await self.synced()
            peak_height = self.blockchain.get_peak_height()
            current_time = int(time.time())
            if synced and peak_height is not None and current_time > self.constants.INITIAL_FREEZE_END_TIMESTAMP:
                my_filter = self.mempool_manager.get_filter()
                mempool_request = full_node_protocol.RequestMempoolTransactions(my_filter)

                msg = make_msg(ProtocolMessageTypes.request_mempool_transactions, mempool_request)
                await connection.send_message(msg)

        peak_full: Optional[FullBlock] = await self.blockchain.get_full_peak()

        if peak_full is not None:
            peak: BlockRecord = self.blockchain.block_record(peak_full.header_hash)
            if connection.connection_type is NodeType.FULL_NODE:
                request_node = full_node_protocol.NewPeak(
                    peak.header_hash,
                    peak.height,
                    peak.weight,
                    peak.height,
                    peak_full.reward_chain_block.get_unfinished().get_hash(),
                )
                await connection.send_message(make_msg(ProtocolMessageTypes.new_peak, request_node))

            elif connection.connection_type is NodeType.WALLET:
                # If connected to a wallet, send the peak
                request_wallet = wallet_protocol.NewPeakWallet(
                    peak.header_hash,
                    peak.height,
                    peak.weight,
                    peak.height,
                )
                await connection.send_message(make_msg(ProtocolMessageTypes.new_peak_wallet, request_wallet))
            elif connection.connection_type is NodeType.TIMELORD:
                await self.send_peak_to_timelords()
    def on_disconnect(self, connection: ws.WSChiaConnection):
        self.log.info(f"peer disconnected {connection.get_peer_info()}")
        self._state_changed("close_connection")
        self._state_changed("sync_mode")
        if self.sync_store is not None:
            self.sync_store.peer_disconnected(connection.peer_node_id)
    def _num_needed_peers(self) -> int:
        assert self.server is not None
        assert self.server.all_connections is not None
        diff = self.config["target_peer_count"] - len(self.server.all_connections)
        return diff if diff >= 0 else 0
    def _close(self):
        self._shut_down = True
        if self._init_weight_proof is not None:
            self._init_weight_proof.cancel()
        if self.blockchain is not None:
            self.blockchain.shut_down()
        if self.mempool_manager is not None:
            self.mempool_manager.shut_down()
        if self.full_node_peers is not None:
            asyncio.create_task(self.full_node_peers.close())
        if self.uncompact_task is not None:
            self.uncompact_task.cancel()
    async def _await_closed(self):
        cancel_task_safe(self._sync_task, self.log)
        for task in list(self.full_node_store.tx_fetch_tasks.values()):
            cancel_task_safe(task, self.log)
        await self.connection.close()
        if self._init_weight_proof is not None:
            await asyncio.wait([self._init_weight_proof])
    async def _sync(self):
        """
        Performs a full sync of the blockchain up to the peak.
        - Wait a few seconds for peers to send us their peaks
        - Select the heaviest peak, and request a weight proof from a peer with that peak
        - Validate the weight proof, and disconnect from the peer if invalid
        - Find the fork point to see where to start downloading blocks
        - Download blocks in batches (and in parallel) and verify them one at a time
        - Disconnect from peers that provide invalid blocks or don't have the blocks
        """
        if self.weight_proof_handler is None:
            return None
        # Ensure we are only syncing once and not double calling this method
        if self.sync_store.get_sync_mode():
            return None

        if self.sync_store.get_long_sync():
            self.log.debug("already in long sync")
            return None

        self.sync_store.set_long_sync(True)
        self.log.debug("long sync started")
        try:
            self.log.info("Starting to perform sync.")
            self.log.info("Waiting to receive peaks from peers.")

            # Wait until we have 3 peaks or up to a max of 30 seconds
            peaks = []
            for i in range(300):
                peaks = [tup[0] for tup in self.sync_store.get_peak_of_each_peer().values()]
                if len(self.sync_store.get_peers_that_have_peak(peaks)) < 3:
                    if self._shut_down:
                        return None
                    await asyncio.sleep(0.1)

            self.log.info(f"Collected a total of {len(peaks)} peaks.")
            self.sync_peers_handler = None

            # Based on responses from peers about the current peaks, see which peak is the heaviest
            # (similar to the longest chain rule).
            target_peak = self.sync_store.get_heaviest_peak()

            if target_peak is None:
                raise RuntimeError("Not performing sync, no peaks collected")
            heaviest_peak_hash, heaviest_peak_height, heaviest_peak_weight = target_peak
            self.sync_store.set_peak_target(heaviest_peak_hash, heaviest_peak_height)

            self.log.info(f"Selected peak {heaviest_peak_height}, {heaviest_peak_hash}")
            # Check which peers are updated to this height

            peers = []
            coroutines = []
            for peer in self.server.all_connections.values():
                if peer.connection_type == NodeType.FULL_NODE:
                    peers.append(peer.peer_node_id)
                    coroutines.append(
                        peer.request_block(
                            full_node_protocol.RequestBlock(uint32(heaviest_peak_height), True), timeout=10
                        )
                    )
            for i, target_peak_response in enumerate(await asyncio.gather(*coroutines)):
                if target_peak_response is not None and isinstance(target_peak_response, RespondBlock):
                    self.sync_store.peer_has_block(
                        heaviest_peak_hash, peers[i], heaviest_peak_weight, heaviest_peak_height, False
                    )
            # TODO: disconnect from the peer which gave us the heaviest peak, if nobody has the peak

            peer_ids: Set[bytes32] = self.sync_store.get_peers_that_have_peak([heaviest_peak_hash])
            peers_with_peak: List = [c for c in self.server.all_connections.values() if c.peer_node_id in peer_ids]

            # Request weight proof from a random peer
            self.log.info(f"Total of {len(peers_with_peak)} peers with peak {heaviest_peak_height}")
            weight_proof_peer = random.choice(peers_with_peak)
            self.log.info(
                f"Requesting weight proof from peer {weight_proof_peer.peer_host} up to height"
                f" {heaviest_peak_height}"
            )

            if self.blockchain.get_peak() is not None and heaviest_peak_weight <= self.blockchain.get_peak().weight:
                raise ValueError("Not performing sync, already caught up.")

            wp_timeout = 360
            if "weight_proof_timeout" in self.config:
                wp_timeout = self.config["weight_proof_timeout"]
            self.log.debug(f"weight proof timeout is {wp_timeout} sec")
            request = full_node_protocol.RequestProofOfWeight(heaviest_peak_height, heaviest_peak_hash)
            response = await weight_proof_peer.request_proof_of_weight(request, timeout=wp_timeout)

            # Disconnect from this peer, because they have not behaved properly
            if response is None or not isinstance(response, full_node_protocol.RespondProofOfWeight):
                await weight_proof_peer.close(600)
                raise RuntimeError(f"Weight proof did not arrive in time from peer: {weight_proof_peer.peer_host}")
            if response.wp.recent_chain_data[-1].reward_chain_block.height != heaviest_peak_height:
                await weight_proof_peer.close(600)
                raise RuntimeError(f"Weight proof had the wrong height: {weight_proof_peer.peer_host}")
            if response.wp.recent_chain_data[-1].reward_chain_block.weight != heaviest_peak_weight:
                await weight_proof_peer.close(600)
                raise RuntimeError(f"Weight proof had the wrong weight: {weight_proof_peer.peer_host}")

            # Don't sync to the weight proof if our local peak is heavier, and
            # don't ban the peer, since we asked for this peak
            current_peak = self.blockchain.get_peak()
            if current_peak is not None:
                if response.wp.recent_chain_data[-1].reward_chain_block.weight <= current_peak.weight:
                    raise RuntimeError(
                        f"Current peak is heavier than the weight proof peak: {weight_proof_peer.peer_host}"
                    )

            try:
                validated, fork_point, summaries = await self.weight_proof_handler.validate_weight_proof(response.wp)
            except Exception as e:
                await weight_proof_peer.close(600)
                raise ValueError(f"Weight proof validation threw an error {e}")

            if not validated:
                await weight_proof_peer.close(600)
                raise ValueError("Weight proof validation failed")

            self.log.info(f"Re-checked peers: total of {len(peers_with_peak)} peers with peak {heaviest_peak_height}")
            self.sync_store.set_sync_mode(True)
            self._state_changed("sync_mode")
            # Ensures that the fork point does not change
            async with self.blockchain.lock:
                await self.blockchain.warmup(fork_point)
            await self.sync_from_fork_point(fork_point, heaviest_peak_height, heaviest_peak_hash, summaries)
        except asyncio.CancelledError:
            self.log.warning("Syncing failed, CancelledError")
        except Exception as e:
            tb = traceback.format_exc()
            self.log.error(f"Error with syncing: {type(e)}{tb}")
        finally:
            if self._shut_down:
                return None
            await self._finish_sync()
    async def sync_from_fork_point(
        self,
        fork_point_height: int,
        target_peak_sb_height: uint32,
        peak_hash: bytes32,
        summaries: List[SubEpochSummary],
    ):
        self.log.info(f"Start syncing from fork point at {fork_point_height} up to {target_peak_sb_height}")
        peer_ids: Set[bytes32] = self.sync_store.get_peers_that_have_peak([peak_hash])
        peers_with_peak: List = [c for c in self.server.all_connections.values() if c.peer_node_id in peer_ids]

        if len(peers_with_peak) == 0:
            raise RuntimeError(f"Not syncing, no peers with header_hash {peak_hash}")
        advanced_peak = False
        batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS

        our_peak_height = self.blockchain.get_peak_height()
        ses_heights = self.blockchain.get_ses_heights()
        if len(ses_heights) > 2 and our_peak_height is not None:
            ses_heights.sort()
            max_fork_ses_height = ses_heights[-3]
            # This is the fork point in the sub-epoch summaries, for the case where the fork was not detected
            if self.blockchain.get_peak_height() is not None and fork_point_height == max_fork_ses_height:
                for peer in peers_with_peak:
                    # Grab a block at peak + 1 and check if the fork point is actually our current height
                    block_response: Optional[Any] = await peer.request_block(
                        full_node_protocol.RequestBlock(uint32(our_peak_height + 1), True)
                    )
                    if block_response is not None and isinstance(block_response, full_node_protocol.RespondBlock):
                        peak = self.blockchain.get_peak()
                        if peak is not None and block_response.block.prev_header_hash == peak.header_hash:
                            fork_point_height = our_peak_height
                            break

        for i in range(fork_point_height, target_peak_sb_height, batch_size):
            start_height = i
            end_height = min(target_peak_sb_height, start_height + batch_size)
            request = RequestBlocks(uint32(start_height), uint32(end_height), True)
            self.log.info(f"Requesting blocks: {start_height} to {end_height}")
            batch_added = False
            to_remove = []
            for peer in peers_with_peak:
                if peer.closed:
                    to_remove.append(peer)
                    continue
                response = await peer.request_blocks(request, timeout=60)
                if response is None:
                    await peer.close()
                    to_remove.append(peer)
                    continue
                if isinstance(response, RejectBlocks):
                    to_remove.append(peer)
                    continue
                elif isinstance(response, RespondBlocks):
                    success, advanced_peak, _ = await self.receive_block_batch(
                        response.blocks, peer, None if advanced_peak else uint32(fork_point_height), summaries
                    )
                    if success is False:
                        await peer.close(600)
                        continue
                    else:
                        batch_added = True
                    break

            peak = self.blockchain.get_peak()
            assert peak is not None
            msg = make_msg(
                ProtocolMessageTypes.new_peak_wallet,
                wallet_protocol.NewPeakWallet(
                    peak.header_hash,
                    peak.height,
                    peak.weight,
                    uint32(max(peak.height - 1, uint32(0))),
                ),
            )
            await self.server.send_to_all([msg], NodeType.WALLET)

            for peer in to_remove:
                peers_with_peak.remove(peer)

            if self.sync_store.peers_changed.is_set():
                peer_ids = self.sync_store.get_peers_that_have_peak([peak_hash])
                peers_with_peak = [c for c in self.server.all_connections.values() if c.peer_node_id in peer_ids]
                self.log.info(f"Number of peers we are syncing from: {len(peers_with_peak)}")
                self.sync_store.peers_changed.clear()

            if batch_added is False:
                self.log.info(f"Failed to fetch blocks {start_height} to {end_height} from peers: {peers_with_peak}")
                break
            else:
                self.log.info(f"Added blocks {start_height} to {end_height}")
                self.blockchain.clean_block_record(
                    min(
                        end_height - self.constants.BLOCKS_CACHE_SIZE,
                        peak.height - self.constants.BLOCKS_CACHE_SIZE,
                    )
                )
    async def receive_block_batch(
        self,
        all_blocks: List[FullBlock],
        peer: ws.WSChiaConnection,
        fork_point: Optional[uint32],
        wp_summaries: Optional[List[SubEpochSummary]] = None,
    ) -> Tuple[bool, bool, Optional[uint32]]:
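        """
        Validates and adds a batch of blocks to the chain. Returns a tuple of
        (success, advanced_peak, fork_height): success is False if any block
        fails (pre-)validation, advanced_peak is True if the peak moved, and
        fork_height is the fork point reported by the blockchain (None when
        pre-validation itself failed).
        """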
        advanced_peak = False
        fork_height: Optional[uint32] = uint32(0)

        blocks_to_validate: List[FullBlock] = []
        for i, block in enumerate(all_blocks):
            if not self.blockchain.contains_block(block.header_hash):
                blocks_to_validate = all_blocks[i:]
                break
        if len(blocks_to_validate) == 0:
            return True, False, fork_height

        pre_validate_start = time.time()
        pre_validation_results: Optional[
            List[PreValidationResult]
        ] = await self.blockchain.pre_validate_blocks_multiprocessing(blocks_to_validate, {})
        self.log.debug(f"Block pre-validation time: {time.time() - pre_validate_start}")
        if pre_validation_results is None:
            return False, False, None
        for i, block in enumerate(blocks_to_validate):
            if pre_validation_results[i].error is not None:
                self.log.error(
                    f"Invalid block from peer: {peer.get_peer_info()} {Err(pre_validation_results[i].error)}"
                )
                return False, advanced_peak, fork_height

        for i, block in enumerate(blocks_to_validate):
            assert pre_validation_results[i].required_iters is not None
            result, error, fork_height = await self.blockchain.receive_block(
                block, pre_validation_results[i], None if advanced_peak else fork_point, wp_summaries
            )
            if result == ReceiveBlockResult.NEW_PEAK:
                advanced_peak = True
            elif result == ReceiveBlockResult.INVALID_BLOCK or result == ReceiveBlockResult.DISCONNECTED_BLOCK:
                if error is not None:
                    self.log.error(f"Error: {error}, Invalid block from peer: {peer.get_peer_info()}")
                return False, advanced_peak, fork_height
            block_record = self.blockchain.block_record(block.header_hash)
            if block_record.sub_epoch_summary_included is not None:
                if self.weight_proof_handler is not None:
                    await self.weight_proof_handler.create_prev_sub_epoch_segments()
        if advanced_peak:
            self._state_changed("new_peak")
        self.log.debug(
            f"Total time for {len(blocks_to_validate)} blocks: {time.time() - pre_validate_start}, "
            f"advanced: {advanced_peak}"
        )
        return True, advanced_peak, fork_height
    async def _finish_sync(self):
        """
        Finalize sync by setting sync mode to False, clearing all sync information, and adding any final
        blocks that we have finalized recently.
        """
        self.log.info("long sync done")
        self.sync_store.set_long_sync(False)
        self.sync_store.set_sync_mode(False)
        self._state_changed("sync_mode")
        if self.server is None:
            return None

        peak: Optional[BlockRecord] = self.blockchain.get_peak()
        async with self.blockchain.lock:
            await self.sync_store.clear_sync_info()

            peak_fb: FullBlock = await self.blockchain.get_full_peak()
            if peak is not None:
                await self.peak_post_processing(peak_fb, peak, max(peak.height - 1, 0), None)

        if peak is not None and self.weight_proof_handler is not None:
            await self.weight_proof_handler.get_proof_of_weight(peak.header_hash)
            self._state_changed("block")
    def has_valid_pool_sig(self, block: Union[UnfinishedBlock, FullBlock]) -> bool:
        if (
            block.foliage.foliage_block_data.pool_target
            == PoolTarget(self.constants.GENESIS_PRE_FARM_POOL_PUZZLE_HASH, uint32(0))
            and block.foliage.prev_block_hash != self.constants.GENESIS_CHALLENGE
            and block.reward_chain_block.proof_of_space.pool_public_key is not None
        ):
            if not AugSchemeMPL.verify(
                block.reward_chain_block.proof_of_space.pool_public_key,
                bytes(block.foliage.foliage_block_data.pool_target),
                block.foliage.foliage_block_data.pool_signature,
            ):
                return False
        return True
    async def signage_point_post_processing(
        self,
        request: full_node_protocol.RespondSignagePoint,
        peer: ws.WSChiaConnection,
        ip_sub_slot: Optional[EndOfSubSlotBundle],
    ):
        self.log.info(
            f"⏲️ Finished signage point {request.index_from_challenge}/"
            f"{self.constants.NUM_SPS_SUB_SLOT}: "
            f"CC: {request.challenge_chain_vdf.output.get_hash()} "
            f"RC: {request.reward_chain_vdf.output.get_hash()} "
        )
        self.signage_point_times[request.index_from_challenge] = time.time()
        sub_slot_tuple = self.full_node_store.get_sub_slot(request.challenge_chain_vdf.challenge)
        if sub_slot_tuple is not None:
            prev_challenge = sub_slot_tuple[0].challenge_chain.challenge_chain_end_of_slot_vdf.challenge
        else:
            prev_challenge = None

        # Notify nodes of the new signage point
        broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot(
            prev_challenge,
            request.challenge_chain_vdf.challenge,
            request.index_from_challenge,
            request.reward_chain_vdf.challenge,
        )
        msg = make_msg(ProtocolMessageTypes.new_signage_point_or_end_of_sub_slot, broadcast)
        await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)

        peak = self.blockchain.get_peak()
        if peak is not None and peak.height > self.constants.MAX_SUB_SLOT_BLOCKS:
            sub_slot_iters = peak.sub_slot_iters
            difficulty = uint64(peak.weight - self.blockchain.block_record(peak.prev_hash).weight)
            # Makes sure to potentially update the difficulty if we are past the peak (into a new sub-slot)
            assert ip_sub_slot is not None
            if request.challenge_chain_vdf.challenge != ip_sub_slot.challenge_chain.get_hash():
                difficulty = self.blockchain.get_next_difficulty(peak.header_hash, True)
                sub_slot_iters = self.blockchain.get_next_slot_iters(peak.header_hash, True)
        else:
            difficulty = self.constants.DIFFICULTY_STARTING
            sub_slot_iters = self.constants.SUB_SLOT_ITERS_STARTING

        # Notify farmers of the new signage point
        broadcast_farmer = farmer_protocol.NewSignagePoint(
            request.challenge_chain_vdf.challenge,
            request.challenge_chain_vdf.output.get_hash(),
            request.reward_chain_vdf.output.get_hash(),
            difficulty,
            sub_slot_iters,
            request.index_from_challenge,
        )
        msg = make_msg(ProtocolMessageTypes.new_signage_point, broadcast_farmer)
        await self.server.send_to_all([msg], NodeType.FARMER)
    async def peak_post_processing(
        self, block: FullBlock, record: BlockRecord, fork_height: uint32, peer: Optional[ws.WSChiaConnection]
    ):
        """
        Must be called under self.blockchain.lock. This updates the internal state of the full node with the
        latest peak information. It also notifies peers about the new peak.
        """
        difficulty = self.blockchain.get_next_difficulty(record.header_hash, False)
        sub_slot_iters = self.blockchain.get_next_slot_iters(record.header_hash, False)

        self.log.info(
            f"🌱 Updated peak to height {record.height}, weight {record.weight}, "
            f"hh {record.header_hash}, "
            f"forked at {fork_height}, rh: {record.reward_infusion_new_challenge}, "
            f"total iters: {record.total_iters}, "
            f"overflow: {record.overflow}, "
            f"deficit: {record.deficit}, "
            f"difficulty: {difficulty}, "
            f"sub slot iters: {sub_slot_iters}, "
            f"Generator size: "
            f"{len(bytes(block.transactions_generator)) if block.transactions_generator else 'No tx'}, "
            f"Generator ref list size: "
            f"{len(block.transactions_generator_ref_list) if block.transactions_generator else 'No tx'}"
        )

        sub_slots = await self.blockchain.get_sp_and_ip_sub_slots(record.header_hash)
        assert sub_slots is not None

        if not self.sync_store.get_sync_mode():
            self.blockchain.clean_block_records()

        fork_block: Optional[BlockRecord] = None
        if fork_height != block.height - 1 and block.height != 0:
            # This is a reorg
            fork_block = self.blockchain.block_record(self.blockchain.height_to_hash(fork_height))

        added_eos, new_sps, new_ips = self.full_node_store.new_peak(
            record,
            block,
            sub_slots[0],
            sub_slots[1],
            fork_block,
            self.blockchain,
        )
        if sub_slots[1] is None:
            assert record.ip_sub_slot_total_iters(self.constants) == 0

        # Ensure the signage point is also in the store, for consistency
        self.full_node_store.new_signage_point(
            record.signage_point_index,
            self.blockchain,
            record,
            record.sub_slot_iters,
            SignagePoint(
                block.reward_chain_block.challenge_chain_sp_vdf,
                block.challenge_chain_sp_proof,
                block.reward_chain_block.reward_chain_sp_vdf,
                block.reward_chain_sp_proof,
            ),
            skip_vdf_validation=True,
        )

        # Update the mempool (returns successful pending transactions added to the mempool)
        for bundle, result, spend_name in await self.mempool_manager.new_peak(self.blockchain.get_peak()):
            self.log.debug(f"Added transaction to mempool: {spend_name}")
            mempool_item = self.mempool_manager.get_mempool_item(spend_name)
            assert mempool_item is not None
            fees = mempool_item.fee
            assert fees >= 0
            assert mempool_item.cost is not None
            new_tx = full_node_protocol.NewTransaction(
                spend_name,
                mempool_item.cost,
                uint64(bundle.fees()),
            )
            msg = make_msg(ProtocolMessageTypes.new_transaction, new_tx)
            await self.server.send_to_all([msg], NodeType.FULL_NODE)

        # If there were pending end of sub-slots that happen after this peak, broadcast them if they are added
        if added_eos is not None:
            broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot(
                added_eos.challenge_chain.challenge_chain_end_of_slot_vdf.challenge,
                added_eos.challenge_chain.get_hash(),
                uint8(0),
                added_eos.reward_chain.end_of_slot_vdf.challenge,
            )
            msg = make_msg(ProtocolMessageTypes.new_signage_point_or_end_of_sub_slot, broadcast)
            await self.server.send_to_all([msg], NodeType.FULL_NODE)

        if new_sps is not None and peer is not None:
            for index, sp in new_sps:
                assert (
                    sp.cc_vdf is not None
                    and sp.cc_proof is not None
                    and sp.rc_vdf is not None
                    and sp.rc_proof is not None
                )
                await self.signage_point_post_processing(
                    RespondSignagePoint(index, sp.cc_vdf, sp.cc_proof, sp.rc_vdf, sp.rc_proof), peer, sub_slots[1]
                )

        # TODO: maybe add and broadcast new IPs as well

        if record.height % 1000 == 0:
            # Occasionally clear data in the full node store to keep memory usage small
            self.full_node_store.clear_seen_unfinished_blocks()
            self.full_node_store.clear_old_cache_entries()

        if self.sync_store.get_sync_mode() is False:
            await self.send_peak_to_timelords(block)

            # Tell full nodes about the new peak
            msg = make_msg(
                ProtocolMessageTypes.new_peak,
                full_node_protocol.NewPeak(
                    record.header_hash,
                    record.height,
                    record.weight,
                    fork_height,
                    block.reward_chain_block.get_unfinished().get_hash(),
                ),
            )
            if peer is not None:
                await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)
            else:
                await self.server.send_to_all([msg], NodeType.FULL_NODE)

        # Tell wallets about the new peak
        msg = make_msg(
            ProtocolMessageTypes.new_peak_wallet,
            wallet_protocol.NewPeakWallet(
                record.header_hash,
                record.height,
                record.weight,
                fork_height,
            ),
        )
        await self.server.send_to_all([msg], NodeType.WALLET)

        # Check if we detected a spent transaction, to load up our generator cache
        if block.transactions_generator is not None and self.full_node_store.previous_generator is None:
            generator_arg = detect_potential_template_generator(block.height, block.transactions_generator)
            if generator_arg:
                self.log.info(f"Saving previous generator for height {block.height}")
                self.full_node_store.previous_generator = generator_arg

        self._state_changed("new_peak")
    async def respond_block(
        self,
        respond_block: full_node_protocol.RespondBlock,
        peer: Optional[ws.WSChiaConnection] = None,
    ) -> Optional[Message]:
        """
        Receive a full block from a peer full node (or ourselves).
        """
        block: FullBlock = respond_block.block
        if self.sync_store.get_sync_mode():
            return None

        # Adds the block to seen, and check if it's seen before (which means header is in memory)
        header_hash = block.header_hash
        if self.blockchain.contains_block(header_hash):
            return None

        pre_validation_result: Optional[PreValidationResult] = None
        if (
            block.is_transaction_block()
            and block.transactions_info is not None
            and block.transactions_info.generator_root != bytes([0] * 32)
            and block.transactions_generator is None
        ):
            # This is the case where we already had the unfinished block, and asked for this block without
            # the transactions (since we already had them). Therefore, here we add the transactions.
            unfinished_rh: bytes32 = block.reward_chain_block.get_unfinished().get_hash()
            unf_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block(unfinished_rh)
            if (
                unf_block is not None
                and unf_block.transactions_generator is not None
                and unf_block.foliage_transaction_block == block.foliage_transaction_block
            ):
                pre_validation_result = self.full_node_store.get_unfinished_block_result(unfinished_rh)
                assert pre_validation_result is not None
                block = dataclasses.replace(
                    block,
                    transactions_generator=unf_block.transactions_generator,
                    transactions_generator_ref_list=unf_block.transactions_generator_ref_list,
                )
            else:
                # We still do not have the correct information for this block, perhaps there is a duplicate block
                # with the same unfinished block hash in the cache, so we need to fetch the correct one
                if peer is None:
                    return None

                block_response: Optional[Any] = await peer.request_block(
                    full_node_protocol.RequestBlock(block.height, True)
                )
                if block_response is None or not isinstance(block_response, full_node_protocol.RespondBlock):
                    self.log.warning(
                        f"Was not able to fetch the correct block for height {block.height} {block_response}"
                    )
                    return None
                new_block: FullBlock = block_response.block
                if new_block.foliage_transaction_block != block.foliage_transaction_block:
                    self.log.warning(f"Received the wrong block for height {block.height} {new_block.header_hash}")
                    return None
                assert new_block.transactions_generator is not None

                self.log.debug(
                    f"Wrong info in the cache for bh {new_block.header_hash}, there might be multiple blocks from the "
                    f"same farmer with the same pospace."
                )
                # This recursion ends here, we cannot recurse again because transactions_generator is not None
                return await self.respond_block(block_response, peer)

        async with self.blockchain.lock:
            # After acquiring the lock, check again, because another asyncio thread might have added it
            if self.blockchain.contains_block(header_hash):
                return None
            validation_start = time.time()
            # Tries to add the block to the blockchain; if we already validated the transactions, don't do it again
            npc_results = {}
            if pre_validation_result is not None and pre_validation_result.npc_result is not None:
                npc_results[block.height] = pre_validation_result.npc_result
            pre_validation_results: Optional[
                List[PreValidationResult]
            ] = await self.blockchain.pre_validate_blocks_multiprocessing([block], npc_results)
            if pre_validation_results is None:
                raise ValueError(f"Failed to validate block {header_hash} height {block.height}")
            if pre_validation_results[0].error is not None:
                if Err(pre_validation_results[0].error) == Err.INVALID_PREV_BLOCK_HASH:
                    added: ReceiveBlockResult = ReceiveBlockResult.DISCONNECTED_BLOCK
                    error_code: Optional[Err] = Err.INVALID_PREV_BLOCK_HASH
                    fork_height: Optional[uint32] = None
                else:
                    raise ValueError(
                        f"Failed to validate block {header_hash} height "
                        f"{block.height}: {Err(pre_validation_results[0].error).name}"
                    )
            else:
                result_to_validate = (
                    pre_validation_results[0] if pre_validation_result is None else pre_validation_result
                )
                assert result_to_validate.required_iters == pre_validation_results[0].required_iters
                added, error_code, fork_height = await self.blockchain.receive_block(block, result_to_validate, None)
            if (
                self.full_node_store.previous_generator is not None
                and fork_height is not None
                and fork_height < self.full_node_store.previous_generator.block_height
            ):
                self.full_node_store.previous_generator = None
            validation_time = time.time() - validation_start

            if added == ReceiveBlockResult.ALREADY_HAVE_BLOCK:
                return None
            elif added == ReceiveBlockResult.INVALID_BLOCK:
                assert error_code is not None
                self.log.error(f"Block {header_hash} at height {block.height} is invalid with code {error_code}.")
                raise ConsensusError(error_code, header_hash)

            elif added == ReceiveBlockResult.DISCONNECTED_BLOCK:
                self.log.info(f"Disconnected block {header_hash} at height {block.height}")
                return None
            elif added == ReceiveBlockResult.NEW_PEAK:
                # Only propagate blocks which extend the blockchain (becomes one of the heads)
                new_peak: Optional[BlockRecord] = self.blockchain.get_peak()
                assert new_peak is not None and fork_height is not None

                await self.peak_post_processing(block, new_peak, fork_height, peer)

            elif added == ReceiveBlockResult.ADDED_AS_ORPHAN:
                self.log.info(
                    f"Received orphan block of height {block.height} rh {block.reward_chain_block.get_hash()}"
                )
            else:
                # Should never reach here, all the cases are covered
                raise RuntimeError(f"Invalid result from receive_block {added}")
        percent_full_str = (
            (
                ", percent full: "
                + str(round(100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, 3))
                + "%"
            )
            if block.transactions_info is not None
            else ""
        )
        self.log.info(
            f"Block validation time: {validation_time}, "
            f"cost: {block.transactions_info.cost if block.transactions_info is not None else 'None'}"
            f"{percent_full_str}"
        )

        # This code path is reached if added == ADDED_AS_ORPHAN or NEW_PEAK
        peak = self.blockchain.get_peak()
        assert peak is not None

        # Removes all temporary data for old blocks
        clear_height = uint32(max(0, peak.height - 50))
        self.full_node_store.clear_candidate_blocks_below(clear_height)
        self.full_node_store.clear_unfinished_blocks_below(clear_height)
        if peak.height % 1000 == 0 and not self.sync_store.get_sync_mode():
            await self.sync_store.clear_sync_info()  # Occasionally clear sync peer info
        self._state_changed("block")
        record = self.blockchain.block_record(block.header_hash)
        if self.weight_proof_handler is not None and record.sub_epoch_summary_included is not None:
            if self._segment_task is None or self._segment_task.done():
                self._segment_task = asyncio.create_task(self.weight_proof_handler.create_prev_sub_epoch_segments())
        return None
    async def respond_unfinished_block(
        self,
        respond_unfinished_block: full_node_protocol.RespondUnfinishedBlock,
        peer: Optional[ws.WSChiaConnection],
        farmed_block: bool = False,
    ):
        """
        We have received an unfinished block, either created by us, or from another peer.
        We can validate it and if it's a good block, propagate it to other peers and
        timelords.
        """
        block = respond_unfinished_block.unfinished_block

        if block.prev_header_hash != self.constants.GENESIS_CHALLENGE and not self.blockchain.contains_block(
            block.prev_header_hash
        ):
            # No need to request the parent, since the peer will send it to us anyway, via NewPeak
            self.log.debug("Received a disconnected unfinished block")
            return None

        # Adds the unfinished block to seen, and check if it's seen before, to prevent
        # processing it twice. This searches for the exact version of the unfinished block (there can be many
        # different foliages for the same trunk). This is intentional, to prevent DOS attacks.
        # Note that it does not require that this block was successfully processed
        if self.full_node_store.seen_unfinished_block(block.get_hash()):
            return None

        block_hash = block.reward_chain_block.get_hash()

        # This searches for the trunk hash (unfinished reward hash). If we have already added a block with the same
        # hash, return
        if self.full_node_store.get_unfinished_block(block_hash) is not None:
            return None

        peak: Optional[BlockRecord] = self.blockchain.get_peak()
        if peak is not None:
            if block.total_iters < peak.sp_total_iters(self.constants):
                # This means this unfinished block is pretty far behind, it will not add weight to our chain
                return None

        if block.prev_header_hash == self.constants.GENESIS_CHALLENGE:
            prev_b = None
        else:
            prev_b = self.blockchain.block_record(block.prev_header_hash)

        # Count the blocks in the sub slot, and check if it's a new epoch
        if len(block.finished_sub_slots) > 0:
            num_blocks_in_ss = 1  # Curr
        else:
            curr = self.blockchain.try_block_record(block.prev_header_hash)
            num_blocks_in_ss = 2  # Curr and prev
            while (curr is not None) and not curr.first_in_sub_slot:
                curr = self.blockchain.try_block_record(curr.prev_hash)
                num_blocks_in_ss += 1

        if num_blocks_in_ss > self.constants.MAX_SUB_SLOT_BLOCKS:
            # TODO: potentially allow overflow blocks here, which count for the next slot
            self.log.warning("Too many blocks added, not adding block")
            return None

        async with self.blockchain.lock:
            # TODO: pre-validate VDFs outside of lock
            validation_start = time.time()
            validate_result = await self.blockchain.validate_unfinished_block(block)
            if validate_result.error is not None:
                if validate_result.error == Err.COIN_AMOUNT_NEGATIVE.value:
                    # TODO: remove in the future, hotfix for 1.1.5 peers to not disconnect older peers
                    self.log.info(f"Consensus error {validate_result.error}, not disconnecting")
                    return
                raise ConsensusError(Err(validate_result.error))
        validation_time = time.time() - validation_start

        assert validate_result.required_iters is not None

        # Perform another check, in case we have already concurrently added the same unfinished block
        if self.full_node_store.get_unfinished_block(block_hash) is not None:
            return None

        if block.prev_header_hash == self.constants.GENESIS_CHALLENGE:
            height = uint32(0)
        else:
            height = uint32(self.blockchain.block_record(block.prev_header_hash).height + 1)

        ses: Optional[SubEpochSummary] = next_sub_epoch_summary(
            self.constants,
            self.blockchain,
            validate_result.required_iters,
            block,
            True,
        )

        self.full_node_store.add_unfinished_block(height, block, validate_result)
        if farmed_block is True:
            self.log.info(
                f"🍀 Farmed unfinished_block {block_hash}, SP: {block.reward_chain_block.signage_point_index}, "
                f"validation time: {validation_time}, "
                f"cost: {block.transactions_info.cost if block.transactions_info else 'None'}"
            )
        else:
            percent_full_str = (
                (
                    ", percent full: "
                    + str(round(100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, 3))
                    + "%"
                )
                if block.transactions_info is not None
                else ""
            )
            self.log.info(
                f"Added unfinished_block {block_hash}, not farmed by us,"
                f" SP: {block.reward_chain_block.signage_point_index} farmer response time: "
                f"{time.time() - self.signage_point_times[block.reward_chain_block.signage_point_index]}, "
                f"Pool pk {encode_puzzle_hash(block.foliage.foliage_block_data.pool_target.puzzle_hash, 'xch')}, "
                f"validation time: {validation_time}, "
                f"cost: {block.transactions_info.cost if block.transactions_info else 'None'}"
                f"{percent_full_str}"
            )

        sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty(
            self.constants,
            len(block.finished_sub_slots) > 0,
            prev_b,
            self.blockchain,
        )

        if block.reward_chain_block.signage_point_index == 0:
            res = self.full_node_store.get_sub_slot(block.reward_chain_block.pos_ss_cc_challenge_hash)
            if res is None:
                if block.reward_chain_block.pos_ss_cc_challenge_hash == self.constants.GENESIS_CHALLENGE:
                    rc_prev = self.constants.GENESIS_CHALLENGE
                else:
                    self.log.warning(f"Do not have sub slot {block.reward_chain_block.pos_ss_cc_challenge_hash}")
                    return None
            else:
                rc_prev = res[0].reward_chain.get_hash()
        else:
            assert block.reward_chain_block.reward_chain_sp_vdf is not None
            rc_prev = block.reward_chain_block.reward_chain_sp_vdf.challenge

        timelord_request = timelord_protocol.NewUnfinishedBlockTimelord(
            block.reward_chain_block,
            difficulty,
            sub_slot_iters,
            block.foliage,
            ses,
            rc_prev,
        )

        timelord_msg = make_msg(ProtocolMessageTypes.new_unfinished_block_timelord, timelord_request)
        await self.server.send_to_all([timelord_msg], NodeType.TIMELORD)

        full_node_request = full_node_protocol.NewUnfinishedBlock(block.reward_chain_block.get_hash())
        msg = make_msg(ProtocolMessageTypes.new_unfinished_block, full_node_request)
        if peer is not None:
            await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)
        else:
            await self.server.send_to_all([msg], NodeType.FULL_NODE)
        self._state_changed("unfinished_block")
    async def new_infusion_point_vdf(
        self, request: timelord_protocol.NewInfusionPointVDF, timelord_peer: Optional[ws.WSChiaConnection] = None
    ) -> Optional[Message]:
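        """
        A timelord has sent us an infusion point VDF. Looks up the matching unfinished
        block, finds its previous block and any finished sub-slots, combines everything
        into a FullBlock, and submits it through respond_block. If the previous block
        cannot be located yet, the request is cached as a future infusion point.
        """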
        # Lookup unfinished blocks
        unfinished_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block(
            request.unfinished_reward_hash
        )

        if unfinished_block is None:
            self.log.warning(
                f"Do not have unfinished reward chain block {request.unfinished_reward_hash}, cannot finish."
            )
            return None

        prev_b: Optional[BlockRecord] = None

        target_rc_hash = request.reward_chain_ip_vdf.challenge
        last_slot_cc_hash = request.challenge_chain_ip_vdf.challenge

        # Backtracks through end of slot objects, should work for multiple empty sub slots
        for eos, _, _ in reversed(self.full_node_store.finished_sub_slots):
            if eos is not None and eos.reward_chain.get_hash() == target_rc_hash:
                target_rc_hash = eos.reward_chain.end_of_slot_vdf.challenge
        if target_rc_hash == self.constants.GENESIS_CHALLENGE:
            prev_b = None
        else:
            # Find the prev block, starts looking backwards from the peak. target_rc_hash must be the hash of a block
            # and not an end of slot (since we just looked through the slots and backtracked)
            curr: Optional[BlockRecord] = self.blockchain.get_peak()

            for _ in range(10):
                if curr is None:
                    break
                if curr.reward_infusion_new_challenge == target_rc_hash:
                    # Found our prev block
                    prev_b = curr
                    break
                curr = self.blockchain.try_block_record(curr.prev_hash)

            # If not found, cache keyed on prev block
            if prev_b is None:
                self.full_node_store.add_to_future_ip(request)
                self.log.warning(f"Previous block is None, infusion point {request.reward_chain_ip_vdf.challenge}")
                return None

        finished_sub_slots: Optional[List[EndOfSubSlotBundle]] = self.full_node_store.get_finished_sub_slots(
            self.blockchain,
            prev_b,
            last_slot_cc_hash,
        )
        if finished_sub_slots is None:
            return None

        sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty(
            self.constants,
            len(finished_sub_slots) > 0,
            prev_b,
            self.blockchain,
        )

        if unfinished_block.reward_chain_block.pos_ss_cc_challenge_hash == self.constants.GENESIS_CHALLENGE:
            sub_slot_start_iters = uint128(0)
        else:
            ss_res = self.full_node_store.get_sub_slot(unfinished_block.reward_chain_block.pos_ss_cc_challenge_hash)
            if ss_res is None:
                self.log.warning(f"Do not have sub slot {unfinished_block.reward_chain_block.pos_ss_cc_challenge_hash}")
                return None
            _, _, sub_slot_start_iters = ss_res
        sp_total_iters = uint128(
            sub_slot_start_iters
            + calculate_sp_iters(
                self.constants,
                sub_slot_iters,
                unfinished_block.reward_chain_block.signage_point_index,
            )
        )

        block: FullBlock = unfinished_block_to_full_block(
            unfinished_block,
            request.challenge_chain_ip_vdf,
            request.challenge_chain_ip_proof,
            request.reward_chain_ip_vdf,
            request.reward_chain_ip_proof,
            request.infused_challenge_chain_ip_vdf,
            request.infused_challenge_chain_ip_proof,
            finished_sub_slots,
            prev_b,
            self.blockchain,
            sp_total_iters,
            difficulty,
        )
        if not self.has_valid_pool_sig(block):
            self.log.warning("Trying to make a pre-farm block but height is not 0")
            return None
        try:
            await self.respond_block(full_node_protocol.RespondBlock(block))
        except Exception as e:
            self.log.warning(f"Consensus error validating block: {e}")
            if timelord_peer is not None:
                # Only sends to the timelord who sent us this VDF, to reset them to the correct peak
                await self.send_peak_to_timelords(peer=timelord_peer)
        return None

    async def respond_end_of_sub_slot(
        self, request: full_node_protocol.RespondEndOfSubSlot, peer: ws.WSChiaConnection
    ) -> Tuple[Optional[Message], bool]:
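        """
        A peer has sent us an end-of-sub-slot bundle. Adds the sub-slot to the full node
        store (requesting the previous sub-slot first if we are missing it), broadcasts
        it to other full nodes and farmers, and processes any infusions that become
        possible. Returns (message_to_send, added_successfully).
        """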
        fetched_ss = self.full_node_store.get_sub_slot(request.end_of_slot_bundle.challenge_chain.get_hash())
        if fetched_ss is not None:
            # Already have the sub-slot
            return None, True

        async with self.timelord_lock:
            fetched_ss = self.full_node_store.get_sub_slot(
                request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge
            )
            if (
                (fetched_ss is None)
                and request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge
                != self.constants.GENESIS_CHALLENGE
            ):
                # If we don't have the prev, request the prev instead
                full_node_request = full_node_protocol.RequestSignagePointOrEndOfSubSlot(
                    request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge,
                    uint8(0),
                    bytes([0] * 32),
                )
                return (
                    make_msg(ProtocolMessageTypes.request_signage_point_or_end_of_sub_slot, full_node_request),
                    False,
                )

            peak = self.blockchain.get_peak()
            if peak is not None and peak.height > 2:
                next_sub_slot_iters = self.blockchain.get_next_slot_iters(peak.header_hash, True)
                next_difficulty = self.blockchain.get_next_difficulty(peak.header_hash, True)
            else:
                next_sub_slot_iters = self.constants.SUB_SLOT_ITERS_STARTING
                next_difficulty = self.constants.DIFFICULTY_STARTING

            # Adds the sub slot and potentially gets new infusions
            new_infusions = self.full_node_store.new_finished_sub_slot(
                request.end_of_slot_bundle,
                self.blockchain,
                peak,
                await self.blockchain.get_full_peak(),
            )
            # It may be an empty list, even if it's not None. Not None means added successfully
            if new_infusions is not None:
                self.log.info(
                    f"⏲️ Finished sub slot, SP {self.constants.NUM_SPS_SUB_SLOT}/{self.constants.NUM_SPS_SUB_SLOT}, "
                    f"{request.end_of_slot_bundle.challenge_chain.get_hash()}, "
                    f"number of sub-slots: {len(self.full_node_store.finished_sub_slots)}, "
                    f"RC hash: {request.end_of_slot_bundle.reward_chain.get_hash()}, "
                    f"Deficit {request.end_of_slot_bundle.reward_chain.deficit}"
                )
                # Notify full nodes of the new sub-slot
                broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot(
                    request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge,
                    request.end_of_slot_bundle.challenge_chain.get_hash(),
                    uint8(0),
                    request.end_of_slot_bundle.reward_chain.end_of_slot_vdf.challenge,
                )
                msg = make_msg(ProtocolMessageTypes.new_signage_point_or_end_of_sub_slot, broadcast)
                await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)

                for infusion in new_infusions:
                    await self.new_infusion_point_vdf(infusion)

                # Notify farmers of the new sub-slot
                broadcast_farmer = farmer_protocol.NewSignagePoint(
                    request.end_of_slot_bundle.challenge_chain.get_hash(),
                    request.end_of_slot_bundle.challenge_chain.get_hash(),
                    request.end_of_slot_bundle.reward_chain.get_hash(),
                    next_difficulty,
                    next_sub_slot_iters,
                    uint8(0),
                )
                msg = make_msg(ProtocolMessageTypes.new_signage_point, broadcast_farmer)
                await self.server.send_to_all([msg], NodeType.FARMER)
                return None, True
            else:
                self.log.info(
                    f"End of slot not added, CC challenge "
                    f"{request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge}"
                )
        return None, False

    async def respond_transaction(
        self,
        transaction: SpendBundle,
        spend_name: bytes32,
        peer: Optional[ws.WSChiaConnection] = None,
        test: bool = False,
    ) -> Tuple[MempoolInclusionStatus, Optional[Err]]:
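        """
        Validates a spend bundle and attempts to add it to the mempool. Successful
        transactions are announced to other full nodes via NewTransaction; pending or
        failed ones are not re-broadcast, since that would be a DOS vector. Returns
        the inclusion status and an optional error.
        """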
        if self.sync_store.get_sync_mode():
            return MempoolInclusionStatus.FAILED, Err.NO_TRANSACTIONS_WHILE_SYNCING
        if not test and not (await self.synced()):
            return MempoolInclusionStatus.FAILED, Err.NO_TRANSACTIONS_WHILE_SYNCING

        # No transactions in mempool in initial client. Remove 6 weeks after launch
        if int(time.time()) <= self.constants.INITIAL_FREEZE_END_TIMESTAMP:
            return MempoolInclusionStatus.FAILED, Err.INITIAL_TRANSACTION_FREEZE

        if self.mempool_manager.seen(spend_name):
            return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION
        self.mempool_manager.add_and_maybe_pop_seen(spend_name)
        self.log.debug(f"Processing transaction: {spend_name}")
        # Ignore if syncing
        if self.sync_store.get_sync_mode():
            status = MempoolInclusionStatus.FAILED
            error: Optional[Err] = Err.NO_TRANSACTIONS_WHILE_SYNCING
            self.mempool_manager.remove_seen(spend_name)
        else:
            try:
                cost_result = await self.mempool_manager.pre_validate_spendbundle(transaction)
            except Exception as e:
                self.mempool_manager.remove_seen(spend_name)
                raise e
            async with self.mempool_manager.lock:
                if self.mempool_manager.get_spendbundle(spend_name) is not None:
                    self.mempool_manager.remove_seen(spend_name)
                    return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION
                cost, status, error = await self.mempool_manager.add_spendbundle(transaction, cost_result, spend_name)
            if status == MempoolInclusionStatus.SUCCESS:
                self.log.debug(
                    f"Added transaction to mempool: {spend_name} mempool size: "
                    f"{self.mempool_manager.mempool.total_mempool_cost}"
                )
                # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS
                # vector.
                mempool_item = self.mempool_manager.get_mempool_item(spend_name)
                assert mempool_item is not None
                fees = mempool_item.fee
                assert fees >= 0
                assert cost is not None
                new_tx = full_node_protocol.NewTransaction(
                    spend_name,
                    cost,
                    fees,
                )
                msg = make_msg(ProtocolMessageTypes.new_transaction, new_tx)
                if peer is None:
                    await self.server.send_to_all([msg], NodeType.FULL_NODE)
                else:
                    await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)
            else:
                self.mempool_manager.remove_seen(spend_name)
                self.log.debug(f"Wasn't able to add transaction with id {spend_name}, status {status} error: {error}")
        return status, error

    async def _needs_compact_proof(
        self, vdf_info: VDFInfo, header_block: HeaderBlock, field_vdf: CompressibleVDFField
    ) -> bool:
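        """
        Returns True if the given VDF info appears in the header block and the proof we
        currently store for it is not yet compact, i.e. a compact replacement would be new.
        """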
        if field_vdf == CompressibleVDFField.CC_EOS_VDF:
            for sub_slot in header_block.finished_sub_slots:
                if sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf == vdf_info:
                    if (
                        sub_slot.proofs.challenge_chain_slot_proof.witness_type == 0
                        and sub_slot.proofs.challenge_chain_slot_proof.normalized_to_identity
                    ):
                        return False
                    return True
        if field_vdf == CompressibleVDFField.ICC_EOS_VDF:
            for sub_slot in header_block.finished_sub_slots:
                if (
                    sub_slot.infused_challenge_chain is not None
                    and sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf == vdf_info
                ):
                    assert sub_slot.proofs.infused_challenge_chain_slot_proof is not None
                    if (
                        sub_slot.proofs.infused_challenge_chain_slot_proof.witness_type == 0
                        and sub_slot.proofs.infused_challenge_chain_slot_proof.normalized_to_identity
                    ):
                        return False
                    return True
        if field_vdf == CompressibleVDFField.CC_SP_VDF:
            if header_block.reward_chain_block.challenge_chain_sp_vdf is None:
                return False
            if vdf_info == header_block.reward_chain_block.challenge_chain_sp_vdf:
                assert header_block.challenge_chain_sp_proof is not None
                if (
                    header_block.challenge_chain_sp_proof.witness_type == 0
                    and header_block.challenge_chain_sp_proof.normalized_to_identity
                ):
                    return False
                return True
        if field_vdf == CompressibleVDFField.CC_IP_VDF:
            if vdf_info == header_block.reward_chain_block.challenge_chain_ip_vdf:
                if (
                    header_block.challenge_chain_ip_proof.witness_type == 0
                    and header_block.challenge_chain_ip_proof.normalized_to_identity
                ):
                    return False
                return True
        return False

    async def _can_accept_compact_proof(
        self,
        vdf_info: VDFInfo,
        vdf_proof: VDFProof,
        height: uint32,
        header_hash: bytes32,
        field_vdf: CompressibleVDFField,
    ) -> bool:
        """
        - Checks if the provided proof is indeed compact.
        - Checks if the proof verifies given the vdf_info from the start of sub-slot.
        - Checks if the provided vdf_info is correct, assuming it refers to the start of sub-slot.
        - Checks if the existing proof was non-compact. Ignore this proof if we already have a compact proof.
        """
        is_fully_compactified = await self.block_store.is_fully_compactified(header_hash)
        if is_fully_compactified is None or is_fully_compactified:
            self.log.info(f"Already compactified block: {header_hash}. Ignoring.")
            return False
        if vdf_proof.witness_type > 0 or not vdf_proof.normalized_to_identity:
            self.log.error(f"Received vdf proof is not compact: {vdf_proof}.")
            return False
        if not vdf_proof.is_valid(self.constants, ClassgroupElement.get_default_element(), vdf_info):
            self.log.error(f"Received compact vdf proof is not valid: {vdf_proof}.")
            return False
        header_block = await self.blockchain.get_header_block_by_height(height, header_hash, tx_filter=False)
        if header_block is None:
            self.log.error(f"Can't find block for given compact vdf. Height: {height} Header hash: {header_hash}")
            return False
        is_new_proof = await self._needs_compact_proof(vdf_info, header_block, field_vdf)
        if not is_new_proof:
            self.log.info(f"Duplicate compact proof. Height: {height}. Header hash: {header_hash}.")
        return is_new_proof

    async def _replace_proof(
        self,
        vdf_info: VDFInfo,
        vdf_proof: VDFProof,
        height: uint32,
        field_vdf: CompressibleVDFField,
    ):
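        """
        Replaces the stored (non-compact) proof matching vdf_info in each full block at
        the given height with the compact vdf_proof, and writes the updated block back
        to the block store.
        """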
        full_blocks = await self.block_store.get_full_blocks_at([height])
        assert len(full_blocks) > 0
        for block in full_blocks:
            new_block = None
            block_record = await self.blockchain.get_block_record_from_db(self.blockchain.height_to_hash(height))
            assert block_record is not None

            if field_vdf == CompressibleVDFField.CC_EOS_VDF:
                for index, sub_slot in enumerate(block.finished_sub_slots):
                    if sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf == vdf_info:
                        new_proofs = dataclasses.replace(sub_slot.proofs, challenge_chain_slot_proof=vdf_proof)
                        new_subslot = dataclasses.replace(sub_slot, proofs=new_proofs)
                        new_finished_subslots = block.finished_sub_slots
                        new_finished_subslots[index] = new_subslot
                        new_block = dataclasses.replace(block, finished_sub_slots=new_finished_subslots)
                        break
            if field_vdf == CompressibleVDFField.ICC_EOS_VDF:
                for index, sub_slot in enumerate(block.finished_sub_slots):
                    if (
                        sub_slot.infused_challenge_chain is not None
                        and sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf == vdf_info
                    ):
                        new_proofs = dataclasses.replace(sub_slot.proofs, infused_challenge_chain_slot_proof=vdf_proof)
                        new_subslot = dataclasses.replace(sub_slot, proofs=new_proofs)
                        new_finished_subslots = block.finished_sub_slots
                        new_finished_subslots[index] = new_subslot
                        new_block = dataclasses.replace(block, finished_sub_slots=new_finished_subslots)
                        break
            if field_vdf == CompressibleVDFField.CC_SP_VDF:
                if block.reward_chain_block.challenge_chain_sp_vdf == vdf_info:
                    assert block.challenge_chain_sp_proof is not None
                    new_block = dataclasses.replace(block, challenge_chain_sp_proof=vdf_proof)
            if field_vdf == CompressibleVDFField.CC_IP_VDF:
                if block.reward_chain_block.challenge_chain_ip_vdf == vdf_info:
                    new_block = dataclasses.replace(block, challenge_chain_ip_proof=vdf_proof)
            if new_block is None:
                self.log.debug("did not replace any proof, vdf does not match")
                return
            async with self.db_wrapper.lock:
                await self.block_store.add_full_block(new_block.header_hash, new_block, block_record)
                await self.block_store.db_wrapper.commit_transaction()

    async def respond_compact_proof_of_time(self, request: timelord_protocol.RespondCompactProofOfTime):
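        """
        A timelord (bluebox) has sent us a compact proof of time. If it is acceptable,
        replace the stored proof and announce the compact VDF to other full nodes.
        """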
        field_vdf = CompressibleVDFField(int(request.field_vdf))
        if not await self._can_accept_compact_proof(
            request.vdf_info, request.vdf_proof, request.height, request.header_hash, field_vdf
        ):
            return None
        async with self.blockchain.compact_proof_lock:
            await self._replace_proof(request.vdf_info, request.vdf_proof, request.height, field_vdf)
        msg = make_msg(
            ProtocolMessageTypes.new_compact_vdf,
            full_node_protocol.NewCompactVDF(request.height, request.header_hash, request.field_vdf, request.vdf_info),
        )
        if self.server is not None:
            await self.server.send_to_all([msg], NodeType.FULL_NODE)

    async def new_compact_vdf(self, request: full_node_protocol.NewCompactVDF, peer: ws.WSChiaConnection):
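        """
        A peer has announced a compact VDF. If the corresponding block still has a
        non-compact proof, request the compact proof from that peer.
        """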
        is_fully_compactified = await self.block_store.is_fully_compactified(request.header_hash)
        if is_fully_compactified is None or is_fully_compactified:
            return False
        header_block = await self.blockchain.get_header_block_by_height(
            request.height, request.header_hash, tx_filter=False
        )
        if header_block is None:
            return None
        field_vdf = CompressibleVDFField(int(request.field_vdf))
        if await self._needs_compact_proof(request.vdf_info, header_block, field_vdf):
            peer_request = full_node_protocol.RequestCompactVDF(
                request.height, request.header_hash, request.field_vdf, request.vdf_info
            )
            response = await peer.request_compact_vdf(peer_request, timeout=10)
            if response is not None and isinstance(response, full_node_protocol.RespondCompactVDF):
                await self.respond_compact_vdf(response, peer)

    async def request_compact_vdf(self, request: full_node_protocol.RequestCompactVDF, peer: ws.WSChiaConnection):
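        """
        A peer has requested one of our compact VDF proofs. Look it up in the header
        block and send it back, but only if the proof we hold is actually compact.
        """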
        header_block = await self.blockchain.get_header_block_by_height(
            request.height, request.header_hash, tx_filter=False
        )
        if header_block is None:
            return None
        vdf_proof: Optional[VDFProof] = None
        field_vdf = CompressibleVDFField(int(request.field_vdf))
        if field_vdf == CompressibleVDFField.CC_EOS_VDF:
            for sub_slot in header_block.finished_sub_slots:
                if sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf == request.vdf_info:
                    vdf_proof = sub_slot.proofs.challenge_chain_slot_proof
                    break
        if field_vdf == CompressibleVDFField.ICC_EOS_VDF:
            for sub_slot in header_block.finished_sub_slots:
                if (
                    sub_slot.infused_challenge_chain is not None
                    and sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf == request.vdf_info
                ):
                    vdf_proof = sub_slot.proofs.infused_challenge_chain_slot_proof
                    break
        if (
            field_vdf == CompressibleVDFField.CC_SP_VDF
            and header_block.reward_chain_block.challenge_chain_sp_vdf == request.vdf_info
        ):
            vdf_proof = header_block.challenge_chain_sp_proof
        if (
            field_vdf == CompressibleVDFField.CC_IP_VDF
            and header_block.reward_chain_block.challenge_chain_ip_vdf == request.vdf_info
        ):
            vdf_proof = header_block.challenge_chain_ip_proof
        if vdf_proof is None or vdf_proof.witness_type > 0 or not vdf_proof.normalized_to_identity:
            self.log.error(f"{peer} requested compact vdf we don't have, height: {request.height}.")
            return None
        compact_vdf = full_node_protocol.RespondCompactVDF(
            request.height,
            request.header_hash,
            request.field_vdf,
            request.vdf_info,
            vdf_proof,
        )
        msg = make_msg(ProtocolMessageTypes.respond_compact_vdf, compact_vdf)
        await peer.send_message(msg)

    async def respond_compact_vdf(self, request: full_node_protocol.RespondCompactVDF, peer: ws.WSChiaConnection):
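        """
        A peer has sent us a compact VDF proof we requested. If it is acceptable and not
        seen before, replace the stored proof and relay the announcement to other peers.
        """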
        field_vdf = CompressibleVDFField(int(request.field_vdf))
        if not await self._can_accept_compact_proof(
            request.vdf_info, request.vdf_proof, request.height, request.header_hash, field_vdf
        ):
            return None
        async with self.blockchain.compact_proof_lock:
            if self.blockchain.seen_compact_proofs(request.vdf_info, request.height):
                return None
            await self._replace_proof(request.vdf_info, request.vdf_proof, request.height, field_vdf)
        msg = make_msg(
            ProtocolMessageTypes.new_compact_vdf,
            full_node_protocol.NewCompactVDF(request.height, request.header_hash, request.field_vdf, request.vdf_info),
        )
        if self.server is not None:
            await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)

    async def broadcast_uncompact_blocks(
        self, uncompact_interval_scan: int, target_uncompact_proofs: int, sanitize_weight_proof_only: bool
    ):
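        """
        Background task: periodically scans the blockchain for blocks whose VDF proofs
        are not yet compact and broadcasts a sampled subset of them to timelords
        (blueboxes) as RequestCompactProofOfTime, so the proofs can be compactified.
        """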
        min_height: Optional[int] = 0
        try:
            while not self._shut_down:
                while self.sync_store.get_sync_mode():
                    if self._shut_down:
                        return None
                    await asyncio.sleep(30)

                broadcast_list: List[timelord_protocol.RequestCompactProofOfTime] = []
                new_min_height = None
                max_height = self.blockchain.get_peak_height()
                if max_height is None:
                    await asyncio.sleep(30)
                    continue
                # Calculate 'min_height' correctly the first time this task is launched, using the db
                assert min_height is not None
                min_height = await self.block_store.get_first_not_compactified(min_height)
                if min_height is None or min_height > max(0, max_height - 1000):
                    min_height = max(0, max_height - 1000)
                batches_finished = 0
                self.log.info("Scanning the blockchain for uncompact blocks.")
                assert max_height is not None
                assert min_height is not None
                for h in range(min_height, max_height, 100):
                    # Once we have 10 times the target header count, stop scanning; sampling the target
                    # headers should contain enough randomness to split the work between blueboxes.
                    if len(broadcast_list) > target_uncompact_proofs * 10:
                        break
                    stop_height = min(h + 99, max_height)
                    headers = await self.blockchain.get_header_blocks_in_range(h, stop_height, tx_filter=False)
                    records: Dict[bytes32, BlockRecord] = {}
                    if sanitize_weight_proof_only:
                        records = await self.blockchain.get_block_records_in_range(h, stop_height)
                    for header in headers.values():
                        prev_broadcast_list_len = len(broadcast_list)
                        expected_header_hash = self.blockchain.height_to_hash(header.height)
                        if header.header_hash != expected_header_hash:
                            continue
                        if sanitize_weight_proof_only:
                            assert header.header_hash in records
                            record = records[header.header_hash]
                        for sub_slot in header.finished_sub_slots:
                            if (
                                sub_slot.proofs.challenge_chain_slot_proof.witness_type > 0
                                or not sub_slot.proofs.challenge_chain_slot_proof.normalized_to_identity
                            ):
                                broadcast_list.append(
                                    timelord_protocol.RequestCompactProofOfTime(
                                        sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf,
                                        header.header_hash,
                                        header.height,
                                        uint8(CompressibleVDFField.CC_EOS_VDF),
                                    )
                                )
                            if sub_slot.proofs.infused_challenge_chain_slot_proof is not None and (
                                sub_slot.proofs.infused_challenge_chain_slot_proof.witness_type > 0
                                or not sub_slot.proofs.infused_challenge_chain_slot_proof.normalized_to_identity
                            ):
                                assert sub_slot.infused_challenge_chain is not None
                                broadcast_list.append(
                                    timelord_protocol.RequestCompactProofOfTime(
                                        sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf,
                                        header.header_hash,
                                        header.height,
                                        uint8(CompressibleVDFField.ICC_EOS_VDF),
                                    )
                                )
                        # Running in 'sanitize_weight_proof_only' ignores CC_SP_VDF and CC_IP_VDF
                        # unless this is a challenge block.
                        if sanitize_weight_proof_only:
                            if not record.is_challenge_block(self.constants):
                                # Calculates 'new_min_height' as described below.
                                if (
                                    prev_broadcast_list_len == 0
                                    and len(broadcast_list) > 0
                                    and h <= max(0, max_height - 1000)
                                ):
                                    new_min_height = header.height
                                # Skip calculations for CC_SP_VDF and CC_IP_VDF.
                                continue
                        if header.challenge_chain_sp_proof is not None and (
                            header.challenge_chain_sp_proof.witness_type > 0
                            or not header.challenge_chain_sp_proof.normalized_to_identity
                        ):
                            assert header.reward_chain_block.challenge_chain_sp_vdf is not None
                            broadcast_list.append(
                                timelord_protocol.RequestCompactProofOfTime(
                                    header.reward_chain_block.challenge_chain_sp_vdf,
                                    header.header_hash,
                                    header.height,
                                    uint8(CompressibleVDFField.CC_SP_VDF),
                                )
                            )

                        if (
                            header.challenge_chain_ip_proof.witness_type > 0
                            or not header.challenge_chain_ip_proof.normalized_to_identity
                        ):
                            broadcast_list.append(
                                timelord_protocol.RequestCompactProofOfTime(
                                    header.reward_chain_block.challenge_chain_ip_vdf,
                                    header.header_hash,
                                    header.height,
                                    uint8(CompressibleVDFField.CC_IP_VDF),
                                )
                            )
                        # This is the first header with uncompact proofs. Store its height so next time we iterate
                        # only from here. Keep the header block iteration window at least 1000 blocks, so reorgs
                        # will be handled correctly.
                        if prev_broadcast_list_len == 0 and len(broadcast_list) > 0 and h <= max(0, max_height - 1000):
                            new_min_height = header.height

                    # Small sleep between batches.
                    batches_finished += 1
                    if batches_finished % 10 == 0:
                        await asyncio.sleep(1)

                # We have no uncompact blocks, but maintain the block iteration window of at least 1000 blocks.
                if new_min_height is None:
                    new_min_height = max(0, max_height - 1000)
                min_height = new_min_height
                if len(broadcast_list) > target_uncompact_proofs:
                    random.shuffle(broadcast_list)
                    broadcast_list = broadcast_list[:target_uncompact_proofs]
                if self.sync_store.get_sync_mode():
                    continue
                if self.server is not None:
                    for new_pot in broadcast_list:
                        msg = make_msg(ProtocolMessageTypes.request_compact_proof_of_time, new_pot)
                        await self.server.send_to_all([msg], NodeType.TIMELORD)
                await asyncio.sleep(uncompact_interval_scan)
        except Exception as e:
            error_stack = traceback.format_exc()
            self.log.error(f"Exception in broadcast_uncompact_blocks: {e}")
            self.log.error(f"Exception Stack: {error_stack}")