chia-blockchain/tests/core/test_cost_calculation.py

230 lines
9.2 KiB
Python

import asyncio
import logging
import pathlib
import time
import pytest
from clvm_tools import binutils
from chia.consensus.condition_costs import ConditionCost
from chia.consensus.cost_calculator import NPCResult, calculate_cost_of_program
from chia.full_node.bundle_tools import simple_solution_generator
from chia.full_node.mempool_check_conditions import get_name_puzzle_conditions, get_puzzle_and_solution_for_coin
from chia.types.blockchain_format.program import Program, SerializedProgram
from chia.types.generator_types import BlockGenerator
from chia.wallet.puzzles import p2_delegated_puzzle_or_hidden_puzzle
from tests.setup_nodes import bt, test_constants
from .make_block_generator import make_block_generator
# Destination puzzle hash used by the test spends: 32 bytes, each the ASCII
# character "0" (0x30) -- note these are not zero-valued bytes.
BURN_PUZZLE_HASH = b"0" * 32
# Generator produced by make_block_generator(1); several tests wrap its program
# in a larger CLVM expression, and large_block_generator uses its tree hash to
# name the cache file.
SMALL_BLOCK_GENERATOR = make_block_generator(1)
# Module-level logger named after this module.
log = logging.getLogger(__name__)
@pytest.fixture(scope="module")
def event_loop():
    """Yield the current asyncio event loop; module scope means one instance per test module."""
    module_loop = asyncio.get_event_loop()
    yield module_loop
def large_block_generator(size):
    """Return the serialized bytes of a block generator built for ``size``.

    If a cached hex dump named ``large-block-<hash>-<size>.hex`` exists next to
    this file, its decoded contents are returned. Otherwise the generator is
    built with ``make_block_generator(size)`` and serialized directly.

    ``size`` is forwarded unchanged to ``make_block_generator``
    (presumably the number of coins to spend -- confirm against that helper).
    """
    # The cache file name embeds the first 16 hex characters of the small block
    # generator's tree hash: if the algorithm for building blocks changes, the
    # hash changes and so does the cache file name.
    cache_key = SMALL_BLOCK_GENERATOR.program.get_tree_hash().hex()[:16]
    cache_file = pathlib.Path(__file__).absolute().parent / f"large-block-{cache_key}-{size}.hex"
    try:
        cached_hex = cache_file.read_text()
    except FileNotFoundError:
        # TODO: Re-enable large-block*.hex but cache in ~/.chia/subdir
        # (writing the freshly built blob back to the cache file is currently disabled)
        return bytes(make_block_generator(size).program)
    return bytes.fromhex(cached_hex)
class TestCostCalculation:
    """Tests for generator cost calculation and generator validation.

    The tests drive get_name_puzzle_conditions, calculate_cost_of_program and
    get_puzzle_and_solution_for_coin with generators built from the shared
    block tools (``bt``) or assembled by hand from CLVM text.
    """

    @staticmethod
    def _signed_coinbase_spend():
        """Farm three blocks and return a signed spend of one reward coin to BURN_PUZZLE_HASH."""
        wallet_tool = bt.get_pool_wallet_tool()
        ph = wallet_tool.get_new_puzzlehash()
        num_blocks = 3
        blocks = bt.get_consecutive_blocks(
            num_blocks, [], guarantee_transaction_block=True, pool_reward_puzzle_hash=ph, farmer_reward_puzzle_hash=ph
        )
        # Pick the first reward coin of the last block that pays to our puzzle hash.
        coinbase = next((coin for coin in blocks[2].get_included_reward_coins() if coin.puzzle_hash == ph), None)
        assert coinbase is not None
        spend_bundle = wallet_tool.generate_signed_transaction(
            coinbase.amount,
            BURN_PUZZLE_HASH,
            coinbase,
        )
        assert spend_bundle is not None
        return spend_bundle

    @pytest.mark.asyncio
    async def test_basics(self):
        """The cost of a single signed spend is the sum of its condition, size and CLVM costs."""
        spend_bundle = self._signed_coinbase_spend()
        program: BlockGenerator = simple_solution_generator(spend_bundle)
        npc_result: NPCResult = get_name_puzzle_conditions(program, test_constants.MAX_BLOCK_COST_CLVM, False)
        # Check for a generator error before the result is used to compute the cost.
        assert npc_result.error is None
        cost = calculate_cost_of_program(program.program, npc_result, test_constants.COST_PER_BYTE)
        coin_name = npc_result.npc_list[0].coin_name
        error, _puzzle, _solution = get_puzzle_and_solution_for_coin(
            program, coin_name, test_constants.MAX_BLOCK_COST_CLVM
        )
        assert error is None
        # Create condition + agg_sig_condition + length + cpu_cost
        assert (
            cost
            == ConditionCost.CREATE_COIN.value
            + ConditionCost.AGG_SIG.value
            + len(bytes(program.program)) * test_constants.COST_PER_BYTE
            + npc_result.clvm_cost
        )

    @pytest.mark.asyncio
    async def test_strict_mode(self):
        """A hand-assembled generator is rejected in strict mode and accepted in non-strict mode."""
        # NOTE(review): the signed bundle is not referenced by the hand-assembled
        # generator below; the call is kept so the original setup and its
        # assertions still run.
        self._signed_coinbase_spend()
        pk = bytes.fromhex(
            "88bc9360319e7c54ab42e19e974288a2d7a817976f7633f4b43f36ce72074e59c4ab8ddac362202f3e366f0aebbb6280"
        )
        puzzle = p2_delegated_puzzle_or_hidden_puzzle.puzzle_for_pk(pk)
        disassembly = binutils.disassemble(puzzle)
        # NOTE(review): presumably condition code 65 in the solution is what
        # strict mode rejects -- confirm against the condition opcode list.
        program = SerializedProgram.from_bytes(
            binutils.assemble(
                f"(q ((0x3d2331635a58c0d49912bc1427d7db51afe3f20a7b4bcaffa17ee250dcbcbfaa {disassembly} 300"
                f" (() (q . ((65 '00000000000000000000000000000000' 0x0cbba106e000))) ()))))"
            ).as_bin()
        )
        generator = BlockGenerator(program, [])
        # Third argument True: strict mode, the generator must be rejected.
        strict_result: NPCResult = get_name_puzzle_conditions(generator, test_constants.MAX_BLOCK_COST_CLVM, True)
        assert strict_result.error is not None
        # Third argument False: non-strict mode, the same generator must be accepted.
        lenient_result: NPCResult = get_name_puzzle_conditions(generator, test_constants.MAX_BLOCK_COST_CLVM, False)
        assert lenient_result.error is None
        coin_name = lenient_result.npc_list[0].coin_name
        error, _puzzle, _solution = get_puzzle_and_solution_for_coin(
            generator, coin_name, test_constants.MAX_BLOCK_COST_CLVM
        )
        assert error is None

    @pytest.mark.asyncio
    async def test_clvm_strict_mode(self):
        """An unknown CLVM operator fails in strict mode and is tolerated in non-strict mode."""
        block = Program.from_bytes(bytes(SMALL_BLOCK_GENERATOR.program))
        disassembly = binutils.disassemble(block)
        # this is a valid generator program except the first clvm
        # if-condition, that depends on executing an unknown operator
        # ("0xfe"). In strict mode, this should fail, but in non-strict
        # mode, the unknown operator should be treated as if it returns ().
        program = SerializedProgram.from_bytes(binutils.assemble(f"(i (0xfe (q . 0)) (q . ()) {disassembly})").as_bin())
        generator = BlockGenerator(program, [])
        strict_result: NPCResult = get_name_puzzle_conditions(generator, test_constants.MAX_BLOCK_COST_CLVM, True)
        assert strict_result.error is not None
        lenient_result: NPCResult = get_name_puzzle_conditions(generator, test_constants.MAX_BLOCK_COST_CLVM, False)
        assert lenient_result.error is None

    @pytest.mark.asyncio
    async def test_tx_generator_speed(self):
        """Running a large generator yields one entry per consumed coin in under one second."""
        LARGE_BLOCK_COIN_CONSUMED_COUNT = 687
        generator_bytes = large_block_generator(LARGE_BLOCK_COIN_CONSUMED_COUNT)
        program = SerializedProgram.from_bytes(generator_bytes)
        # perf_counter() is monotonic, so the measured duration cannot be skewed
        # by wall-clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        generator = BlockGenerator(program, [])
        npc_result = get_name_puzzle_conditions(generator, test_constants.MAX_BLOCK_COST_CLVM, False)
        end_time = time.perf_counter()
        duration = end_time - start_time
        assert npc_result.error is None
        assert len(npc_result.npc_list) == LARGE_BLOCK_COIN_CONSUMED_COUNT
        log.info(f"Time spent: {duration}")
        assert duration < 1

    @pytest.mark.asyncio
    async def test_clvm_max_cost(self):
        """A generator fails when it exceeds the max cost and passes once the limit is raised."""
        block = Program.from_bytes(bytes(SMALL_BLOCK_GENERATOR.program))
        disassembly = binutils.disassemble(block)
        # Wrap the small block generator in an if-expression whose condition is
        # (softfork (q . 10000000)). Presumably softfork charges its argument as
        # cost -- confirm -- which is what pushes the total CLVM cost above
        # 10000000, as the final assertion below requires.
        # NOTE(review): an earlier comment here gave "a cost of 391969" for the
        # CLVM program; presumably that figure is for the wrapped generator
        # alone -- confirm.
        program = SerializedProgram.from_bytes(
            binutils.assemble(f"(i (softfork (q . 10000000)) (q . ()) {disassembly})").as_bin()
        )
        # ensure we fail if the program exceeds the cost
        generator = BlockGenerator(program, [])
        capped_result: NPCResult = get_name_puzzle_conditions(generator, 10000000, False)
        assert capped_result.error is not None
        assert capped_result.clvm_cost == 0
        # raise the max cost and ensure we pass when the program does not exceed it
        raised_result: NPCResult = get_name_puzzle_conditions(generator, 20000000, False)
        assert raised_result.error is None
        assert raised_result.clvm_cost > 10000000

    @pytest.mark.asyncio
    async def test_standard_tx(self):
        """Running the standard puzzle 1000 times takes less than three seconds."""
        # this isn't a real public key, but we don't care
        public_key = bytes.fromhex(
            "af949b78fa6a957602c3593a3d6cb7711e08720415dad83"
            "1ab18adacaa9b27ec3dda508ee32e24bc811c0abc5781ae21"
        )
        puzzle_program = SerializedProgram.from_bytes(p2_delegated_puzzle_or_hidden_puzzle.puzzle_for_pk(public_key))
        conditions = binutils.assemble(
            "((51 0x699eca24f2b6f4b25b16f7a418d0dc4fc5fce3b9145aecdda184158927738e3e 10)"
            " (51 0x847bb2385534070c39a39cc5dfdc7b35e2db472dc0ab10ab4dec157a2178adbf 0x00cbba106df6))"
        )
        solution_program = SerializedProgram.from_bytes(
            p2_delegated_puzzle_or_hidden_puzzle.solution_for_conditions(conditions)
        )
        # Monotonic clock: only the elapsed interval matters here.
        time_start = time.perf_counter()
        for _ in range(1000):
            # Only the elapsed time is checked; the returned cost and result are discarded.
            puzzle_program.run_with_cost(test_constants.MAX_BLOCK_COST_CLVM, solution_program)
        time_end = time.perf_counter()
        duration = time_end - time_start
        log.info(f"Time spent: {duration}")
        assert duration < 3