chia-blockchain/tests/generator/test_compression.py

295 lines
13 KiB
Python

# flake8: noqa: F501
from dataclasses import dataclass
from typing import List, Any
from unittest import TestCase
from chia.full_node.bundle_tools import (
bundle_suitable_for_compression,
compressed_coin_solution_entry_list,
compressed_spend_bundle_solution,
match_standard_transaction_at_any_index,
simple_solution_generator,
spend_bundle_to_serialized_coin_solution_entry_list,
)
from chia.full_node.generator import run_generator, create_generator_args
from chia.types.blockchain_format.program import Program, SerializedProgram, INFINITE_COST
from chia.types.generator_types import BlockGenerator, CompressorArg, GeneratorArg
from chia.types.spend_bundle import SpendBundle
from chia.util.byte_types import hexstr_to_bytes
from chia.util.ints import uint32
from chia.wallet.puzzles.load_clvm import load_clvm
from tests.core.make_block_generator import make_spend_bundle
from clvm import SExp
import io
from clvm.serialize import sexp_from_stream
from clvm_tools import binutils
TEST_GEN_DESERIALIZE = load_clvm("test_generator_deserialize.clvm", package_or_requirement="chia.wallet.puzzles")
DESERIALIZE_MOD = load_clvm("chialisp_deserialisation.clvm", package_or_requirement="chia.wallet.puzzles")
DECOMPRESS_PUZZLE = load_clvm("decompress_puzzle.clvm", package_or_requirement="chia.wallet.puzzles")
DECOMPRESS_CSE = load_clvm("decompress_coin_solution_entry.clvm", package_or_requirement="chia.wallet.puzzles")
DECOMPRESS_CSE_WITH_PREFIX = load_clvm(
"decompress_coin_solution_entry_with_prefix.clvm", package_or_requirement="chia.wallet.puzzles"
)
DECOMPRESS_BLOCK = load_clvm("block_program_zero.clvm", package_or_requirement="chia.wallet.puzzles")
TEST_MULTIPLE = load_clvm("test_multiple_generator_input_arguments.clvm", package_or_requirement="chia.wallet.puzzles")
Nil = Program.from_bytes(b"\x80")
original_generator = hexstr_to_bytes(
"ff01ffffffa00000000000000000000000000000000000000000000000000000000000000000ff830186a080ffffff02ffff01ff02ffff01ff02ffff03ff0bffff01ff02ffff03ffff09ff05ffff1dff0bffff1effff0bff0bffff02ff06ffff04ff02ffff04ff17ff8080808080808080ffff01ff02ff17ff2f80ffff01ff088080ff0180ffff01ff04ffff04ff04ffff04ff05ffff04ffff02ff06ffff04ff02ffff04ff17ff80808080ff80808080ffff02ff17ff2f808080ff0180ffff04ffff01ff32ff02ffff03ffff07ff0580ffff01ff0bffff0102ffff02ff06ffff04ff02ffff04ff09ff80808080ffff02ff06ffff04ff02ffff04ff0dff8080808080ffff01ff0bffff0101ff058080ff0180ff018080ffff04ffff01b081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3ff018080ffff80ffff01ffff33ffa06b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9ff830186a08080ff8080808080"
) # noqa
gen1 = b"aaaaaaaaaa" + original_generator
gen2 = b"bb" + original_generator
FAKE_BLOCK_HEIGHT1 = uint32(100)
FAKE_BLOCK_HEIGHT2 = uint32(200)
@dataclass(frozen=True)
class MultipleCompressorArg:
arg: List[CompressorArg]
split_offset: int
def create_multiple_ref_generator(args: MultipleCompressorArg, spend_bundle: SpendBundle) -> BlockGenerator:
"""
Decompress a transaction by referencing bytes from multiple input generator references
"""
compressed_cse_list = compressed_coin_solution_entry_list(spend_bundle)
program = TEST_MULTIPLE.curry(
DECOMPRESS_PUZZLE,
DECOMPRESS_CSE_WITH_PREFIX,
args.arg[0].start,
args.arg[0].end - args.split_offset,
args.arg[1].end - args.split_offset,
args.arg[1].end,
compressed_cse_list,
)
# TODO aqk: Improve ergonomics of CompressorArg -> GeneratorArg conversion
generator_args = [
GeneratorArg(FAKE_BLOCK_HEIGHT1, args.arg[0].generator),
GeneratorArg(FAKE_BLOCK_HEIGHT2, args.arg[1].generator),
]
return BlockGenerator(program, generator_args)
def spend_bundle_to_coin_solution_entry_list(bundle: SpendBundle) -> List[Any]:
r = []
for coin_solution in bundle.coin_solutions:
entry = [
coin_solution.coin.parent_coin_info,
sexp_from_stream(io.BytesIO(bytes(coin_solution.puzzle_reveal)), SExp.to),
coin_solution.coin.amount,
sexp_from_stream(io.BytesIO(bytes(coin_solution.solution)), SExp.to),
]
r.append(entry)
return r
class TestCompression(TestCase):
def test_spend_bundle_suitable(self):
sb: SpendBundle = make_spend_bundle(1)
assert bundle_suitable_for_compression(sb)
def test_compress_spend_bundle(self):
pass
def test_multiple_input_gen_refs(self):
start1, end1 = match_standard_transaction_at_any_index(gen1)
start2, end2 = match_standard_transaction_at_any_index(gen2)
ca1 = CompressorArg(FAKE_BLOCK_HEIGHT1, SerializedProgram.from_bytes(gen1), start1, end1)
ca2 = CompressorArg(FAKE_BLOCK_HEIGHT2, SerializedProgram.from_bytes(gen2), start2, end2)
prefix_len1 = end1 - start1
prefix_len2 = end2 - start2
assert prefix_len1 == prefix_len2
prefix_len = prefix_len1
results = []
for split_offset in range(prefix_len):
gen_args = MultipleCompressorArg([ca1, ca2], split_offset)
spend_bundle: SpendBundle = make_spend_bundle(1)
multi_gen = create_multiple_ref_generator(gen_args, spend_bundle)
cost, result = run_generator(multi_gen, INFINITE_COST)
results.append(result)
assert result is not None
assert cost > 0
assert all(r == results[0] for r in results)
def test_compressed_block_results(self):
sb: SpendBundle = make_spend_bundle(1)
start, end = match_standard_transaction_at_any_index(original_generator)
ca = CompressorArg(uint32(0), SerializedProgram.from_bytes(original_generator), start, end)
c = compressed_spend_bundle_solution(ca, sb)
s = simple_solution_generator(sb)
assert c != s
cost_c, result_c = run_generator(c, INFINITE_COST)
cost_s, result_s = run_generator(s, INFINITE_COST)
print(result_c)
assert result_c is not None
assert result_s is not None
assert result_c == result_s
def test_spend_byndle_coin_solution(self):
for i in range(0, 10):
sb: SpendBundle = make_spend_bundle(i)
cs1 = SExp.to(spend_bundle_to_coin_solution_entry_list(sb)).as_bin()
cs2 = spend_bundle_to_serialized_coin_solution_entry_list(sb)
assert cs1 == cs2
class TestDecompression(TestCase):
def __init__(self, *args, **kwargs):
super(TestDecompression, self).__init__(*args, **kwargs)
self.maxDiff = None
def test_deserialization(self):
self.maxDiff = None
cost, out = DESERIALIZE_MOD.run_with_cost(INFINITE_COST, [bytes(Program.to("hello"))])
assert out == Program.to("hello")
def test_deserialization_as_argument(self):
self.maxDiff = None
cost, out = TEST_GEN_DESERIALIZE.run_with_cost(
INFINITE_COST, [DESERIALIZE_MOD, Nil, bytes(Program.to("hello"))]
)
print(bytes(Program.to("hello")))
print()
print(out)
assert out == Program.to("hello")
def test_decompress_puzzle(self):
cost, out = DECOMPRESS_PUZZLE.run_with_cost(
INFINITE_COST, [DESERIALIZE_MOD, b"\xff", bytes(Program.to("pubkey")), b"\x80"]
)
print()
print(out)
# An empty CSE is invalid. (An empty CSE list may be okay)
# def test_decompress_empty_cse(self):
# cse0 = binutils.assemble("()")
# cost, out = DECOMPRESS_CSE.run_with_cost(INFINITE_COST, [DESERIALIZE_MOD, DECOMPRESS_PUZZLE, b"\xff", b"\x80", cse0])
# print()
# print(out)
def test_decompress_cse(self):
"""Decompress a single CSE / CoinSolutionEntry"""
cse0 = binutils.assemble(
"((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0) (0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3 (() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ())))"
) # noqa
cost, out = DECOMPRESS_CSE.run_with_cost(
INFINITE_COST, [DESERIALIZE_MOD, DECOMPRESS_PUZZLE, b"\xff", b"\x80", cse0]
)
print()
print(out)
def test_decompress_cse_with_prefix(self):
cse0 = binutils.assemble(
"((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0) (0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3 (() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ())))"
) # noqa
start = 2 + 44
end = start + 238
prefix = original_generator[start:end]
# (deserialize decompress_puzzle puzzle_prefix cse)
cost, out = DECOMPRESS_CSE_WITH_PREFIX.run_with_cost(
INFINITE_COST, [DESERIALIZE_MOD, DECOMPRESS_PUZZLE, prefix, cse0]
)
print()
print(out)
def test_block_program_zero(self):
"Decompress a list of CSEs"
self.maxDiff = None
cse1 = binutils.assemble(
"(((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0) (0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3 (() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ()))))"
) # noqa
cse2 = binutils.assemble(
"""
(
((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0)
(0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3
(() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ()))
)
((0x0000000000000000000000000000000000000000000000000000000000000001 0x0186a0)
(0xb0a6207f5173ec41491d9f2c1b8fff5579e13703077e0eaca8fe587669dcccf51e9209a6b65576845ece5f7c2f3229e7e3
(() (q (51 0x24254a3efc3ebfac9979bbe0d615e2eda043aa329905f65b63846fa24149e2b6 0x0186a0)) ())))
)
"""
) # noqa
start = 2 + 44
end = start + 238
# (mod (decompress_puzzle decompress_coin_solution_entry start end compressed_cses deserialize generator_list reserved_arg)
# cost, out = DECOMPRESS_BLOCK.run_with_cost(INFINITE_COST, [DECOMPRESS_PUZZLE, DECOMPRESS_CSE, start, Program.to(end), cse0, DESERIALIZE_MOD, bytes(original_generator)])
cost, out = DECOMPRESS_BLOCK.run_with_cost(
INFINITE_COST,
[
DECOMPRESS_PUZZLE,
DECOMPRESS_CSE_WITH_PREFIX,
start,
Program.to(end),
cse2,
DESERIALIZE_MOD,
[bytes(original_generator)],
],
)
print()
print(out)
def test_block_program_zero_with_curry(self):
self.maxDiff = None
cse1 = binutils.assemble(
"(((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0) (0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3 (() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ()))))"
) # noqa
cse2 = binutils.assemble(
"""
(
((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0)
(0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3
(() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ()))
)
((0x0000000000000000000000000000000000000000000000000000000000000001 0x0186a0)
(0xb0a6207f5173ec41491d9f2c1b8fff5579e13703077e0eaca8fe587669dcccf51e9209a6b65576845ece5f7c2f3229e7e3
(() (q (51 0x24254a3efc3ebfac9979bbe0d615e2eda043aa329905f65b63846fa24149e2b6 0x0186a0)) ())))
)
"""
) # noqa
start = 2 + 44
end = start + 238
# (mod (decompress_puzzle decompress_coin_solution_entry start end compressed_cses deserialize generator_list reserved_arg)
# cost, out = DECOMPRESS_BLOCK.run_with_cost(INFINITE_COST, [DECOMPRESS_PUZZLE, DECOMPRESS_CSE, start, Program.to(end), cse0, DESERIALIZE_MOD, bytes(original_generator)])
p = DECOMPRESS_BLOCK.curry(DECOMPRESS_PUZZLE, DECOMPRESS_CSE_WITH_PREFIX, start, Program.to(end))
cost, out = p.run_with_cost(INFINITE_COST, [cse2, DESERIALIZE_MOD, [bytes(original_generator)]])
print()
print(p)
print(out)
p_with_cses = DECOMPRESS_BLOCK.curry(
DECOMPRESS_PUZZLE, DECOMPRESS_CSE_WITH_PREFIX, start, Program.to(end), cse2, DESERIALIZE_MOD
)
generator_args = create_generator_args([SerializedProgram.from_bytes(original_generator)])
cost, out = p_with_cses.run_with_cost(INFINITE_COST, generator_args)
print()
print(p_with_cses)
print(out)