2592 lines
121 KiB
Python
2592 lines
121 KiB
Python
# flake8: noqa: F811, F401
|
|
import asyncio
|
|
import logging
|
|
import multiprocessing
|
|
import time
|
|
from dataclasses import replace
|
|
from secrets import token_bytes
|
|
|
|
import pytest
|
|
from blspy import AugSchemeMPL, G2Element
|
|
from clvm.casts import int_to_bytes
|
|
|
|
from chia.consensus.block_rewards import calculate_base_farmer_reward
|
|
from chia.consensus.blockchain import ReceiveBlockResult
|
|
from chia.consensus.coinbase import create_farmer_coin
|
|
from chia.consensus.pot_iterations import is_overflow_block
|
|
from chia.full_node.bundle_tools import detect_potential_template_generator
|
|
from chia.types.blockchain_format.classgroup import ClassgroupElement
|
|
from chia.types.blockchain_format.coin import Coin
|
|
from chia.types.blockchain_format.foliage import TransactionsInfo
|
|
from chia.types.blockchain_format.program import SerializedProgram
|
|
from chia.types.blockchain_format.sized_bytes import bytes32
|
|
from chia.types.blockchain_format.slots import InfusedChallengeChainSubSlot
|
|
from chia.types.blockchain_format.vdf import VDFInfo, VDFProof
|
|
from chia.types.condition_opcodes import ConditionOpcode
|
|
from chia.types.condition_with_args import ConditionWithArgs
|
|
from chia.types.end_of_slot_bundle import EndOfSubSlotBundle
|
|
from chia.types.full_block import FullBlock
|
|
from chia.types.spend_bundle import SpendBundle
|
|
from chia.types.unfinished_block import UnfinishedBlock
|
|
from chia.util.block_tools import BlockTools, get_vdf_info_and_proof
|
|
from chia.util.errors import Err
|
|
from chia.util.hash import std_hash
|
|
from chia.util.ints import uint8, uint64, uint32
|
|
from chia.util.merkle_set import MerkleSet
|
|
from chia.util.recursive_replace import recursive_replace
|
|
from chia.util.wallet_tools import WalletTool
|
|
from tests.core.fixtures import default_400_blocks # noqa: F401; noqa: F401
|
|
from tests.core.fixtures import default_1000_blocks # noqa: F401
|
|
from tests.core.fixtures import default_10000_blocks # noqa: F401
|
|
from tests.core.fixtures import default_10000_blocks_compact # noqa: F401
|
|
from tests.core.fixtures import empty_blockchain # noqa: F401
|
|
from tests.core.fixtures import create_blockchain
|
|
from tests.setup_nodes import bt, test_constants
|
|
|
|
log = logging.getLogger(__name__)
|
|
bad_element = ClassgroupElement.from_bytes(b"\x00")
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def event_loop():
|
|
loop = asyncio.get_event_loop()
|
|
yield loop
|
|
|
|
|
|
class TestGenesisBlock:
|
|
@pytest.mark.asyncio
|
|
async def test_block_tools_proofs_400(self, default_400_blocks):
|
|
vdf, proof = get_vdf_info_and_proof(
|
|
test_constants, ClassgroupElement.get_default_element(), test_constants.GENESIS_CHALLENGE, uint64(231)
|
|
)
|
|
if proof.is_valid(test_constants, ClassgroupElement.get_default_element(), vdf) is False:
|
|
raise Exception("invalid proof")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_block_tools_proofs_1000(self, default_1000_blocks):
|
|
vdf, proof = get_vdf_info_and_proof(
|
|
test_constants, ClassgroupElement.get_default_element(), test_constants.GENESIS_CHALLENGE, uint64(231)
|
|
)
|
|
if proof.is_valid(test_constants, ClassgroupElement.get_default_element(), vdf) is False:
|
|
raise Exception("invalid proof")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_block_tools_proofs(self):
|
|
vdf, proof = get_vdf_info_and_proof(
|
|
test_constants, ClassgroupElement.get_default_element(), test_constants.GENESIS_CHALLENGE, uint64(231)
|
|
)
|
|
if proof.is_valid(test_constants, ClassgroupElement.get_default_element(), vdf) is False:
|
|
raise Exception("invalid proof")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_non_overflow_genesis(self, empty_blockchain):
|
|
assert empty_blockchain.get_peak() is None
|
|
genesis = bt.get_consecutive_blocks(1, force_overflow=False)[0]
|
|
result, err, _ = await empty_blockchain.receive_block(genesis)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert empty_blockchain.get_peak().height == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_overflow_genesis(self, empty_blockchain):
|
|
genesis = bt.get_consecutive_blocks(1, force_overflow=True)[0]
|
|
result, err, _ = await empty_blockchain.receive_block(genesis)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_genesis_empty_slots(self, empty_blockchain):
|
|
genesis = bt.get_consecutive_blocks(1, force_overflow=False, skip_slots=30)[0]
|
|
result, err, _ = await empty_blockchain.receive_block(genesis)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_overflow_genesis_empty_slots(self, empty_blockchain):
|
|
genesis = bt.get_consecutive_blocks(1, force_overflow=True, skip_slots=3)[0]
|
|
result, err, _ = await empty_blockchain.receive_block(genesis)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_genesis_validate_1(self, empty_blockchain):
|
|
genesis = bt.get_consecutive_blocks(1, force_overflow=False)[0]
|
|
bad_prev = bytes([1] * 32)
|
|
genesis = recursive_replace(genesis, "foliage.prev_block_hash", bad_prev)
|
|
result, err, _ = await empty_blockchain.receive_block(genesis)
|
|
assert err == Err.INVALID_PREV_BLOCK_HASH
|
|
|
|
|
|
class TestBlockHeaderValidation:
|
|
@pytest.mark.asyncio
|
|
async def test_long_chain(self, empty_blockchain, default_1000_blocks):
|
|
blocks = default_1000_blocks
|
|
for block in blocks:
|
|
if (
|
|
len(block.finished_sub_slots) > 0
|
|
and block.finished_sub_slots[0].challenge_chain.subepoch_summary_hash is not None
|
|
):
|
|
# Sub/Epoch. Try using a bad ssi and difficulty to test 2m and 2n
|
|
new_finished_ss = recursive_replace(
|
|
block.finished_sub_slots[0],
|
|
"challenge_chain.new_sub_slot_iters",
|
|
uint64(10000000),
|
|
)
|
|
block_bad = recursive_replace(
|
|
block, "finished_sub_slots", [new_finished_ss] + block.finished_sub_slots[1:]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_NEW_SUB_SLOT_ITERS
|
|
new_finished_ss_2 = recursive_replace(
|
|
block.finished_sub_slots[0],
|
|
"challenge_chain.new_difficulty",
|
|
uint64(10000000),
|
|
)
|
|
block_bad_2 = recursive_replace(
|
|
block, "finished_sub_slots", [new_finished_ss_2] + block.finished_sub_slots[1:]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_2)
|
|
assert err == Err.INVALID_NEW_DIFFICULTY
|
|
|
|
# 3c
|
|
new_finished_ss_3: EndOfSubSlotBundle = recursive_replace(
|
|
block.finished_sub_slots[0],
|
|
"challenge_chain.subepoch_summary_hash",
|
|
bytes([0] * 32),
|
|
)
|
|
new_finished_ss_3 = recursive_replace(
|
|
new_finished_ss_3,
|
|
"reward_chain.challenge_chain_sub_slot_hash",
|
|
new_finished_ss_3.challenge_chain.get_hash(),
|
|
)
|
|
block_bad_3 = recursive_replace(
|
|
block, "finished_sub_slots", [new_finished_ss_3] + block.finished_sub_slots[1:]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_3)
|
|
assert err == Err.INVALID_SUB_EPOCH_SUMMARY
|
|
|
|
# 3d
|
|
new_finished_ss_4 = recursive_replace(
|
|
block.finished_sub_slots[0],
|
|
"challenge_chain.subepoch_summary_hash",
|
|
None,
|
|
)
|
|
new_finished_ss_4 = recursive_replace(
|
|
new_finished_ss_4,
|
|
"reward_chain.challenge_chain_sub_slot_hash",
|
|
new_finished_ss_4.challenge_chain.get_hash(),
|
|
)
|
|
block_bad_4 = recursive_replace(
|
|
block, "finished_sub_slots", [new_finished_ss_4] + block.finished_sub_slots[1:]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_4)
|
|
assert err == Err.INVALID_SUB_EPOCH_SUMMARY or err == Err.INVALID_NEW_SUB_SLOT_ITERS
|
|
|
|
result, err, _ = await empty_blockchain.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
log.info(
|
|
f"Added block {block.height} total iters {block.total_iters} "
|
|
f"new slot? {len(block.finished_sub_slots)}"
|
|
)
|
|
assert empty_blockchain.get_peak().height == len(blocks) - 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unfinished_blocks(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(3)
|
|
for block in blocks[:-1]:
|
|
result, err, _ = await blockchain.receive_block(block)
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
block = blocks[-1]
|
|
unf = UnfinishedBlock(
|
|
block.finished_sub_slots,
|
|
block.reward_chain_block.get_unfinished(),
|
|
block.challenge_chain_sp_proof,
|
|
block.reward_chain_sp_proof,
|
|
block.foliage,
|
|
block.foliage_transaction_block,
|
|
block.transactions_info,
|
|
block.transactions_generator,
|
|
[],
|
|
)
|
|
validate_res = await blockchain.validate_unfinished_block(unf, False)
|
|
err = validate_res.error
|
|
assert err is None
|
|
result, err, _ = await blockchain.receive_block(block)
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, force_overflow=True)
|
|
block = blocks[-1]
|
|
unf = UnfinishedBlock(
|
|
block.finished_sub_slots,
|
|
block.reward_chain_block.get_unfinished(),
|
|
block.challenge_chain_sp_proof,
|
|
block.reward_chain_sp_proof,
|
|
block.foliage,
|
|
block.foliage_transaction_block,
|
|
block.transactions_info,
|
|
block.transactions_generator,
|
|
[],
|
|
)
|
|
validate_res = await blockchain.validate_unfinished_block(unf, False)
|
|
assert validate_res.error is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_genesis(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
for block in bt.get_consecutive_blocks(2, skip_slots=3):
|
|
result, err, _ = await blockchain.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_slots_non_genesis(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(10)
|
|
for block in blocks:
|
|
result, err, _ = await blockchain.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
blocks = bt.get_consecutive_blocks(10, skip_slots=2, block_list_input=blocks)
|
|
for block in blocks[10:]:
|
|
result, err, _ = await blockchain.receive_block(block)
|
|
assert err is None
|
|
assert blockchain.get_peak().height == 19
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_one_sb_per_slot(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
num_blocks = 20
|
|
blocks = []
|
|
for i in range(num_blocks):
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, skip_slots=1)
|
|
result, err, _ = await blockchain.receive_block(blocks[-1])
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert blockchain.get_peak().height == num_blocks - 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_all_overflow(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
num_rounds = 5
|
|
blocks = []
|
|
num_blocks = 0
|
|
for i in range(1, num_rounds):
|
|
num_blocks += i
|
|
blocks = bt.get_consecutive_blocks(i, block_list_input=blocks, skip_slots=1, force_overflow=True)
|
|
for block in blocks[-i:]:
|
|
result, err, _ = await blockchain.receive_block(block)
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert err is None
|
|
assert blockchain.get_peak().height == num_blocks - 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unf_block_overflow(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
|
|
blocks = []
|
|
while True:
|
|
# This creates an overflow block, then a normal block, and then an overflow in the next sub-slot
|
|
# blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, force_overflow=True)
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, force_overflow=True)
|
|
|
|
await blockchain.receive_block(blocks[-2])
|
|
|
|
sb_1 = blockchain.block_record(blocks[-2].header_hash)
|
|
|
|
sb_2_next_ss = blocks[-1].total_iters - blocks[-2].total_iters < sb_1.sub_slot_iters
|
|
# We might not get a normal block for sb_2, and we might not get them in the right slots
|
|
# So this while loop keeps trying
|
|
if sb_1.overflow and sb_2_next_ss:
|
|
block = blocks[-1]
|
|
unf = UnfinishedBlock(
|
|
[],
|
|
block.reward_chain_block.get_unfinished(),
|
|
block.challenge_chain_sp_proof,
|
|
block.reward_chain_sp_proof,
|
|
block.foliage,
|
|
block.foliage_transaction_block,
|
|
block.transactions_info,
|
|
block.transactions_generator,
|
|
[],
|
|
)
|
|
validate_res = await blockchain.validate_unfinished_block(unf, skip_overflow_ss_validation=True)
|
|
assert validate_res.error is None
|
|
return None
|
|
|
|
await blockchain.receive_block(blocks[-1])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_one_sb_per_two_slots(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
num_blocks = 20
|
|
blocks = []
|
|
for i in range(num_blocks): # Same thing, but 2 sub-slots per block
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, skip_slots=2)
|
|
result, err, _ = await blockchain.receive_block(blocks[-1])
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert blockchain.get_peak().height == num_blocks - 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_one_sb_per_five_slots(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
num_blocks = 10
|
|
blocks = []
|
|
for i in range(num_blocks): # Same thing, but 5 sub-slots per block
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, skip_slots=5)
|
|
result, err, _ = await blockchain.receive_block(blocks[-1])
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert blockchain.get_peak().height == num_blocks - 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_basic_chain_overflow(self, empty_blockchain):
|
|
blocks = bt.get_consecutive_blocks(5, force_overflow=True)
|
|
for block in blocks:
|
|
result, err, _ = await empty_blockchain.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert empty_blockchain.get_peak().height == len(blocks) - 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_one_sb_per_two_slots_force_overflow(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
num_blocks = 10
|
|
blocks = []
|
|
for i in range(num_blocks):
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, skip_slots=2, force_overflow=True)
|
|
result, err, _ = await blockchain.receive_block(blocks[-1])
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert blockchain.get_peak().height == num_blocks - 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_prev(self, empty_blockchain):
|
|
# 1
|
|
blocks = bt.get_consecutive_blocks(2, force_overflow=False)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_1_bad = recursive_replace(blocks[-1], "foliage.prev_block_hash", bytes([0] * 32))
|
|
|
|
result, err, _ = await empty_blockchain.receive_block(block_1_bad)
|
|
assert result == ReceiveBlockResult.DISCONNECTED_BLOCK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_pospace(self, empty_blockchain):
|
|
# 2
|
|
blocks = bt.get_consecutive_blocks(2, force_overflow=False)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_1_bad = recursive_replace(blocks[-1], "reward_chain_block.proof_of_space.proof", bytes([0] * 32))
|
|
|
|
result, err, _ = await empty_blockchain.receive_block(block_1_bad)
|
|
assert result == ReceiveBlockResult.INVALID_BLOCK
|
|
assert err == Err.INVALID_POSPACE
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_sub_slot_challenge_hash_genesis(self, empty_blockchain):
|
|
# 2a
|
|
blocks = bt.get_consecutive_blocks(1, force_overflow=False, skip_slots=1)
|
|
new_finished_ss = recursive_replace(
|
|
blocks[0].finished_sub_slots[0],
|
|
"challenge_chain.challenge_chain_end_of_slot_vdf.challenge",
|
|
bytes([2] * 32),
|
|
)
|
|
block_0_bad = recursive_replace(
|
|
blocks[0], "finished_sub_slots", [new_finished_ss] + blocks[0].finished_sub_slots[1:]
|
|
)
|
|
|
|
result, err, _ = await empty_blockchain.receive_block(block_0_bad)
|
|
assert result == ReceiveBlockResult.INVALID_BLOCK
|
|
assert err == Err.INVALID_PREV_CHALLENGE_SLOT_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_sub_slot_challenge_hash_non_genesis(self, empty_blockchain):
|
|
# 2b
|
|
blocks = bt.get_consecutive_blocks(1, force_overflow=False, skip_slots=0)
|
|
blocks = bt.get_consecutive_blocks(1, force_overflow=False, skip_slots=1, block_list_input=blocks)
|
|
new_finished_ss = recursive_replace(
|
|
blocks[1].finished_sub_slots[0],
|
|
"challenge_chain.challenge_chain_end_of_slot_vdf.challenge",
|
|
bytes([2] * 32),
|
|
)
|
|
block_1_bad = recursive_replace(
|
|
blocks[1], "finished_sub_slots", [new_finished_ss] + blocks[1].finished_sub_slots[1:]
|
|
)
|
|
|
|
_, _, _ = await empty_blockchain.receive_block(blocks[0])
|
|
result, err, _ = await empty_blockchain.receive_block(block_1_bad)
|
|
assert result == ReceiveBlockResult.INVALID_BLOCK
|
|
assert err == Err.INVALID_PREV_CHALLENGE_SLOT_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_sub_slot_challenge_hash_empty_ss(self, empty_blockchain):
|
|
# 2c
|
|
blocks = bt.get_consecutive_blocks(1, force_overflow=False, skip_slots=0)
|
|
blocks = bt.get_consecutive_blocks(1, force_overflow=False, skip_slots=2, block_list_input=blocks)
|
|
new_finished_ss = recursive_replace(
|
|
blocks[1].finished_sub_slots[-1],
|
|
"challenge_chain.challenge_chain_end_of_slot_vdf.challenge",
|
|
bytes([2] * 32),
|
|
)
|
|
block_1_bad = recursive_replace(
|
|
blocks[1], "finished_sub_slots", blocks[1].finished_sub_slots[:-1] + [new_finished_ss]
|
|
)
|
|
|
|
_, _, _ = await empty_blockchain.receive_block(blocks[0])
|
|
result, err, _ = await empty_blockchain.receive_block(block_1_bad)
|
|
assert result == ReceiveBlockResult.INVALID_BLOCK
|
|
assert err == Err.INVALID_PREV_CHALLENGE_SLOT_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_genesis_no_icc(self, empty_blockchain):
|
|
# 2d
|
|
blocks = bt.get_consecutive_blocks(1, force_overflow=False, skip_slots=1)
|
|
new_finished_ss = recursive_replace(
|
|
blocks[0].finished_sub_slots[0],
|
|
"infused_challenge_chain",
|
|
InfusedChallengeChainSubSlot(
|
|
VDFInfo(
|
|
bytes([0] * 32),
|
|
uint64(1200),
|
|
ClassgroupElement.get_default_element(),
|
|
)
|
|
),
|
|
)
|
|
block_0_bad = recursive_replace(
|
|
blocks[0], "finished_sub_slots", [new_finished_ss] + blocks[0].finished_sub_slots[1:]
|
|
)
|
|
|
|
result, err, _ = await empty_blockchain.receive_block(block_0_bad)
|
|
assert result == ReceiveBlockResult.INVALID_BLOCK
|
|
assert err == Err.SHOULD_NOT_HAVE_ICC
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_icc_sub_slot_vdf(self):
|
|
bt_high_iters = BlockTools(
|
|
constants=test_constants.replace(SUB_SLOT_ITERS_STARTING=(2 ** 12), DIFFICULTY_STARTING=(2 ** 14))
|
|
)
|
|
bc1, connection, db_path = await create_blockchain(bt_high_iters.constants)
|
|
blocks = bt_high_iters.get_consecutive_blocks(10)
|
|
for block in blocks:
|
|
if len(block.finished_sub_slots) > 0 and block.finished_sub_slots[-1].infused_challenge_chain is not None:
|
|
# Bad iters
|
|
new_finished_ss = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"infused_challenge_chain",
|
|
InfusedChallengeChainSubSlot(
|
|
replace(
|
|
block.finished_sub_slots[
|
|
-1
|
|
].infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf,
|
|
number_of_iterations=10000000,
|
|
)
|
|
),
|
|
)
|
|
block_bad = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss]
|
|
)
|
|
result, err, _ = await bc1.receive_block(block_bad)
|
|
assert err == Err.INVALID_ICC_EOS_VDF
|
|
|
|
# Bad output
|
|
new_finished_ss_2 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"infused_challenge_chain",
|
|
InfusedChallengeChainSubSlot(
|
|
replace(
|
|
block.finished_sub_slots[
|
|
-1
|
|
].infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf,
|
|
output=ClassgroupElement.get_default_element(),
|
|
)
|
|
),
|
|
)
|
|
log.warning(f"Proof: {block.finished_sub_slots[-1].proofs}")
|
|
block_bad_2 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_2]
|
|
)
|
|
result, err, _ = await bc1.receive_block(block_bad_2)
|
|
assert err == Err.INVALID_ICC_EOS_VDF
|
|
|
|
# Bad challenge hash
|
|
new_finished_ss_3 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"infused_challenge_chain",
|
|
InfusedChallengeChainSubSlot(
|
|
replace(
|
|
block.finished_sub_slots[
|
|
-1
|
|
].infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf,
|
|
challenge=bytes([0] * 32),
|
|
)
|
|
),
|
|
)
|
|
block_bad_3 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_3]
|
|
)
|
|
result, err, _ = await bc1.receive_block(block_bad_3)
|
|
assert err == Err.INVALID_ICC_EOS_VDF
|
|
|
|
# Bad proof
|
|
new_finished_ss_5 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"proofs.infused_challenge_chain_slot_proof",
|
|
VDFProof(uint8(0), b"1239819023890", False),
|
|
)
|
|
block_bad_5 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_5]
|
|
)
|
|
result, err, _ = await bc1.receive_block(block_bad_5)
|
|
assert err == Err.INVALID_ICC_EOS_VDF
|
|
|
|
result, err, _ = await bc1.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
await connection.close()
|
|
bc1.shut_down()
|
|
db_path.unlink()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_icc_into_cc(self, empty_blockchain):
|
|
blockchain = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
case_1, case_2 = False, False
|
|
while not case_1 or not case_2:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, skip_slots=1)
|
|
block = blocks[-1]
|
|
if len(block.finished_sub_slots) > 0 and block.finished_sub_slots[-1].infused_challenge_chain is not None:
|
|
if block.finished_sub_slots[-1].reward_chain.deficit == test_constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK:
|
|
# 2g
|
|
case_1 = True
|
|
new_finished_ss = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"challenge_chain",
|
|
replace(
|
|
block.finished_sub_slots[-1].challenge_chain,
|
|
infused_challenge_chain_sub_slot_hash=bytes([1] * 32),
|
|
),
|
|
)
|
|
else:
|
|
# 2h
|
|
case_2 = True
|
|
new_finished_ss = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"challenge_chain",
|
|
replace(
|
|
block.finished_sub_slots[-1].challenge_chain,
|
|
infused_challenge_chain_sub_slot_hash=block.finished_sub_slots[
|
|
-1
|
|
].infused_challenge_chain.get_hash(),
|
|
),
|
|
)
|
|
block_bad = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss]
|
|
)
|
|
result, err, _ = await blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_ICC_HASH_CC
|
|
|
|
# 2i
|
|
new_finished_ss_bad_rc = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"reward_chain",
|
|
replace(block.finished_sub_slots[-1].reward_chain, infused_challenge_chain_sub_slot_hash=None),
|
|
)
|
|
block_bad = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_bad_rc]
|
|
)
|
|
result, err, _ = await blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_ICC_HASH_RC
|
|
elif len(block.finished_sub_slots) > 0 and block.finished_sub_slots[-1].infused_challenge_chain is None:
|
|
# 2j
|
|
new_finished_ss_bad_cc = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"challenge_chain",
|
|
replace(
|
|
block.finished_sub_slots[-1].challenge_chain,
|
|
infused_challenge_chain_sub_slot_hash=bytes([1] * 32),
|
|
),
|
|
)
|
|
block_bad = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_bad_cc]
|
|
)
|
|
result, err, _ = await blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_ICC_HASH_CC
|
|
|
|
# 2k
|
|
new_finished_ss_bad_rc = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"reward_chain",
|
|
replace(
|
|
block.finished_sub_slots[-1].reward_chain, infused_challenge_chain_sub_slot_hash=bytes([1] * 32)
|
|
),
|
|
)
|
|
block_bad = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_bad_rc]
|
|
)
|
|
result, err, _ = await blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_ICC_HASH_RC
|
|
|
|
# Finally, add the block properly
|
|
result, err, _ = await blockchain.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_slot_no_ses(self, empty_blockchain):
|
|
# 2l
|
|
blockchain = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, skip_slots=4)
|
|
|
|
new_finished_ss = recursive_replace(
|
|
blocks[-1].finished_sub_slots[-1],
|
|
"challenge_chain",
|
|
replace(blocks[-1].finished_sub_slots[-1].challenge_chain, subepoch_summary_hash=std_hash(b"0")),
|
|
)
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "finished_sub_slots", blocks[-1].finished_sub_slots[:-1] + [new_finished_ss]
|
|
)
|
|
result, err, _ = await blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_SUB_EPOCH_SUMMARY_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_sub_slots_epoch(self, empty_blockchain):
|
|
# 2m
|
|
# Tests adding an empty sub slot after the sub-epoch / epoch.
|
|
# Also tests overflow block in epoch
|
|
blocks_base = bt.get_consecutive_blocks(test_constants.EPOCH_BLOCKS)
|
|
blocks_1 = bt.get_consecutive_blocks(1, block_list_input=blocks_base, force_overflow=True)
|
|
blocks_2 = bt.get_consecutive_blocks(1, skip_slots=1, block_list_input=blocks_base, force_overflow=True)
|
|
blocks_3 = bt.get_consecutive_blocks(1, skip_slots=2, block_list_input=blocks_base, force_overflow=True)
|
|
blocks_4 = bt.get_consecutive_blocks(1, block_list_input=blocks_base)
|
|
for block in blocks_base:
|
|
result, err, _ = await empty_blockchain.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
for block in [blocks_1[-1], blocks_2[-1], blocks_3[-1], blocks_4[-1]]:
|
|
result, err, _ = await empty_blockchain.receive_block(block)
|
|
assert err is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wrong_cc_hash_rc(self, empty_blockchain):
|
|
# 2o
|
|
blockchain = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(1, skip_slots=1)
|
|
blocks = bt.get_consecutive_blocks(1, skip_slots=1, block_list_input=blocks)
|
|
assert (await blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
new_finished_ss = recursive_replace(
|
|
blocks[-1].finished_sub_slots[-1],
|
|
"reward_chain",
|
|
replace(blocks[-1].finished_sub_slots[-1].reward_chain, challenge_chain_sub_slot_hash=bytes([3] * 32)),
|
|
)
|
|
block_1_bad = recursive_replace(
|
|
blocks[-1], "finished_sub_slots", blocks[-1].finished_sub_slots[:-1] + [new_finished_ss]
|
|
)
|
|
|
|
result, err, _ = await blockchain.receive_block(block_1_bad)
|
|
assert result == ReceiveBlockResult.INVALID_BLOCK
|
|
assert err == Err.INVALID_CHALLENGE_SLOT_HASH_RC
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_cc_sub_slot_vdf(self, empty_blockchain):
|
|
# 2q
|
|
blocks = bt.get_consecutive_blocks(10)
|
|
|
|
for block in blocks:
|
|
if len(block.finished_sub_slots):
|
|
# Bad iters
|
|
new_finished_ss = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"challenge_chain",
|
|
recursive_replace(
|
|
block.finished_sub_slots[-1].challenge_chain,
|
|
"challenge_chain_end_of_slot_vdf.number_of_iterations",
|
|
uint64(10000000),
|
|
),
|
|
)
|
|
new_finished_ss = recursive_replace(
|
|
new_finished_ss,
|
|
"reward_chain.challenge_chain_sub_slot_hash",
|
|
new_finished_ss.challenge_chain.get_hash(),
|
|
)
|
|
block_bad = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_CC_EOS_VDF
|
|
|
|
# Bad output
|
|
new_finished_ss_2 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"challenge_chain",
|
|
recursive_replace(
|
|
block.finished_sub_slots[-1].challenge_chain,
|
|
"challenge_chain_end_of_slot_vdf.output",
|
|
ClassgroupElement.get_default_element(),
|
|
),
|
|
)
|
|
|
|
new_finished_ss_2 = recursive_replace(
|
|
new_finished_ss_2,
|
|
"reward_chain.challenge_chain_sub_slot_hash",
|
|
new_finished_ss_2.challenge_chain.get_hash(),
|
|
)
|
|
block_bad_2 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_2]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_2)
|
|
assert err == Err.INVALID_CC_EOS_VDF
|
|
|
|
# Bad challenge hash
|
|
new_finished_ss_3 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"challenge_chain",
|
|
recursive_replace(
|
|
block.finished_sub_slots[-1].challenge_chain,
|
|
"challenge_chain_end_of_slot_vdf.challenge",
|
|
bytes([1] * 32),
|
|
),
|
|
)
|
|
|
|
new_finished_ss_3 = recursive_replace(
|
|
new_finished_ss_3,
|
|
"reward_chain.challenge_chain_sub_slot_hash",
|
|
new_finished_ss_3.challenge_chain.get_hash(),
|
|
)
|
|
block_bad_3 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_3]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_3)
|
|
assert err == Err.INVALID_CC_EOS_VDF or err == Err.INVALID_PREV_CHALLENGE_SLOT_HASH
|
|
|
|
# Bad proof
|
|
new_finished_ss_5 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"proofs.challenge_chain_slot_proof",
|
|
VDFProof(uint8(0), b"1239819023890", False),
|
|
)
|
|
block_bad_5 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_5]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_5)
|
|
assert err == Err.INVALID_CC_EOS_VDF
|
|
|
|
result, err, _ = await empty_blockchain.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_rc_sub_slot_vdf(self, empty_blockchain):
|
|
# 2p
|
|
blocks = bt.get_consecutive_blocks(10)
|
|
for block in blocks:
|
|
if len(block.finished_sub_slots):
|
|
# Bad iters
|
|
new_finished_ss = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"reward_chain",
|
|
recursive_replace(
|
|
block.finished_sub_slots[-1].reward_chain,
|
|
"end_of_slot_vdf.number_of_iterations",
|
|
uint64(10000000),
|
|
),
|
|
)
|
|
block_bad = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_RC_EOS_VDF
|
|
|
|
# Bad output
|
|
new_finished_ss_2 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"reward_chain",
|
|
recursive_replace(
|
|
block.finished_sub_slots[-1].reward_chain,
|
|
"end_of_slot_vdf.output",
|
|
ClassgroupElement.get_default_element(),
|
|
),
|
|
)
|
|
block_bad_2 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_2]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_2)
|
|
assert err == Err.INVALID_RC_EOS_VDF
|
|
|
|
# Bad challenge hash
|
|
new_finished_ss_3 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"reward_chain",
|
|
recursive_replace(
|
|
block.finished_sub_slots[-1].reward_chain,
|
|
"end_of_slot_vdf.challenge",
|
|
bytes([1] * 32),
|
|
),
|
|
)
|
|
block_bad_3 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_3]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_3)
|
|
assert err == Err.INVALID_RC_EOS_VDF
|
|
|
|
# Bad proof
|
|
new_finished_ss_5 = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"proofs.reward_chain_slot_proof",
|
|
VDFProof(uint8(0), b"1239819023890", False),
|
|
)
|
|
block_bad_5 = recursive_replace(
|
|
block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss_5]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad_5)
|
|
assert err == Err.INVALID_RC_EOS_VDF
|
|
|
|
result, err, _ = await empty_blockchain.receive_block(block)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_genesis_bad_deficit(self, empty_blockchain):
|
|
# 2r
|
|
block = bt.get_consecutive_blocks(1, skip_slots=2)[0]
|
|
new_finished_ss = recursive_replace(
|
|
block.finished_sub_slots[-1],
|
|
"reward_chain",
|
|
recursive_replace(
|
|
block.finished_sub_slots[-1].reward_chain,
|
|
"deficit",
|
|
test_constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK - 1,
|
|
),
|
|
)
|
|
block_bad = recursive_replace(block, "finished_sub_slots", block.finished_sub_slots[:-1] + [new_finished_ss])
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_DEFICIT
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reset_deficit(self, empty_blockchain):
|
|
# 2s, 2t
|
|
blockchain = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
await empty_blockchain.receive_block(blocks[0])
|
|
await empty_blockchain.receive_block(blocks[1])
|
|
case_1, case_2 = False, False
|
|
while not case_1 or not case_2:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks, skip_slots=1)
|
|
if len(blocks[-1].finished_sub_slots) > 0:
|
|
new_finished_ss = recursive_replace(
|
|
blocks[-1].finished_sub_slots[-1],
|
|
"reward_chain",
|
|
recursive_replace(
|
|
blocks[-1].finished_sub_slots[-1].reward_chain,
|
|
"deficit",
|
|
uint8(0),
|
|
),
|
|
)
|
|
if blockchain.block_record(blocks[-2].header_hash).deficit == 0:
|
|
case_1 = True
|
|
else:
|
|
case_2 = True
|
|
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "finished_sub_slots", blocks[-1].finished_sub_slots[:-1] + [new_finished_ss]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_DEFICIT or err == Err.INVALID_ICC_HASH_CC
|
|
|
|
result, err, _ = await empty_blockchain.receive_block(blocks[-1])
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_genesis_has_ses(self, empty_blockchain):
|
|
# 3a
|
|
block = bt.get_consecutive_blocks(1, skip_slots=1)[0]
|
|
new_finished_ss = recursive_replace(
|
|
block.finished_sub_slots[0],
|
|
"challenge_chain",
|
|
recursive_replace(
|
|
block.finished_sub_slots[0].challenge_chain,
|
|
"subepoch_summary_hash",
|
|
bytes([0] * 32),
|
|
),
|
|
)
|
|
|
|
new_finished_ss = recursive_replace(
|
|
new_finished_ss,
|
|
"reward_chain",
|
|
replace(
|
|
new_finished_ss.reward_chain, challenge_chain_sub_slot_hash=new_finished_ss.challenge_chain.get_hash()
|
|
),
|
|
)
|
|
block_bad = recursive_replace(block, "finished_sub_slots", [new_finished_ss] + block.finished_sub_slots[1:])
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_SUB_EPOCH_SUMMARY_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_ses_if_no_se(self, empty_blockchain):
|
|
# 3b
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
while True:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if len(blocks[-1].finished_sub_slots) > 0:
|
|
new_finished_ss: EndOfSubSlotBundle = recursive_replace(
|
|
blocks[-1].finished_sub_slots[0],
|
|
"challenge_chain",
|
|
recursive_replace(
|
|
blocks[-1].finished_sub_slots[0].challenge_chain,
|
|
"subepoch_summary_hash",
|
|
bytes([0] * 32),
|
|
),
|
|
)
|
|
|
|
new_finished_ss = recursive_replace(
|
|
new_finished_ss,
|
|
"reward_chain",
|
|
replace(
|
|
new_finished_ss.reward_chain,
|
|
challenge_chain_sub_slot_hash=new_finished_ss.challenge_chain.get_hash(),
|
|
),
|
|
)
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "finished_sub_slots", [new_finished_ss] + blocks[-1].finished_sub_slots[1:]
|
|
)
|
|
result, err, _ = await empty_blockchain.receive_block(block_bad)
|
|
assert err == Err.INVALID_SUB_EPOCH_SUMMARY_HASH
|
|
return None
|
|
await empty_blockchain.receive_block(blocks[-1])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_too_many_blocks(self, empty_blockchain):
|
|
# 4: TODO
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_pos(self, empty_blockchain):
|
|
# 5
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
block_bad = recursive_replace(blocks[-1], "reward_chain_block.proof_of_space.challenge", std_hash(b""))
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_POSPACE
|
|
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.proof_of_space.pool_contract_puzzle_hash", std_hash(b"")
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_POSPACE
|
|
|
|
block_bad = recursive_replace(blocks[-1], "reward_chain_block.proof_of_space.size", 62)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_POSPACE
|
|
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.proof_of_space.plot_public_key",
|
|
AugSchemeMPL.key_gen(std_hash(b"1231n")).get_g1(),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_POSPACE
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.proof_of_space.size",
|
|
32,
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_POSPACE
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.proof_of_space.proof",
|
|
bytes([1] * int(blocks[-1].reward_chain_block.proof_of_space.size * 64 / 8)),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_POSPACE
|
|
|
|
# TODO: test not passing the plot filter
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_signage_point_index(self, empty_blockchain):
|
|
# 6
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
with pytest.raises(ValueError):
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.signage_point_index", test_constants.NUM_SPS_SUB_SLOT
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_SP_INDEX
|
|
with pytest.raises(ValueError):
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.signage_point_index", test_constants.NUM_SPS_SUB_SLOT + 1
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_SP_INDEX
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sp_0_no_sp(self, empty_blockchain):
|
|
# 7
|
|
blocks = []
|
|
case_1, case_2 = False, False
|
|
while not case_1 or not case_2:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].reward_chain_block.signage_point_index == 0:
|
|
case_1 = True
|
|
block_bad = recursive_replace(blocks[-1], "reward_chain_block.signage_point_index", uint8(1))
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_SP_INDEX
|
|
elif not is_overflow_block(test_constants, blocks[-1].reward_chain_block.signage_point_index):
|
|
case_2 = True
|
|
block_bad = recursive_replace(blocks[-1], "reward_chain_block.signage_point_index", uint8(0))
|
|
error_code = (await empty_blockchain.receive_block(block_bad))[1]
|
|
assert error_code == Err.INVALID_SP_INDEX or error_code == Err.INVALID_POSPACE
|
|
assert (await empty_blockchain.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_epoch_overflows(self, empty_blockchain):
|
|
# 9. TODO. This is hard to test because it requires modifying the block tools to make these special blocks
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_total_iters(self, empty_blockchain):
|
|
# 10
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.total_iters", blocks[-1].reward_chain_block.total_iters + 1
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_TOTAL_ITERS
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_rc_sp_vdf(self, empty_blockchain):
|
|
# 11
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
while True:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].reward_chain_block.signage_point_index != 0:
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.reward_chain_sp_vdf.challenge", std_hash(b"1")
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_SP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.reward_chain_sp_vdf.output",
|
|
bad_element,
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_SP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.reward_chain_sp_vdf.number_of_iterations",
|
|
uint64(1111111111111),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_SP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_sp_proof",
|
|
VDFProof(uint8(0), std_hash(b""), False),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_SP_VDF
|
|
return None
|
|
assert (await empty_blockchain.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_rc_sp_sig(self, empty_blockchain):
|
|
# 12
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_bad = recursive_replace(blocks[-1], "reward_chain_block.reward_chain_sp_signature", G2Element.generator())
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_SIGNATURE
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_cc_sp_vdf(self, empty_blockchain):
|
|
# 13. Note: does not validate fully due to proof of space being validated first
|
|
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
while True:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].reward_chain_block.signage_point_index != 0:
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.challenge_chain_sp_vdf.challenge", std_hash(b"1")
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[0] == ReceiveBlockResult.INVALID_BLOCK
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.challenge_chain_sp_vdf.output",
|
|
bad_element,
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[0] == ReceiveBlockResult.INVALID_BLOCK
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.challenge_chain_sp_vdf.number_of_iterations",
|
|
uint64(1111111111111),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[0] == ReceiveBlockResult.INVALID_BLOCK
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"challenge_chain_sp_proof",
|
|
VDFProof(uint8(0), std_hash(b""), False),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_CC_SP_VDF
|
|
return None
|
|
assert (await empty_blockchain.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_cc_sp_sig(self, empty_blockchain):
|
|
# 14
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.challenge_chain_sp_signature", G2Element.generator()
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_CC_SIGNATURE
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_is_transaction_block(self, empty_blockchain):
|
|
# 15: TODO
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_foliage_sb_sig(self, empty_blockchain):
|
|
# 16
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_bad = recursive_replace(blocks[-1], "foliage.foliage_block_data_signature", G2Element.generator())
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_PLOT_SIGNATURE
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_foliage_transaction_block_sig(self, empty_blockchain):
|
|
# 17
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
while True:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].foliage_transaction_block is not None:
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "foliage.foliage_transaction_block_signature", G2Element.generator()
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_PLOT_SIGNATURE
|
|
return None
|
|
assert (await empty_blockchain.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unfinished_reward_chain_sb_hash(self, empty_blockchain):
|
|
# 18
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage.foliage_block_data.unfinished_reward_block_hash", std_hash(b"2")
|
|
)
|
|
new_m = block_bad.foliage.foliage_block_data.get_hash()
|
|
new_fsb_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_block_data_signature", new_fsb_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_URSB_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pool_target_height(self, empty_blockchain):
|
|
# 19
|
|
blocks = bt.get_consecutive_blocks(3)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await empty_blockchain.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_bad: FullBlock = recursive_replace(blocks[-1], "foliage.foliage_block_data.pool_target.max_height", 1)
|
|
new_m = block_bad.foliage.foliage_block_data.get_hash()
|
|
new_fsb_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_block_data_signature", new_fsb_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.OLD_POOL_TARGET
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pool_target_pre_farm(self, empty_blockchain):
|
|
# 20a
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage.foliage_block_data.pool_target.puzzle_hash", std_hash(b"12")
|
|
)
|
|
new_m = block_bad.foliage.foliage_block_data.get_hash()
|
|
new_fsb_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_block_data_signature", new_fsb_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_PREFARM
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pool_target_signature(self, empty_blockchain):
|
|
# 20b
|
|
blocks_initial = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks_initial[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await empty_blockchain.receive_block(blocks_initial[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
attempts = 0
|
|
while True:
|
|
# Go until we get a block that has a pool pk, as opposed to a pool contract
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, blocks_initial, seed=std_hash(attempts.to_bytes(4, byteorder="big", signed=False))
|
|
)
|
|
if blocks[-1].foliage.foliage_block_data.pool_signature is not None:
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage.foliage_block_data.pool_signature", G2Element.generator()
|
|
)
|
|
new_m = block_bad.foliage.foliage_block_data.get_hash()
|
|
new_fsb_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_block_data_signature", new_fsb_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_POOL_SIGNATURE
|
|
return None
|
|
attempts += 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pool_target_contract(self, empty_blockchain):
|
|
# 20c invalid pool target with contract
|
|
blocks_initial = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks_initial[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await empty_blockchain.receive_block(blocks_initial[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
attempts = 0
|
|
while True:
|
|
# Go until we get a block that has a pool contract opposed to a pool pk
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, blocks_initial, seed=std_hash(attempts.to_bytes(4, byteorder="big", signed=False))
|
|
)
|
|
if blocks[-1].foliage.foliage_block_data.pool_signature is None:
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage.foliage_block_data.pool_target.puzzle_hash", bytes32(token_bytes(32))
|
|
)
|
|
new_m = block_bad.foliage.foliage_block_data.get_hash()
|
|
new_fsb_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_block_data_signature", new_fsb_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_POOL_TARGET
|
|
return None
|
|
attempts += 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_foliage_data_presence(self, empty_blockchain):
|
|
# 22
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
case_1, case_2 = False, False
|
|
while not case_1 or not case_2:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].foliage_transaction_block is not None:
|
|
case_1 = True
|
|
block_bad: FullBlock = recursive_replace(blocks[-1], "foliage.foliage_transaction_block_hash", None)
|
|
else:
|
|
case_2 = True
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage.foliage_transaction_block_hash", std_hash(b"")
|
|
)
|
|
err_code = (await empty_blockchain.receive_block(block_bad))[1]
|
|
assert err_code == Err.INVALID_FOLIAGE_BLOCK_PRESENCE or err_code == Err.INVALID_IS_TRANSACTION_BLOCK
|
|
await empty_blockchain.receive_block(blocks[-1])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_foliage_transaction_block_hash(self, empty_blockchain):
|
|
# 23
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
case_1, case_2 = False, False
|
|
while not case_1 or not case_2:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].foliage_transaction_block is not None:
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage.foliage_transaction_block_hash", std_hash(b"2")
|
|
)
|
|
|
|
new_m = block_bad.foliage.foliage_transaction_block_hash
|
|
new_fbh_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_transaction_block_signature", new_fbh_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_FOLIAGE_BLOCK_HASH
|
|
return None
|
|
await empty_blockchain.receive_block(blocks[-1])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_genesis_bad_prev_block(self, empty_blockchain):
|
|
# 24a
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage_transaction_block.prev_transaction_block_hash", std_hash(b"2")
|
|
)
|
|
block_bad: FullBlock = recursive_replace(
|
|
block_bad, "foliage.foliage_transaction_block_hash", block_bad.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_bad.foliage.foliage_transaction_block_hash
|
|
new_fbh_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_transaction_block_signature", new_fbh_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_PREV_BLOCK_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_prev_block_non_genesis(self, empty_blockchain):
|
|
# 24b
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
while True:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].foliage_transaction_block is not None:
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage_transaction_block.prev_transaction_block_hash", std_hash(b"2")
|
|
)
|
|
block_bad: FullBlock = recursive_replace(
|
|
block_bad, "foliage.foliage_transaction_block_hash", block_bad.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_bad.foliage.foliage_transaction_block_hash
|
|
new_fbh_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_transaction_block_signature", new_fbh_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_PREV_BLOCK_HASH
|
|
return None
|
|
await empty_blockchain.receive_block(blocks[-1])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_filter_hash(self, empty_blockchain):
|
|
# 25
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
while True:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].foliage_transaction_block is not None:
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1], "foliage_transaction_block.filter_hash", std_hash(b"2")
|
|
)
|
|
block_bad: FullBlock = recursive_replace(
|
|
block_bad, "foliage.foliage_transaction_block_hash", block_bad.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_bad.foliage.foliage_transaction_block_hash
|
|
new_fbh_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_transaction_block_signature", new_fbh_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_TRANSACTIONS_FILTER_HASH
|
|
return None
|
|
await empty_blockchain.receive_block(blocks[-1])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_timestamp(self, empty_blockchain):
|
|
# 26
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
while True:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if blocks[-1].foliage_transaction_block is not None:
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1],
|
|
"foliage_transaction_block.timestamp",
|
|
blocks[0].foliage_transaction_block.timestamp - 10,
|
|
)
|
|
block_bad: FullBlock = recursive_replace(
|
|
block_bad, "foliage.foliage_transaction_block_hash", block_bad.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_bad.foliage.foliage_transaction_block_hash
|
|
new_fbh_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_transaction_block_signature", new_fbh_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.TIMESTAMP_TOO_FAR_IN_PAST
|
|
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1],
|
|
"foliage_transaction_block.timestamp",
|
|
blocks[0].foliage_transaction_block.timestamp,
|
|
)
|
|
block_bad: FullBlock = recursive_replace(
|
|
block_bad, "foliage.foliage_transaction_block_hash", block_bad.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_bad.foliage.foliage_transaction_block_hash
|
|
new_fbh_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_transaction_block_signature", new_fbh_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.TIMESTAMP_TOO_FAR_IN_PAST
|
|
|
|
block_bad: FullBlock = recursive_replace(
|
|
blocks[-1],
|
|
"foliage_transaction_block.timestamp",
|
|
blocks[0].foliage_transaction_block.timestamp + 10000000,
|
|
)
|
|
block_bad: FullBlock = recursive_replace(
|
|
block_bad, "foliage.foliage_transaction_block_hash", block_bad.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_bad.foliage.foliage_transaction_block_hash
|
|
new_fbh_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block_bad = recursive_replace(block_bad, "foliage.foliage_transaction_block_signature", new_fbh_sig)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.TIMESTAMP_TOO_FAR_IN_FUTURE
|
|
return None
|
|
await empty_blockchain.receive_block(blocks[-1])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_height(self, empty_blockchain):
|
|
# 27
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_bad: FullBlock = recursive_replace(blocks[-1], "reward_chain_block.height", 2)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_HEIGHT
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_height_genesis(self, empty_blockchain):
|
|
# 27
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
block_bad: FullBlock = recursive_replace(blocks[-1], "reward_chain_block.height", 1)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_PREV_BLOCK_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_weight(self, empty_blockchain):
|
|
# 28
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_bad: FullBlock = recursive_replace(blocks[-1], "reward_chain_block.weight", 22131)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_WEIGHT
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_weight_genesis(self, empty_blockchain):
|
|
# 28
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
block_bad: FullBlock = recursive_replace(blocks[-1], "reward_chain_block.weight", 0)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_WEIGHT
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_cc_ip_vdf(self, empty_blockchain):
|
|
# 29
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
block_bad = recursive_replace(blocks[-1], "reward_chain_block.challenge_chain_ip_vdf.challenge", std_hash(b"1"))
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_CC_IP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.challenge_chain_ip_vdf.output",
|
|
bad_element,
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_CC_IP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.challenge_chain_ip_vdf.number_of_iterations",
|
|
uint64(1111111111111),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_CC_IP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"challenge_chain_ip_proof",
|
|
VDFProof(uint8(0), std_hash(b""), False),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_CC_IP_VDF
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_rc_ip_vdf(self, empty_blockchain):
|
|
# 30
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
block_bad = recursive_replace(blocks[-1], "reward_chain_block.reward_chain_ip_vdf.challenge", std_hash(b"1"))
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_IP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.reward_chain_ip_vdf.output",
|
|
bad_element,
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_IP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.reward_chain_ip_vdf.number_of_iterations",
|
|
uint64(1111111111111),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_IP_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_ip_proof",
|
|
VDFProof(uint8(0), std_hash(b""), False),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_RC_IP_VDF
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_bad_icc_ip_vdf(self, empty_blockchain):
|
|
# 31
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.infused_challenge_chain_ip_vdf.challenge", std_hash(b"1")
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_ICC_VDF
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_ICC_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.infused_challenge_chain_ip_vdf.output",
|
|
bad_element,
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_ICC_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"reward_chain_block.infused_challenge_chain_ip_vdf.number_of_iterations",
|
|
uint64(1111111111111),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_ICC_VDF
|
|
block_bad = recursive_replace(
|
|
blocks[-1],
|
|
"infused_challenge_chain_ip_proof",
|
|
VDFProof(uint8(0), std_hash(b""), False),
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_ICC_VDF
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reward_block_hash(self, empty_blockchain):
|
|
# 32
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block_bad: FullBlock = recursive_replace(blocks[-1], "foliage.reward_block_hash", std_hash(b""))
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_REWARD_BLOCK_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reward_block_hash_2(self, empty_blockchain):
|
|
# 33
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
block_bad: FullBlock = recursive_replace(blocks[0], "reward_chain_block.is_transaction_block", False)
|
|
block_bad: FullBlock = recursive_replace(
|
|
block_bad, "foliage.reward_block_hash", block_bad.reward_chain_block.get_hash()
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_FOLIAGE_BLOCK_PRESENCE
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
# Test one which should not be a tx block
|
|
while True:
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
if not blocks[-1].is_transaction_block():
|
|
block_bad: FullBlock = recursive_replace(blocks[-1], "reward_chain_block.is_transaction_block", True)
|
|
block_bad: FullBlock = recursive_replace(
|
|
block_bad, "foliage.reward_block_hash", block_bad.reward_chain_block.get_hash()
|
|
)
|
|
assert (await empty_blockchain.receive_block(block_bad))[1] == Err.INVALID_FOLIAGE_BLOCK_PRESENCE
|
|
return None
|
|
assert (await empty_blockchain.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
|
|
class TestPreValidation:
|
|
@pytest.mark.asyncio
|
|
async def test_pre_validation_fails_bad_blocks(self, empty_blockchain):
|
|
blocks = bt.get_consecutive_blocks(2)
|
|
assert (await empty_blockchain.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
block_bad = recursive_replace(
|
|
blocks[-1], "reward_chain_block.total_iters", blocks[-1].reward_chain_block.total_iters + 1
|
|
)
|
|
res = await empty_blockchain.pre_validate_blocks_multiprocessing([blocks[0], block_bad], {})
|
|
assert res[0].error is None
|
|
assert res[1].error is not None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pre_validation(self, empty_blockchain, default_1000_blocks):
|
|
blocks = default_1000_blocks[:100]
|
|
start = time.time()
|
|
n_at_a_time = min(multiprocessing.cpu_count(), 32)
|
|
times_pv = []
|
|
times_rb = []
|
|
for i in range(0, len(blocks), n_at_a_time):
|
|
end_i = min(i + n_at_a_time, len(blocks))
|
|
blocks_to_validate = blocks[i:end_i]
|
|
start_pv = time.time()
|
|
res = await empty_blockchain.pre_validate_blocks_multiprocessing(blocks_to_validate, {})
|
|
end_pv = time.time()
|
|
times_pv.append(end_pv - start_pv)
|
|
assert res is not None
|
|
for n in range(end_i - i):
|
|
assert res[n] is not None
|
|
assert res[n].error is None
|
|
block = blocks_to_validate[n]
|
|
start_rb = time.time()
|
|
result, err, _ = await empty_blockchain.receive_block(block, res[n])
|
|
end_rb = time.time()
|
|
times_rb.append(end_rb - start_rb)
|
|
assert err is None
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
log.info(
|
|
f"Added block {block.height} total iters {block.total_iters} "
|
|
f"new slot? {len(block.finished_sub_slots)}, time {end_rb - start_rb}"
|
|
)
|
|
end = time.time()
|
|
log.info(f"Total time: {end - start} seconds")
|
|
log.info(f"Average pv: {sum(times_pv)/(len(blocks)/n_at_a_time)}")
|
|
log.info(f"Average rb: {sum(times_rb)/(len(blocks))}")
|
|
|
|
|
|
class TestBodyValidation:
|
|
@pytest.mark.asyncio
|
|
async def test_not_tx_block_but_has_data(self, empty_blockchain):
|
|
# 1
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(1)
|
|
while blocks[-1].foliage_transaction_block is not None:
|
|
assert (await b.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
blocks = bt.get_consecutive_blocks(1, block_list_input=blocks)
|
|
original_block: FullBlock = blocks[-1]
|
|
|
|
block = recursive_replace(original_block, "transactions_generator", SerializedProgram())
|
|
assert (await b.receive_block(block))[1] == Err.NOT_BLOCK_BUT_HAS_DATA
|
|
h = std_hash(b"")
|
|
i = uint64(1)
|
|
block = recursive_replace(
|
|
original_block,
|
|
"transactions_info",
|
|
TransactionsInfo(h, h, G2Element(), uint64(1), uint64(1), []),
|
|
)
|
|
assert (await b.receive_block(block))[1] == Err.NOT_BLOCK_BUT_HAS_DATA
|
|
|
|
block = recursive_replace(original_block, "transactions_generator_ref_list", [i])
|
|
assert (await b.receive_block(block))[1] == Err.NOT_BLOCK_BUT_HAS_DATA
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tx_block_missing_data(self, empty_blockchain):
|
|
# 2
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(2, guarantee_transaction_block=True)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block = recursive_replace(
|
|
blocks[-1],
|
|
"foliage_transaction_block",
|
|
None,
|
|
)
|
|
err = (await b.receive_block(block))[1]
|
|
assert err == Err.IS_TRANSACTION_BLOCK_BUT_NO_DATA or err == Err.INVALID_FOLIAGE_BLOCK_PRESENCE
|
|
|
|
block = recursive_replace(
|
|
blocks[-1],
|
|
"transactions_info",
|
|
None,
|
|
)
|
|
try:
|
|
err = (await b.receive_block(block))[1]
|
|
except AssertionError:
|
|
return None
|
|
assert err == Err.IS_TRANSACTION_BLOCK_BUT_NO_DATA or err == Err.INVALID_FOLIAGE_BLOCK_PRESENCE
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_transactions_info_hash(self, empty_blockchain):
|
|
# 3
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(2, guarantee_transaction_block=True)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
h = std_hash(b"")
|
|
block = recursive_replace(
|
|
blocks[-1],
|
|
"foliage_transaction_block.transactions_info_hash",
|
|
h,
|
|
)
|
|
block = recursive_replace(
|
|
block, "foliage.foliage_transaction_block_hash", std_hash(block.foliage_transaction_block)
|
|
)
|
|
new_m = block.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block = recursive_replace(block, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block))[1]
|
|
assert err == Err.INVALID_TRANSACTIONS_INFO_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_transactions_block_hash(self, empty_blockchain):
|
|
# 4
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(2, guarantee_transaction_block=True)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
h = std_hash(b"")
|
|
block = recursive_replace(blocks[-1], "foliage.foliage_transaction_block_hash", h)
|
|
new_m = block.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, blocks[-1].reward_chain_block.proof_of_space.plot_public_key)
|
|
block = recursive_replace(block, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block))[1]
|
|
assert err == Err.INVALID_FOLIAGE_BLOCK_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_reward_claims(self, empty_blockchain):
|
|
# 5
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(2, guarantee_transaction_block=True)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
block: FullBlock = blocks[-1]
|
|
|
|
# Too few
|
|
too_few_reward_claims = block.transactions_info.reward_claims_incorporated[:-1]
|
|
block_2: FullBlock = recursive_replace(
|
|
block, "transactions_info.reward_claims_incorporated", too_few_reward_claims
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_REWARD_COINS
|
|
|
|
# Too many
|
|
h = std_hash(b"")
|
|
too_many_reward_claims = block.transactions_info.reward_claims_incorporated + [
|
|
Coin(h, h, too_few_reward_claims[0].amount)
|
|
]
|
|
block_2 = recursive_replace(block, "transactions_info.reward_claims_incorporated", too_many_reward_claims)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_REWARD_COINS
|
|
|
|
# Duplicates
|
|
duplicate_reward_claims = block.transactions_info.reward_claims_incorporated + [
|
|
block.transactions_info.reward_claims_incorporated[-1]
|
|
]
|
|
block_2 = recursive_replace(block, "transactions_info.reward_claims_incorporated", duplicate_reward_claims)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_REWARD_COINS
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_initial_freeze(self, empty_blockchain):
|
|
# 6
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
genesis_timestamp=time.time() - 1000,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[2].get_included_reward_coins())[0]
|
|
)
|
|
blocks = bt.get_consecutive_blocks(
|
|
1,
|
|
block_list_input=blocks,
|
|
guarantee_transaction_block=True,
|
|
transaction_data=tx,
|
|
)
|
|
err = (await b.receive_block(blocks[-1]))[1]
|
|
assert err == Err.INITIAL_TRANSACTION_FREEZE
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_transactions_generator_hash(self, empty_blockchain):
|
|
# 7
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(2, guarantee_transaction_block=True)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
# No tx should have all zeroes
|
|
block: FullBlock = blocks[-1]
|
|
block_2 = recursive_replace(block, "transactions_info.generator_root", bytes([1] * 32))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_TRANSACTIONS_GENERATOR_HASH
|
|
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
blocks = bt.get_consecutive_blocks(
|
|
2,
|
|
block_list_input=blocks,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[3]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
|
|
# Non empty generator hash must be correct
|
|
block = blocks[-1]
|
|
block_2 = recursive_replace(block, "transactions_info.generator_root", bytes([0] * 32))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_TRANSACTIONS_GENERATOR_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_transactions_ref_list(self, empty_blockchain):
|
|
# No generator should have [1]s for the root
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
block: FullBlock = blocks[-1]
|
|
block_2 = recursive_replace(block, "transactions_info.generator_refs_root", bytes([0] * 32))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_TRANSACTIONS_GENERATOR_REFS_ROOT
|
|
|
|
# No generator should have no refs list
|
|
block_2 = recursive_replace(block, "transactions_generator_ref_list", [uint32(0)])
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_TRANSACTIONS_GENERATOR_REFS_ROOT
|
|
|
|
# Hash should be correct when there is a ref list
|
|
assert (await b.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
blocks = bt.get_consecutive_blocks(5, block_list_input=blocks, guarantee_transaction_block=False)
|
|
for block in blocks[-5:]:
|
|
assert (await b.receive_block(block))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
assert (await b.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
generator_arg = detect_potential_template_generator(blocks[-1].height, blocks[-1].transactions_generator)
|
|
assert generator_arg is not None
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1,
|
|
block_list_input=blocks,
|
|
guarantee_transaction_block=True,
|
|
transaction_data=tx,
|
|
previous_generator=generator_arg,
|
|
)
|
|
block = blocks[-1]
|
|
assert len(block.transactions_generator_ref_list) > 0
|
|
|
|
block_2 = recursive_replace(block, "transactions_info.generator_refs_root", bytes([1] * 32))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_TRANSACTIONS_GENERATOR_REFS_ROOT
|
|
|
|
# Too many heights
|
|
block_2 = recursive_replace(block, "transactions_generator_ref_list", [block.height - 2, block.height - 1])
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.GENERATOR_REF_HAS_NO_GENERATOR
|
|
assert (await b.pre_validate_blocks_multiprocessing([block_2], {})) is None
|
|
|
|
# Not tx block
|
|
for h in range(0, block.height - 1):
|
|
block_2 = recursive_replace(block, "transactions_generator_ref_list", [h])
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.GENERATOR_REF_HAS_NO_GENERATOR or err == Err.INVALID_TRANSACTIONS_GENERATOR_REFS_ROOT
|
|
assert (await b.pre_validate_blocks_multiprocessing([block_2], {})) is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cost_exceeds_max(self, empty_blockchain):
|
|
# 7
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
condition_dict = {ConditionOpcode.CREATE_COIN: []}
|
|
for i in range(7000):
|
|
output = ConditionWithArgs(ConditionOpcode.CREATE_COIN, [bt.pool_ph, int_to_bytes(i)])
|
|
condition_dict[ConditionOpcode.CREATE_COIN].append(output)
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0], condition_dic=condition_dict
|
|
)
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
assert (await b.receive_block(blocks[-1]))[1] == Err.BLOCK_COST_EXCEEDS_MAX
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_clvm_must_not_fail(self, empty_blockchain):
|
|
# 8
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_cost_in_block(self, empty_blockchain):
|
|
# 9
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
block: FullBlock = blocks[-1]
|
|
|
|
# zero
|
|
block_2: FullBlock = recursive_replace(block, "transactions_info.cost", uint64(0))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_BLOCK_COST
|
|
|
|
# too low
|
|
block_2: FullBlock = recursive_replace(block, "transactions_info.cost", uint64(1))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.GENERATOR_RUNTIME_ERROR
|
|
|
|
# too high
|
|
block_2: FullBlock = recursive_replace(block, "transactions_info.cost", uint64(1000000))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_BLOCK_COST
|
|
|
|
err = (await b.receive_block(block))[1]
|
|
assert err is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_max_coin_amount(self):
|
|
# 10
|
|
# TODO: fix, this is not reaching validation. Because we can't create a block with such amounts due to uint64
|
|
# limit in Coin
|
|
pass
|
|
#
|
|
# new_test_constants = test_constants.replace(
|
|
# **{"GENESIS_PRE_FARM_POOL_PUZZLE_HASH": bt.pool_ph, "GENESIS_PRE_FARM_FARMER_PUZZLE_HASH": bt.pool_ph}
|
|
# )
|
|
# b, connection, db_path = await create_blockchain(new_test_constants)
|
|
# bt_2 = BlockTools(new_test_constants)
|
|
# bt_2.constants = bt_2.constants.replace(
|
|
# **{"GENESIS_PRE_FARM_POOL_PUZZLE_HASH": bt.pool_ph, "GENESIS_PRE_FARM_FARMER_PUZZLE_HASH": bt.pool_ph}
|
|
# )
|
|
# blocks = bt_2.get_consecutive_blocks(
|
|
# 3,
|
|
# guarantee_transaction_block=True,
|
|
# farmer_reward_puzzle_hash=bt.pool_ph,
|
|
# pool_reward_puzzle_hash=bt.pool_ph,
|
|
# )
|
|
# assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
# assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
# assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
#
|
|
# wt: WalletTool = bt_2.get_pool_wallet_tool()
|
|
#
|
|
# condition_dict = {ConditionOpcode.CREATE_COIN: []}
|
|
# output = ConditionWithArgs(ConditionOpcode.CREATE_COIN, [bt_2.pool_ph, int_to_bytes(2 ** 64)])
|
|
# condition_dict[ConditionOpcode.CREATE_COIN].append(output)
|
|
#
|
|
# tx: SpendBundle = wt.generate_signed_transaction_multiple_coins(
|
|
# 10,
|
|
# wt.get_new_puzzlehash(),
|
|
# list(blocks[1].get_included_reward_coins()),
|
|
# condition_dic=condition_dict,
|
|
# )
|
|
# try:
|
|
# blocks = bt_2.get_consecutive_blocks(
|
|
# 1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
# )
|
|
# assert False
|
|
# except Exception as e:
|
|
# pass
|
|
# await connection.close()
|
|
# b.shut_down()
|
|
# db_path.unlink()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_merkle_roots(self, empty_blockchain):
|
|
# 11
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
block: FullBlock = blocks[-1]
|
|
|
|
merkle_set = MerkleSet()
|
|
# additions
|
|
block_2 = recursive_replace(block, "foliage_transaction_block.additions_root", merkle_set.get_root())
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.BAD_ADDITION_ROOT
|
|
|
|
# removals
|
|
merkle_set.add_already_hashed(std_hash(b"1"))
|
|
block_2 = recursive_replace(block, "foliage_transaction_block.removals_root", merkle_set.get_root())
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.BAD_REMOVAL_ROOT
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_filter(self, empty_blockchain):
|
|
# 12
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
block: FullBlock = blocks[-1]
|
|
block_2 = recursive_replace(block, "foliage_transaction_block.filter_hash", std_hash(b"3"))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_TRANSACTIONS_FILTER_HASH
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_duplicate_outputs(self, empty_blockchain):
|
|
# 13
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
condition_dict = {ConditionOpcode.CREATE_COIN: []}
|
|
for i in range(2):
|
|
output = ConditionWithArgs(ConditionOpcode.CREATE_COIN, [bt.pool_ph, int_to_bytes(1)])
|
|
condition_dict[ConditionOpcode.CREATE_COIN].append(output)
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0], condition_dic=condition_dict
|
|
)
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
assert (await b.receive_block(blocks[-1]))[1] == Err.DUPLICATE_OUTPUT
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_duplicate_removals(self, empty_blockchain):
|
|
# 14
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
tx_2: SpendBundle = wt.generate_signed_transaction(
|
|
11, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
agg = SpendBundle.aggregate([tx, tx_2])
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=agg
|
|
)
|
|
assert (await b.receive_block(blocks[-1]))[1] == Err.DOUBLE_SPEND
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_double_spent_in_coin_store(self, empty_blockchain):
|
|
# 15
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
assert (await b.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
tx_2: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-2].get_included_reward_coins())[0]
|
|
)
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx_2
|
|
)
|
|
|
|
assert (await b.receive_block(blocks[-1]))[1] == Err.DOUBLE_SPEND
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_double_spent_in_reorg(self, empty_blockchain):
|
|
# 15
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
assert (await b.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
new_coin: Coin = tx.additions()[0]
|
|
tx_2: SpendBundle = wt.generate_signed_transaction(10, wt.get_new_puzzlehash(), new_coin)
|
|
# This is fine because coin exists
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx_2
|
|
)
|
|
assert (await b.receive_block(blocks[-1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
blocks = bt.get_consecutive_blocks(5, block_list_input=blocks, guarantee_transaction_block=True)
|
|
for block in blocks[-5:]:
|
|
assert (await b.receive_block(block))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
blocks_reorg = bt.get_consecutive_blocks(2, block_list_input=blocks[:-7], guarantee_transaction_block=True)
|
|
assert (await b.receive_block(blocks_reorg[-2]))[0] == ReceiveBlockResult.ADDED_AS_ORPHAN
|
|
assert (await b.receive_block(blocks_reorg[-1]))[0] == ReceiveBlockResult.ADDED_AS_ORPHAN
|
|
|
|
# Coin does not exist in reorg
|
|
blocks_reorg = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks_reorg, guarantee_transaction_block=True, transaction_data=tx_2
|
|
)
|
|
|
|
assert (await b.receive_block(blocks_reorg[-1]))[1] == Err.UNKNOWN_UNSPENT
|
|
|
|
# Finally add the block to the fork (spending both in same bundle, this is ephemeral)
|
|
agg = SpendBundle.aggregate([tx, tx_2])
|
|
blocks_reorg = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks_reorg[:-1], guarantee_transaction_block=True, transaction_data=agg
|
|
)
|
|
assert (await b.receive_block(blocks_reorg[-1]))[1] is None
|
|
|
|
blocks_reorg = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks_reorg, guarantee_transaction_block=True, transaction_data=tx_2
|
|
)
|
|
assert (await b.receive_block(blocks_reorg[-1]))[1] == Err.DOUBLE_SPEND_IN_FORK
|
|
|
|
rewards_ph = wt.get_new_puzzlehash()
|
|
blocks_reorg = bt.get_consecutive_blocks(
|
|
10,
|
|
block_list_input=blocks_reorg[:-1],
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=rewards_ph,
|
|
)
|
|
for block in blocks_reorg[-10:]:
|
|
r, e, _ = await b.receive_block(block)
|
|
assert e is None
|
|
|
|
# ephemeral coin is spent
|
|
first_coin = await b.coin_store.get_coin_record(new_coin.name())
|
|
assert first_coin is not None and first_coin.spent
|
|
second_coin = await b.coin_store.get_coin_record(tx_2.additions()[0].name())
|
|
assert second_coin is not None and not second_coin.spent
|
|
|
|
farmer_coin = create_farmer_coin(
|
|
blocks_reorg[-1].height,
|
|
rewards_ph,
|
|
calculate_base_farmer_reward(blocks_reorg[-1].height),
|
|
bt.constants.GENESIS_CHALLENGE,
|
|
)
|
|
tx_3: SpendBundle = wt.generate_signed_transaction(10, wt.get_new_puzzlehash(), farmer_coin)
|
|
|
|
blocks_reorg = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks_reorg, guarantee_transaction_block=True, transaction_data=tx_3
|
|
)
|
|
assert (await b.receive_block(blocks_reorg[-1]))[1] is None
|
|
|
|
farmer_coin = await b.coin_store.get_coin_record(farmer_coin.name())
|
|
assert first_coin is not None and farmer_coin.spent
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_minting_coin(self, empty_blockchain):
|
|
# 16 TODO
|
|
# 17 is tested in mempool tests
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_max_coin_amount_fee(self):
|
|
# 18 TODO: we can't create a block with such amounts due to uint64
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_fees_in_block(self, empty_blockchain):
|
|
# 19
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[-1].get_included_reward_coins())[0]
|
|
)
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
1, block_list_input=blocks, guarantee_transaction_block=True, transaction_data=tx
|
|
)
|
|
block: FullBlock = blocks[-1]
|
|
|
|
# wrong feees
|
|
block_2: FullBlock = recursive_replace(block, "transactions_info.fees", uint64(1239))
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage_transaction_block.transactions_info_hash", block_2.transactions_info.get_hash()
|
|
)
|
|
block_2 = recursive_replace(
|
|
block_2, "foliage.foliage_transaction_block_hash", block_2.foliage_transaction_block.get_hash()
|
|
)
|
|
new_m = block_2.foliage.foliage_transaction_block_hash
|
|
new_fsb_sig = bt.get_plot_signature(new_m, block.reward_chain_block.proof_of_space.plot_public_key)
|
|
block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fsb_sig)
|
|
|
|
err = (await b.receive_block(block_2))[1]
|
|
assert err == Err.INVALID_BLOCK_FEE_AMOUNT
|
|
|
|
|
|
class TestReorgs:
|
|
@pytest.mark.asyncio
|
|
async def test_basic_reorg(self, empty_blockchain):
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(15)
|
|
|
|
for block in blocks:
|
|
assert (await b.receive_block(block))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert b.get_peak().height == 14
|
|
|
|
blocks_reorg_chain = bt.get_consecutive_blocks(7, blocks[:10], seed=b"2")
|
|
for reorg_block in blocks_reorg_chain:
|
|
result, error_code, fork_height = await b.receive_block(reorg_block)
|
|
if reorg_block.height < 10:
|
|
assert result == ReceiveBlockResult.ALREADY_HAVE_BLOCK
|
|
elif reorg_block.height < 14:
|
|
assert result == ReceiveBlockResult.ADDED_AS_ORPHAN
|
|
elif reorg_block.height >= 15:
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert error_code is None
|
|
assert b.get_peak().height == 16
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_long_reorg(self, empty_blockchain, default_10000_blocks):
|
|
# Reorg longer than a difficulty adjustment
|
|
# Also tests higher weight chain but lower height
|
|
b = empty_blockchain
|
|
num_blocks_chain_1 = 3 * test_constants.EPOCH_BLOCKS + test_constants.MAX_SUB_SLOT_BLOCKS + 10
|
|
num_blocks_chain_2_start = test_constants.EPOCH_BLOCKS - 20
|
|
num_blocks_chain_2 = 3 * test_constants.EPOCH_BLOCKS + test_constants.MAX_SUB_SLOT_BLOCKS + 8
|
|
|
|
assert num_blocks_chain_1 < 10000
|
|
blocks = default_10000_blocks[:num_blocks_chain_1]
|
|
|
|
for block in blocks:
|
|
assert (await b.receive_block(block))[0] == ReceiveBlockResult.NEW_PEAK
|
|
chain_1_height = b.get_peak().height
|
|
chain_1_weight = b.get_peak().weight
|
|
assert chain_1_height == (num_blocks_chain_1 - 1)
|
|
|
|
# These blocks will have less time between them (timestamp) and therefore will make difficulty go up
|
|
# This means that the weight will grow faster, and we can get a heavier chain with lower height
|
|
blocks_reorg_chain = bt.get_consecutive_blocks(
|
|
num_blocks_chain_2 - num_blocks_chain_2_start,
|
|
blocks[:num_blocks_chain_2_start],
|
|
seed=b"2",
|
|
time_per_block=8,
|
|
)
|
|
found_orphan = False
|
|
for reorg_block in blocks_reorg_chain:
|
|
result, error_code, fork_height = await b.receive_block(reorg_block)
|
|
if reorg_block.height < num_blocks_chain_2_start:
|
|
assert result == ReceiveBlockResult.ALREADY_HAVE_BLOCK
|
|
if reorg_block.weight <= chain_1_weight:
|
|
if result == ReceiveBlockResult.ADDED_AS_ORPHAN:
|
|
found_orphan = True
|
|
assert error_code is None
|
|
assert result == ReceiveBlockResult.ADDED_AS_ORPHAN or result == ReceiveBlockResult.ALREADY_HAVE_BLOCK
|
|
elif reorg_block.weight > chain_1_weight:
|
|
assert reorg_block.height < chain_1_height
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert error_code is None
|
|
assert found_orphan
|
|
|
|
assert b.get_peak().weight > chain_1_weight
|
|
assert b.get_peak().height < chain_1_height
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_long_compact_blockchain(self, empty_blockchain, default_10000_blocks_compact):
|
|
b = empty_blockchain
|
|
for block in default_10000_blocks_compact:
|
|
assert (await b.receive_block(block))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert b.get_peak().height == len(default_10000_blocks_compact) - 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reorg_from_genesis(self, empty_blockchain):
|
|
b = empty_blockchain
|
|
WALLET_A = WalletTool(b.constants)
|
|
WALLET_A_PUZZLE_HASHES = [WALLET_A.get_new_puzzlehash() for _ in range(5)]
|
|
|
|
blocks = bt.get_consecutive_blocks(15)
|
|
|
|
for block in blocks:
|
|
assert (await b.receive_block(block))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert b.get_peak().height == 14
|
|
|
|
# Reorg to alternate chain that is 1 height longer
|
|
found_orphan = False
|
|
blocks_reorg_chain = bt.get_consecutive_blocks(16, [], seed=b"2")
|
|
for reorg_block in blocks_reorg_chain:
|
|
result, error_code, fork_height = await b.receive_block(reorg_block)
|
|
if reorg_block.height < 14:
|
|
if result == ReceiveBlockResult.ADDED_AS_ORPHAN:
|
|
found_orphan = True
|
|
assert result == ReceiveBlockResult.ADDED_AS_ORPHAN or result == ReceiveBlockResult.ALREADY_HAVE_BLOCK
|
|
elif reorg_block.height >= 15:
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert error_code is None
|
|
|
|
# Back to original chain
|
|
blocks_reorg_chain_2 = bt.get_consecutive_blocks(3, blocks, seed=b"3")
|
|
|
|
result, error_code, fork_height = await b.receive_block(blocks_reorg_chain_2[-3])
|
|
assert result == ReceiveBlockResult.ADDED_AS_ORPHAN
|
|
|
|
result, error_code, fork_height = await b.receive_block(blocks_reorg_chain_2[-2])
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
result, error_code, fork_height = await b.receive_block(blocks_reorg_chain_2[-1])
|
|
assert result == ReceiveBlockResult.NEW_PEAK
|
|
assert found_orphan
|
|
assert b.get_peak().height == 17
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reorg_transaction(self, empty_blockchain):
|
|
b = empty_blockchain
|
|
wallet_a = WalletTool(b.constants)
|
|
WALLET_A_PUZZLE_HASHES = [wallet_a.get_new_puzzlehash() for _ in range(5)]
|
|
coinbase_puzzlehash = WALLET_A_PUZZLE_HASHES[0]
|
|
receiver_puzzlehash = WALLET_A_PUZZLE_HASHES[1]
|
|
|
|
blocks = bt.get_consecutive_blocks(10, farmer_reward_puzzle_hash=coinbase_puzzlehash)
|
|
blocks = bt.get_consecutive_blocks(
|
|
2, blocks, farmer_reward_puzzle_hash=coinbase_puzzlehash, guarantee_transaction_block=True
|
|
)
|
|
|
|
spend_block = blocks[10]
|
|
spend_coin = None
|
|
for coin in list(spend_block.get_included_reward_coins()):
|
|
if coin.puzzle_hash == coinbase_puzzlehash:
|
|
spend_coin = coin
|
|
spend_bundle = wallet_a.generate_signed_transaction(1000, receiver_puzzlehash, spend_coin)
|
|
|
|
blocks = bt.get_consecutive_blocks(
|
|
2,
|
|
blocks,
|
|
farmer_reward_puzzle_hash=coinbase_puzzlehash,
|
|
transaction_data=spend_bundle,
|
|
guarantee_transaction_block=True,
|
|
)
|
|
|
|
blocks_fork = bt.get_consecutive_blocks(
|
|
1, blocks[:12], farmer_reward_puzzle_hash=coinbase_puzzlehash, seed=b"123", guarantee_transaction_block=True
|
|
)
|
|
blocks_fork = bt.get_consecutive_blocks(
|
|
2,
|
|
blocks_fork,
|
|
farmer_reward_puzzle_hash=coinbase_puzzlehash,
|
|
transaction_data=spend_bundle,
|
|
guarantee_transaction_block=True,
|
|
seed=b"1245",
|
|
)
|
|
for block in blocks:
|
|
result, error_code, _ = await b.receive_block(block)
|
|
assert error_code is None and result == ReceiveBlockResult.NEW_PEAK
|
|
|
|
for block in blocks_fork:
|
|
result, error_code, _ = await b.receive_block(block)
|
|
assert error_code is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_header_blocks_in_range_tx_filter(self, empty_blockchain):
|
|
b = empty_blockchain
|
|
blocks = bt.get_consecutive_blocks(
|
|
3,
|
|
guarantee_transaction_block=True,
|
|
pool_reward_puzzle_hash=bt.pool_ph,
|
|
farmer_reward_puzzle_hash=bt.pool_ph,
|
|
)
|
|
assert (await b.receive_block(blocks[0]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[1]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
assert (await b.receive_block(blocks[2]))[0] == ReceiveBlockResult.NEW_PEAK
|
|
wt: WalletTool = bt.get_pool_wallet_tool()
|
|
tx: SpendBundle = wt.generate_signed_transaction(
|
|
10, wt.get_new_puzzlehash(), list(blocks[2].get_included_reward_coins())[0]
|
|
)
|
|
blocks = bt.get_consecutive_blocks(
|
|
1,
|
|
block_list_input=blocks,
|
|
guarantee_transaction_block=True,
|
|
transaction_data=tx,
|
|
)
|
|
err = (await b.receive_block(blocks[-1]))[1]
|
|
assert not err
|
|
|
|
blocks_with_filter = await b.get_header_blocks_in_range(0, 10, tx_filter=True)
|
|
blocks_without_filter = await b.get_header_blocks_in_range(0, 10, tx_filter=False)
|
|
header_hash = blocks[-1].header_hash
|
|
assert (
|
|
blocks_with_filter[header_hash].transactions_filter
|
|
!= blocks_without_filter[header_hash].transactions_filter
|
|
)
|
|
assert blocks_with_filter[header_hash].header_hash == blocks_without_filter[header_hash].header_hash
|