chia-blockchain/chia/consensus/difficulty_adjustment.py

411 lines
18 KiB
Python

from typing import List, Optional, Tuple
from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain_interface import BlockchainInterface
from chia.consensus.constants import ConsensusConstants
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.significant_bits import count_significant_bits, truncate_to_significant_bits
def _get_blocks_at_height(
    blocks: BlockchainInterface,
    prev_b: BlockRecord,
    target_height: uint32,
    max_num_blocks: uint32 = uint32(1),
) -> List[BlockRecord]:
    """
    Collect up to max_num_blocks consecutive BlockRecords whose heights begin at target_height.

    Two strategies are used. When the hash that `blocks` stores for prev_b.height equals prev_b.header_hash,
    records are read directly by height. Otherwise the function performs a slow linear search, following
    prev_hash links backwards from prev_b, so only prev_b and its ancestors can be returned.
    Assumes all needed block records are present.

    Args:
        blocks: store used to look up block records by height and by header hash.
        prev_b: block that anchors the lookup (and the starting point of the backwards search).
        target_height: height of the first block to return.
        max_num_blocks: max number of blocks to fetch (the backwards search may return fewer).

    Returns:
        The selected block records in ascending height order.
    """
    anchored_by_height = (
        blocks.contains_height(prev_b.height) and blocks.height_to_hash(prev_b.height) == prev_b.header_hash
    )
    if anchored_by_height:
        # Fast path: prev_b is the block indexed at its own height, so ancestors can be read by height
        by_height: List[BlockRecord] = []
        for raw_height in range(target_height, target_height + max_num_blocks):
            indexed_height = uint32(raw_height)
            assert blocks.contains_height(indexed_height)
            by_height.append(blocks.height_to_block_record(indexed_height))
        return by_height

    # Slow path (we are in a fork): step back one block at a time, keeping those inside the requested window
    newest_first: List[BlockRecord] = []
    cursor: BlockRecord = prev_b
    while cursor.height >= target_height:
        if cursor.height < target_height + max_num_blocks:
            newest_first.append(cursor)
        if cursor.height == 0:
            # Height 0 has no predecessor to follow
            break
        cursor = blocks.block_record(cursor.prev_hash)

    # Blocks were gathered newest-first; callers expect ascending heights
    newest_first.reverse()
    return newest_first
def _get_second_to_last_transaction_block_in_previous_epoch(
    constants: ConsensusConstants,
    blocks: BlockchainInterface,
    last_b: BlockRecord,
) -> BlockRecord:
    """
    Retrieves the second to last transaction block in the previous epoch.

    Args:
        constants: consensus constants being used for this chain
        blocks: store used to look up block records of all relevant blocks
        last_b: last-block in the current epoch, or last block we have seen, if potentially finishing epoch soon

    Returns:
        The block record of the second to last transaction block of the previous epoch, or the block at
        height 0 when the previous epoch boundary is height 0.

    Schematic of the epochs involved (not to scale):

         prev epoch surpassed  prev epoch started                  epoch sur.  epoch started
          v                       v                                v         v
      |.B...B....B. B....B...|......B....B.....B...B.|.B.B.B..|..B...B.B.B...|.B.B.B. B.|........
            PREV EPOCH                 CURR EPOCH                               NEW EPOCH

    The blocks selected for the timestamps are the second to last transaction blocks in each epoch.
    Block at height 0 is an exception. Note that H mod EPOCH_BLOCKS where H is the height of the first block in the
    epoch, must be >= 0, and < 128.
    """
    # This height is guaranteed to be in the next epoch (even when last_b is not actually the last block)
    height_in_next_epoch = (
        last_b.height + 2 * constants.MAX_SUB_SLOT_BLOCKS + constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK + 5
    )
    height_epoch_surpass: uint32 = uint32(height_in_next_epoch - (height_in_next_epoch % constants.EPOCH_BLOCKS))
    height_prev_epoch_surpass: uint32 = uint32(height_epoch_surpass - constants.EPOCH_BLOCKS)

    # Both surpass heights are multiples of EPOCH_BLOCKS by construction; verify each of them
    assert height_epoch_surpass % constants.EPOCH_BLOCKS == height_prev_epoch_surpass % constants.EPOCH_BLOCKS == 0

    # Sanity check, don't go too far past epoch barrier
    assert (height_in_next_epoch - height_epoch_surpass) < (5 * constants.MAX_SUB_SLOT_BLOCKS)

    if height_prev_epoch_surpass == 0:
        # The genesis block is an edge case, where we measure from the first block in epoch (height 0), as opposed to
        # a block in the previous epoch, which would be height < 0
        return _get_blocks_at_height(blocks, last_b, uint32(0))[0]

    # The target block must be in this range. Either the surpass block must be a transaction block, or something
    # in its sub slot must be a transaction block. If that is the only transaction block in the sub-slot, the last
    # block in the previous sub-slot from that must also be a transaction block (therefore -1 is used).
    # The max height for the new epoch to start is surpass + 2*MAX_SUB_SLOT_BLOCKS + MIN_BLOCKS_PER_CHALLENGE_BLOCK - 3,
    # since we might have a deficit > 0 when surpass is hit. The +3 is added just in case
    fetched_blocks = _get_blocks_at_height(
        blocks,
        last_b,
        uint32(height_prev_epoch_surpass - constants.MAX_SUB_SLOT_BLOCKS - 1),
        uint32(3 * constants.MAX_SUB_SLOT_BLOCKS + constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK + 3),
    )

    # We want to find the last block in the slot at which we surpass the height.
    # The last block in epoch will be before this.
    # The window starts MAX_SUB_SLOT_BLOCKS + 1 below the surpass height, so index MAX_SUB_SLOT_BLOCKS is the
    # block just before the surpass height.
    fetched_index: int = constants.MAX_SUB_SLOT_BLOCKS
    curr_b: BlockRecord = fetched_blocks[fetched_index]
    fetched_index += 1
    assert curr_b.height == height_prev_epoch_surpass - 1
    next_b: BlockRecord = fetched_blocks[fetched_index]
    assert next_b.height == height_prev_epoch_surpass

    # Wait until the slot finishes with a challenge chain infusion at start of slot
    # Note that there are no overflow blocks at the start of new epochs
    # fetched_index still points at next_b when the loop is entered, so the first iteration re-reads the same
    # record; on exit curr_b is always the block immediately before the first block carrying a sub-epoch summary.
    while next_b.sub_epoch_summary_included is None:
        curr_b = next_b
        next_b = fetched_blocks[fetched_index]
        fetched_index += 1

    # Backtrack to find the second to last tx block (curr_b itself counts if it is a transaction block)
    found_tx_block = 1 if curr_b.is_transaction_block else 0
    while found_tx_block < 2:
        curr_b = blocks.block_record(curr_b.prev_hash)
        if curr_b.is_transaction_block:
            found_tx_block += 1

    return curr_b
def height_can_be_first_in_epoch(constants: ConsensusConstants, height: uint32) -> bool:
    """
    Report whether height, rounded down to a multiple of SUB_EPOCH_BLOCKS, lands on a multiple of EPOCH_BLOCKS.

    Args:
        constants: consensus constants being used for this chain
        height: block height to test

    Returns:
        True when the rounded-down height is divisible by constants.EPOCH_BLOCKS, False otherwise.
    """
    offset_in_sub_epoch = height % constants.SUB_EPOCH_BLOCKS
    sub_epoch_start = height - offset_in_sub_epoch
    return sub_epoch_start % constants.EPOCH_BLOCKS == 0
def can_finish_sub_and_full_epoch(
    constants: ConsensusConstants,
    blocks: BlockchainInterface,
    height: uint32,
    prev_header_hash: Optional[bytes32],
    deficit: uint8,
    block_at_height_included_ses: bool,
) -> Tuple[bool, bool]:
    """
    Decide whether the block at height can be the last block of its sub-epoch and of its epoch.

    The first returned bool is true if the next sub-slot after height will form part of a new sub-epoch, i.e.
    block height is the last block and height + 1 is in a new sub-epoch. The second bool is true if, in
    addition, height + 1 is in a new epoch. The second bool is never true when the first is false.

    Args:
        constants: consensus constants being used for this chain
        blocks: store used to look up block records by header hash
        height: block height of the (potentially) last block in the sub-epoch
        prev_header_hash: prev_header hash of the block at height, assuming not genesis
        deficit: deficit of block at height height
        block_at_height_included_ses: whether or not the block at height height already included a SES

    Returns:
        Tuple (can finish sub-epoch, can finish epoch).
    """
    sub_epoch_blocks = constants.SUB_EPOCH_BLOCKS

    # Heights below SUB_EPOCH_BLOCKS - 1 can never finish a sub-epoch
    if height < sub_epoch_blocks - 1:
        return False, False

    assert prev_header_hash is not None

    # A positive deficit blocks finishing; and if this block just included a sub_epoch_summary,
    # another one cannot be included again
    if deficit > 0 or block_at_height_included_ses:
        return False, False

    # The two edge cases where (height + 1) % SUB_EPOCH_BLOCKS is 0 or 1 need no scan:
    # if it's 0, height + 1 is the first place that a sub-epoch can be included;
    # if it's 1, block_at_height_included_ses above already covered the block at offset 0.
    if (height + 1) % sub_epoch_blocks > 1:
        # Scan back to (and including) the block whose height is a multiple of SUB_EPOCH_BLOCKS;
        # any summary found on the way means one was already included for this stretch.
        scanned: BlockRecord = blocks.block_record(prev_header_hash)
        while True:
            if scanned.sub_epoch_summary_included is not None:
                return False, False
            if not (scanned.height % sub_epoch_blocks > 0):
                break
            scanned = blocks.block_record(scanned.prev_hash)

    # For checking new epoch, make sure the epoch blocks are aligned
    return True, height_can_be_first_in_epoch(constants, uint32(height + 1))
def _get_next_sub_slot_iters(
    constants: ConsensusConstants,
    blocks: BlockchainInterface,
    prev_header_hash: bytes32,
    height: uint32,
    curr_sub_slot_iters: uint64,
    deficit: uint8,
    block_at_height_included_ses: bool,
    new_slot: bool,
    signage_point_total_iters: uint128,
    skip_epoch_check=False,
) -> uint64:
    """
    Compute the sub-slot iterations that apply to the block following the one at height.

    new_slot is true iff the next block will be in the next slot. WARNING: assumes that the block at height
    is not the first block in a sub-epoch.

    Args:
        constants: consensus constants being used for this chain
        blocks: store used to look up block records by header hash
        prev_header_hash: header hash of the previous block
        height: the block height of the block to look at
        curr_sub_slot_iters: sub-slot iters at the infusion point of the block at height
        deficit: deficit of block at height height
        block_at_height_included_ses: forwarded to can_finish_sub_and_full_epoch for the epoch check
        new_slot: whether or not there is a new slot after height
        signage_point_total_iters: signage point iters of the block at height
        skip_epoch_check: don't check correct epoch

    Returns:
        constants.SUB_SLOT_ITERS_STARTING while height + 1 is below EPOCH_BLOCKS; curr_sub_slot_iters when the
        epoch check is active and either new_slot is false or the epoch cannot finish; otherwise the
        recomputed value, clamped, truncated to SIGNIFICANT_BITS and rounded down to a multiple of
        NUM_SPS_SUB_SLOT.

    Raises:
        ValueError: if prev_header_hash is not known to blocks.
    """
    following_height: uint32 = uint32(height + 1)
    if following_height < constants.EPOCH_BLOCKS:
        return uint64(constants.SUB_SLOT_ITERS_STARTING)

    if not blocks.contains_block(prev_header_hash):
        raise ValueError(f"Header hash {prev_header_hash} not in blocks")
    parent: BlockRecord = blocks.block_record(prev_header_hash)

    # Unless the epoch is ending with a new slot, the sub-slot iters stay the same
    if not skip_epoch_check:
        epoch_can_finish = can_finish_sub_and_full_epoch(
            constants, blocks, height, prev_header_hash, deficit, block_at_height_included_ses
        )[1]
        if not (new_slot and epoch_can_finish):
            return curr_sub_slot_iters

    epoch_start_tx: BlockRecord = _get_second_to_last_transaction_block_in_previous_epoch(constants, blocks, parent)

    # Find the last transaction block before this block's signage point. Assuming the block at height height
    # is the last block infused in the epoch: if this block ends up being a transaction block, the block found
    # here is the second to last tx block in the epoch. If this block is not a transaction block, that means
    # there was exactly one other tx block included in between our signage point and infusion point, and
    # therefore the block found here is the second to last as well.
    epoch_end_tx = parent
    while not (epoch_end_tx.total_iters <= signage_point_total_iters and epoch_end_tx.is_transaction_block):
        epoch_end_tx = blocks.block_record(epoch_end_tx.prev_hash)
    assert epoch_end_tx.timestamp is not None and epoch_start_tx.timestamp is not None

    # Iterations per second over the last epoch, times the target number of seconds per slot
    elapsed_iters = epoch_end_tx.total_iters - epoch_start_tx.total_iters
    elapsed_seconds = epoch_end_tx.timestamp - epoch_start_tx.timestamp
    ssi_estimate: uint64 = uint64(constants.SUB_SLOT_TIME_TARGET * elapsed_iters // elapsed_seconds)

    # Only change by a max factor as a sanity check
    reference_ssi = epoch_end_tx.sub_slot_iters
    upper_bound = uint64(constants.DIFFICULTY_CHANGE_MAX_FACTOR * reference_ssi)
    lower_bound = uint64(reference_ssi // constants.DIFFICULTY_CHANGE_MAX_FACTOR)
    if ssi_estimate >= reference_ssi:
        bounded = min(ssi_estimate, upper_bound)
    else:
        bounded = max(constants.NUM_SPS_SUB_SLOT, ssi_estimate, lower_bound)
    ssi_estimate = uint64(bounded)

    rounded_ssi = truncate_to_significant_bits(ssi_estimate, constants.SIGNIFICANT_BITS)
    # Must divide the sub slot
    rounded_ssi = uint64(rounded_ssi - rounded_ssi % constants.NUM_SPS_SUB_SLOT)
    assert count_significant_bits(rounded_ssi) <= constants.SIGNIFICANT_BITS
    return rounded_ssi
def _get_next_difficulty(
    constants: ConsensusConstants,
    blocks: BlockchainInterface,
    prev_header_hash: bytes32,
    height: uint32,
    current_difficulty: uint64,
    deficit: uint8,
    block_at_height_included_ses: bool,
    new_slot: bool,
    signage_point_total_iters: uint128,
    skip_epoch_check=False,
) -> uint64:
    """
    Compute the difficulty of the next block that extends onto the block at height.

    Used to calculate the number of iterations. WARNING: assumes that the block at height is not the first
    block in a sub-epoch.

    Args:
        constants: consensus constants being used for this chain
        blocks: store used to look up block records by header hash
        prev_header_hash: header hash of the previous block
        height: the block height of the block to look at
        current_difficulty: difficulty at the infusion point of the block at height
        deficit: deficit of block at height height
        block_at_height_included_ses: forwarded to can_finish_sub_and_full_epoch for the epoch check
        new_slot: whether or not there is a new slot after height
        signage_point_total_iters: signage point iters of the block at height
        skip_epoch_check: don't check correct epoch

    Returns:
        constants.DIFFICULTY_STARTING while height + 1 is below EPOCH_BLOCKS - 3 * MAX_SUB_SLOT_BLOCKS;
        current_difficulty when the epoch check is active and either new_slot is false or the epoch cannot
        finish; otherwise the recomputed difficulty, clamped and truncated to SIGNIFICANT_BITS.

    Raises:
        ValueError: if prev_header_hash is not known to blocks.
    """
    following_height: uint32 = uint32(height + 1)
    first_epoch_cutoff = constants.EPOCH_BLOCKS - 3 * constants.MAX_SUB_SLOT_BLOCKS
    if following_height < first_epoch_cutoff:
        # We are in the first epoch
        return uint64(constants.DIFFICULTY_STARTING)

    if not blocks.contains_block(prev_header_hash):
        raise ValueError(f"Header hash {prev_header_hash} not in blocks")
    parent: BlockRecord = blocks.block_record(prev_header_hash)

    # Unless the epoch is ending with a new slot, the difficulty stays the same
    if not skip_epoch_check:
        epoch_can_finish = can_finish_sub_and_full_epoch(
            constants, blocks, height, prev_header_hash, deficit, block_at_height_included_ses
        )[1]
        if not (new_slot and epoch_can_finish):
            return current_difficulty

    epoch_start_tx: BlockRecord = _get_second_to_last_transaction_block_in_previous_epoch(constants, blocks, parent)

    # Find the last transaction block before this block's signage point. Assuming the block at height height
    # is the last block infused in the epoch: if this block ends up being a transaction block, the block found
    # here is the second to last tx block in the epoch. If this block is not a transaction block, that means
    # there was exactly one other tx block included in between our signage point and infusion point, and
    # therefore the block found here is the second to last as well.
    epoch_end_tx = parent
    while not (epoch_end_tx.total_iters <= signage_point_total_iters and epoch_end_tx.is_transaction_block):
        epoch_end_tx = blocks.block_record(epoch_end_tx.prev_hash)

    assert epoch_end_tx.timestamp is not None
    assert epoch_start_tx.timestamp is not None
    actual_epoch_time: uint64 = uint64(epoch_end_tx.timestamp - epoch_start_tx.timestamp)

    # Difficulty of the parent block, taken as the weight it added on top of its own predecessor
    grandparent_weight = blocks.block_record(parent.prev_hash).weight
    old_difficulty = uint64(parent.weight - grandparent_weight)

    # Terms are rearranged so there is only one division.
    weight_gained = epoch_end_tx.weight - epoch_start_tx.weight
    difficulty_estimate = uint64(
        weight_gained * constants.SUB_SLOT_TIME_TARGET // (constants.SLOT_BLOCKS_TARGET * actual_epoch_time)
    )

    # Only change by a max factor, to prevent attacks, as in greenpaper, and must be at least 1
    upper_bound = uint64(constants.DIFFICULTY_CHANGE_MAX_FACTOR * old_difficulty)
    lower_bound = uint64(old_difficulty // constants.DIFFICULTY_CHANGE_MAX_FACTOR)
    if difficulty_estimate >= old_difficulty:
        bounded = min(difficulty_estimate, upper_bound)
    else:
        bounded = max(uint64(1), difficulty_estimate, lower_bound)
    difficulty_estimate = uint64(bounded)

    truncated_difficulty = truncate_to_significant_bits(difficulty_estimate, constants.SIGNIFICANT_BITS)
    assert count_significant_bits(truncated_difficulty) <= constants.SIGNIFICANT_BITS
    return uint64(truncated_difficulty)
def get_next_sub_slot_iters_and_difficulty(
    constants: ConsensusConstants,
    is_first_in_sub_slot: bool,
    prev_b: Optional[BlockRecord],
    blocks: BlockchainInterface,
) -> Tuple[uint64, uint64]:
    """
    Retrieves the sub_slot iters and difficulty that apply to the next block after prev_b.

    Args:
        constants: consensus constants being used for this chain
        is_first_in_sub_slot: Whether the next block is the first in the sub slot
        prev_b: the previous block (last block in the epoch), or None when the next block is the genesis block
        blocks: store used to look up block records by header hash

    Returns:
        Tuple (sub_slot_iters, difficulty).
    """
    if prev_b is None:
        # genesis: nothing to measure yet, use the starting constants
        return constants.SUB_SLOT_ITERS_STARTING, constants.DIFFICULTY_STARTING

    # Difficulty of prev_b is the weight it added over its predecessor
    if prev_b.height == 0:
        # prev block is genesis, so its whole weight is its difficulty
        prev_difficulty: uint64 = uint64(prev_b.weight)
    else:
        predecessor_weight = blocks.block_record(prev_b.prev_hash).weight
        prev_difficulty = uint64(prev_b.weight - predecessor_weight)

    # A block that already carries a sub-epoch summary keeps its own values for the next block
    if prev_b.sub_epoch_summary_included is not None:
        return prev_b.sub_slot_iters, prev_difficulty

    sp_total_iters = prev_b.sp_total_iters(constants)

    difficulty: uint64 = _get_next_difficulty(
        constants=constants,
        blocks=blocks,
        prev_header_hash=prev_b.prev_hash,
        height=prev_b.height,
        current_difficulty=prev_difficulty,
        deficit=prev_b.deficit,
        block_at_height_included_ses=False,  # Already checked above
        new_slot=is_first_in_sub_slot,
        signage_point_total_iters=sp_total_iters,
    )

    sub_slot_iters: uint64 = _get_next_sub_slot_iters(
        constants=constants,
        blocks=blocks,
        prev_header_hash=prev_b.prev_hash,
        height=prev_b.height,
        curr_sub_slot_iters=prev_b.sub_slot_iters,
        deficit=prev_b.deficit,
        block_at_height_included_ses=False,  # Already checked above
        new_slot=is_first_in_sub_slot,
        signage_point_total_iters=sp_total_iters,
    )

    return sub_slot_iters, difficulty