chia-blockchain/chia/full_node/mempool_check_conditions.py

230 lines
9.2 KiB
Python

import time
from typing import Dict, List, Optional, Set
from chia.consensus.cost_calculator import NPCResult
from chia.full_node.generator import create_generator_args, setup_generator_args
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import NIL
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_record import CoinRecord
from chia.types.condition_with_args import ConditionWithArgs
from chia.types.generator_types import BlockGenerator
from chia.types.name_puzzle_condition import NPC
from chia.util.clvm import int_from_bytes
from chia.util.condition_tools import ConditionOpcode, conditions_by_opcode
from chia.util.errors import Err
from chia.util.ints import uint32, uint64, uint16
from chia.wallet.puzzles.generator_loader import GENERATOR_FOR_SINGLE_COIN_MOD
from chia.wallet.puzzles.rom_bootstrap_generator import get_generator
GENERATOR_MOD = get_generator()
def mempool_assert_announcement(condition: ConditionWithArgs, announcements: Set[bytes32]) -> Optional[Err]:
"""
Check if an announcement is included in the list of announcements
"""
announcement_hash = bytes32(condition.vars[0])
if announcement_hash not in announcements:
return Err.ASSERT_ANNOUNCE_CONSUMED_FAILED
return None
def mempool_assert_my_coin_id(condition: ConditionWithArgs, unspent: CoinRecord) -> Optional[Err]:
"""
Checks if CoinID matches the id from the condition
"""
if unspent.coin.name() != condition.vars[0]:
return Err.ASSERT_MY_COIN_ID_FAILED
return None
def mempool_assert_absolute_block_height_exceeds(
condition: ConditionWithArgs, prev_transaction_block_height: uint32
) -> Optional[Err]:
"""
Checks if the next block index exceeds the block index from the condition
"""
try:
block_index_exceeds_this = int_from_bytes(condition.vars[0])
except ValueError:
return Err.INVALID_CONDITION
if prev_transaction_block_height < block_index_exceeds_this:
return Err.ASSERT_HEIGHT_ABSOLUTE_FAILED
return None
def mempool_assert_relative_block_height_exceeds(
condition: ConditionWithArgs, unspent: CoinRecord, prev_transaction_block_height: uint32
) -> Optional[Err]:
"""
Checks if the coin age exceeds the age from the condition
"""
try:
expected_block_age = int_from_bytes(condition.vars[0])
block_index_exceeds_this = expected_block_age + unspent.confirmed_block_index
except ValueError:
return Err.INVALID_CONDITION
if prev_transaction_block_height < block_index_exceeds_this:
return Err.ASSERT_HEIGHT_RELATIVE_FAILED
return None
def mempool_assert_absolute_time_exceeds(condition: ConditionWithArgs, timestamp: uint64) -> Optional[Err]:
"""
Check if the current time in seconds exceeds the time specified by condition
"""
try:
expected_seconds = int_from_bytes(condition.vars[0])
except ValueError:
return Err.INVALID_CONDITION
if timestamp is None:
timestamp = uint64(int(time.time()))
if timestamp < expected_seconds:
return Err.ASSERT_SECONDS_ABSOLUTE_FAILED
return None
def mempool_assert_relative_time_exceeds(
condition: ConditionWithArgs, unspent: CoinRecord, timestamp: uint64
) -> Optional[Err]:
"""
Check if the current time in seconds exceeds the time specified by condition
"""
try:
expected_seconds = int_from_bytes(condition.vars[0])
except ValueError:
return Err.INVALID_CONDITION
if timestamp is None:
timestamp = uint64(int(time.time()))
if timestamp < expected_seconds + unspent.timestamp:
return Err.ASSERT_SECONDS_RELATIVE_FAILED
return None
def mempool_assert_my_parent_id(condition: ConditionWithArgs, unspent: CoinRecord) -> Optional[Err]:
"""
Checks if coin's parent ID matches the ID from the condition
"""
if unspent.coin.parent_coin_info != condition.vars[0]:
return Err.ASSERT_MY_PARENT_ID_FAILED
return None
def mempool_assert_my_puzzlehash(condition: ConditionWithArgs, unspent: CoinRecord) -> Optional[Err]:
"""
Checks if coin's puzzlehash matches the puzzlehash from the condition
"""
if unspent.coin.puzzle_hash != condition.vars[0]:
return Err.ASSERT_MY_PUZZLEHASH_FAILED
return None
def mempool_assert_my_amount(condition: ConditionWithArgs, unspent: CoinRecord) -> Optional[Err]:
"""
Checks if coin's amount matches the amount from the condition
"""
if unspent.coin.amount != int_from_bytes(condition.vars[0]):
return Err.ASSERT_MY_AMOUNT_FAILED
return None
def get_name_puzzle_conditions(generator: BlockGenerator, max_cost: int, safe_mode: bool) -> NPCResult:
try:
block_program, block_program_args = setup_generator_args(generator)
if safe_mode:
cost, result = GENERATOR_MOD.run_safe_with_cost(max_cost, block_program, block_program_args)
else:
cost, result = GENERATOR_MOD.run_with_cost(max_cost, block_program, block_program_args)
npc_list: List[NPC] = []
opcodes: Set[bytes] = set(item.value for item in ConditionOpcode)
for res in result.first().as_iter():
conditions_list: List[ConditionWithArgs] = []
spent_coin_parent_id: bytes32 = res.first().as_atom()
spent_coin_puzzle_hash: bytes32 = res.rest().first().as_atom()
spent_coin_amount: uint64 = uint64(res.rest().rest().first().as_int())
spent_coin: Coin = Coin(spent_coin_parent_id, spent_coin_puzzle_hash, spent_coin_amount)
for cond in res.rest().rest().rest().first().as_iter():
if cond.first().as_atom() in opcodes:
opcode: ConditionOpcode = ConditionOpcode(cond.first().as_atom())
elif not safe_mode:
opcode = ConditionOpcode.UNKNOWN
else:
return NPCResult(uint16(Err.GENERATOR_RUNTIME_ERROR.value), [], uint64(0))
cvl = ConditionWithArgs(opcode, cond.rest().as_atom_list())
conditions_list.append(cvl)
conditions_dict = conditions_by_opcode(conditions_list)
if conditions_dict is None:
conditions_dict = {}
npc_list.append(
NPC(spent_coin.name(), spent_coin.puzzle_hash, [(a, b) for a, b in conditions_dict.items()])
)
return NPCResult(None, npc_list, uint64(cost))
except Exception:
return NPCResult(uint16(Err.GENERATOR_RUNTIME_ERROR.value), [], uint64(0))
def get_puzzle_and_solution_for_coin(generator: BlockGenerator, coin_name: bytes, max_cost: int):
try:
block_program = generator.program
if not generator.generator_args:
block_program_args = NIL
else:
block_program_args = create_generator_args(generator.generator_refs())
cost, result = GENERATOR_FOR_SINGLE_COIN_MOD.run_with_cost(
max_cost, block_program, block_program_args, coin_name
)
puzzle = result.first()
solution = result.rest().first()
return None, puzzle, solution
except Exception as e:
return e, None, None
def mempool_check_conditions_dict(
unspent: CoinRecord,
coin_announcement_names: Set[bytes32],
puzzle_announcement_names: Set[bytes32],
conditions_dict: Dict[ConditionOpcode, List[ConditionWithArgs]],
prev_transaction_block_height: uint32,
timestamp: uint64,
) -> Optional[Err]:
"""
Check all conditions against current state.
"""
for con_list in conditions_dict.values():
cvp: ConditionWithArgs
for cvp in con_list:
error: Optional[Err] = None
if cvp.opcode is ConditionOpcode.ASSERT_MY_COIN_ID:
error = mempool_assert_my_coin_id(cvp, unspent)
elif cvp.opcode is ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT:
error = mempool_assert_announcement(cvp, coin_announcement_names)
elif cvp.opcode is ConditionOpcode.ASSERT_PUZZLE_ANNOUNCEMENT:
error = mempool_assert_announcement(cvp, puzzle_announcement_names)
elif cvp.opcode is ConditionOpcode.ASSERT_HEIGHT_ABSOLUTE:
error = mempool_assert_absolute_block_height_exceeds(cvp, prev_transaction_block_height)
elif cvp.opcode is ConditionOpcode.ASSERT_HEIGHT_RELATIVE:
error = mempool_assert_relative_block_height_exceeds(cvp, unspent, prev_transaction_block_height)
elif cvp.opcode is ConditionOpcode.ASSERT_SECONDS_ABSOLUTE:
error = mempool_assert_absolute_time_exceeds(cvp, timestamp)
elif cvp.opcode is ConditionOpcode.ASSERT_SECONDS_RELATIVE:
error = mempool_assert_relative_time_exceeds(cvp, unspent, timestamp)
elif cvp.opcode is ConditionOpcode.ASSERT_MY_PARENT_ID:
error = mempool_assert_my_parent_id(cvp, unspent)
elif cvp.opcode is ConditionOpcode.ASSERT_MY_PUZZLEHASH:
error = mempool_assert_my_puzzlehash(cvp, unspent)
elif cvp.opcode is ConditionOpcode.ASSERT_MY_AMOUNT:
error = mempool_assert_my_amount(cvp, unspent)
if error:
return error
return None