295 lines
13 KiB
Python
295 lines
13 KiB
Python
# flake8: noqa: F501
|
|
from dataclasses import dataclass
|
|
from typing import List, Any
|
|
from unittest import TestCase
|
|
|
|
from chia.full_node.bundle_tools import (
|
|
bundle_suitable_for_compression,
|
|
compressed_coin_solution_entry_list,
|
|
compressed_spend_bundle_solution,
|
|
match_standard_transaction_at_any_index,
|
|
simple_solution_generator,
|
|
spend_bundle_to_serialized_coin_solution_entry_list,
|
|
)
|
|
from chia.full_node.generator import run_generator, create_generator_args
|
|
from chia.types.blockchain_format.program import Program, SerializedProgram, INFINITE_COST
|
|
from chia.types.generator_types import BlockGenerator, CompressorArg, GeneratorArg
|
|
from chia.types.spend_bundle import SpendBundle
|
|
from chia.util.byte_types import hexstr_to_bytes
|
|
from chia.util.ints import uint32
|
|
from chia.wallet.puzzles.load_clvm import load_clvm
|
|
|
|
from tests.core.make_block_generator import make_spend_bundle
|
|
|
|
from clvm import SExp
|
|
import io
|
|
from clvm.serialize import sexp_from_stream
|
|
|
|
from clvm_tools import binutils
|
|
|
|
TEST_GEN_DESERIALIZE = load_clvm("test_generator_deserialize.clvm", package_or_requirement="chia.wallet.puzzles")
|
|
DESERIALIZE_MOD = load_clvm("chialisp_deserialisation.clvm", package_or_requirement="chia.wallet.puzzles")
|
|
|
|
DECOMPRESS_PUZZLE = load_clvm("decompress_puzzle.clvm", package_or_requirement="chia.wallet.puzzles")
|
|
DECOMPRESS_CSE = load_clvm("decompress_coin_solution_entry.clvm", package_or_requirement="chia.wallet.puzzles")
|
|
|
|
DECOMPRESS_CSE_WITH_PREFIX = load_clvm(
|
|
"decompress_coin_solution_entry_with_prefix.clvm", package_or_requirement="chia.wallet.puzzles"
|
|
)
|
|
DECOMPRESS_BLOCK = load_clvm("block_program_zero.clvm", package_or_requirement="chia.wallet.puzzles")
|
|
TEST_MULTIPLE = load_clvm("test_multiple_generator_input_arguments.clvm", package_or_requirement="chia.wallet.puzzles")
|
|
|
|
Nil = Program.from_bytes(b"\x80")
|
|
|
|
original_generator = hexstr_to_bytes(
|
|
"ff01ffffffa00000000000000000000000000000000000000000000000000000000000000000ff830186a080ffffff02ffff01ff02ffff01ff02ffff03ff0bffff01ff02ffff03ffff09ff05ffff1dff0bffff1effff0bff0bffff02ff06ffff04ff02ffff04ff17ff8080808080808080ffff01ff02ff17ff2f80ffff01ff088080ff0180ffff01ff04ffff04ff04ffff04ff05ffff04ffff02ff06ffff04ff02ffff04ff17ff80808080ff80808080ffff02ff17ff2f808080ff0180ffff04ffff01ff32ff02ffff03ffff07ff0580ffff01ff0bffff0102ffff02ff06ffff04ff02ffff04ff09ff80808080ffff02ff06ffff04ff02ffff04ff0dff8080808080ffff01ff0bffff0101ff058080ff0180ff018080ffff04ffff01b081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3ff018080ffff80ffff01ffff33ffa06b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9ff830186a08080ff8080808080"
|
|
) # noqa
|
|
|
|
gen1 = b"aaaaaaaaaa" + original_generator
|
|
gen2 = b"bb" + original_generator
|
|
FAKE_BLOCK_HEIGHT1 = uint32(100)
|
|
FAKE_BLOCK_HEIGHT2 = uint32(200)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class MultipleCompressorArg:
|
|
arg: List[CompressorArg]
|
|
split_offset: int
|
|
|
|
|
|
def create_multiple_ref_generator(args: MultipleCompressorArg, spend_bundle: SpendBundle) -> BlockGenerator:
|
|
"""
|
|
Decompress a transaction by referencing bytes from multiple input generator references
|
|
"""
|
|
compressed_cse_list = compressed_coin_solution_entry_list(spend_bundle)
|
|
program = TEST_MULTIPLE.curry(
|
|
DECOMPRESS_PUZZLE,
|
|
DECOMPRESS_CSE_WITH_PREFIX,
|
|
args.arg[0].start,
|
|
args.arg[0].end - args.split_offset,
|
|
args.arg[1].end - args.split_offset,
|
|
args.arg[1].end,
|
|
compressed_cse_list,
|
|
)
|
|
|
|
# TODO aqk: Improve ergonomics of CompressorArg -> GeneratorArg conversion
|
|
generator_args = [
|
|
GeneratorArg(FAKE_BLOCK_HEIGHT1, args.arg[0].generator),
|
|
GeneratorArg(FAKE_BLOCK_HEIGHT2, args.arg[1].generator),
|
|
]
|
|
return BlockGenerator(program, generator_args)
|
|
|
|
|
|
def spend_bundle_to_coin_solution_entry_list(bundle: SpendBundle) -> List[Any]:
|
|
r = []
|
|
for coin_solution in bundle.coin_solutions:
|
|
entry = [
|
|
coin_solution.coin.parent_coin_info,
|
|
sexp_from_stream(io.BytesIO(bytes(coin_solution.puzzle_reveal)), SExp.to),
|
|
coin_solution.coin.amount,
|
|
sexp_from_stream(io.BytesIO(bytes(coin_solution.solution)), SExp.to),
|
|
]
|
|
r.append(entry)
|
|
return r
|
|
|
|
|
|
class TestCompression(TestCase):
|
|
def test_spend_bundle_suitable(self):
|
|
sb: SpendBundle = make_spend_bundle(1)
|
|
assert bundle_suitable_for_compression(sb)
|
|
|
|
def test_compress_spend_bundle(self):
|
|
pass
|
|
|
|
def test_multiple_input_gen_refs(self):
|
|
start1, end1 = match_standard_transaction_at_any_index(gen1)
|
|
start2, end2 = match_standard_transaction_at_any_index(gen2)
|
|
ca1 = CompressorArg(FAKE_BLOCK_HEIGHT1, SerializedProgram.from_bytes(gen1), start1, end1)
|
|
ca2 = CompressorArg(FAKE_BLOCK_HEIGHT2, SerializedProgram.from_bytes(gen2), start2, end2)
|
|
|
|
prefix_len1 = end1 - start1
|
|
prefix_len2 = end2 - start2
|
|
assert prefix_len1 == prefix_len2
|
|
prefix_len = prefix_len1
|
|
results = []
|
|
for split_offset in range(prefix_len):
|
|
gen_args = MultipleCompressorArg([ca1, ca2], split_offset)
|
|
spend_bundle: SpendBundle = make_spend_bundle(1)
|
|
multi_gen = create_multiple_ref_generator(gen_args, spend_bundle)
|
|
cost, result = run_generator(multi_gen, INFINITE_COST)
|
|
results.append(result)
|
|
assert result is not None
|
|
assert cost > 0
|
|
assert all(r == results[0] for r in results)
|
|
|
|
def test_compressed_block_results(self):
|
|
sb: SpendBundle = make_spend_bundle(1)
|
|
start, end = match_standard_transaction_at_any_index(original_generator)
|
|
ca = CompressorArg(uint32(0), SerializedProgram.from_bytes(original_generator), start, end)
|
|
c = compressed_spend_bundle_solution(ca, sb)
|
|
s = simple_solution_generator(sb)
|
|
assert c != s
|
|
cost_c, result_c = run_generator(c, INFINITE_COST)
|
|
cost_s, result_s = run_generator(s, INFINITE_COST)
|
|
print(result_c)
|
|
assert result_c is not None
|
|
assert result_s is not None
|
|
assert result_c == result_s
|
|
|
|
def test_spend_byndle_coin_solution(self):
|
|
for i in range(0, 10):
|
|
sb: SpendBundle = make_spend_bundle(i)
|
|
cs1 = SExp.to(spend_bundle_to_coin_solution_entry_list(sb)).as_bin()
|
|
cs2 = spend_bundle_to_serialized_coin_solution_entry_list(sb)
|
|
assert cs1 == cs2
|
|
|
|
|
|
class TestDecompression(TestCase):
|
|
def __init__(self, *args, **kwargs):
|
|
super(TestDecompression, self).__init__(*args, **kwargs)
|
|
self.maxDiff = None
|
|
|
|
def test_deserialization(self):
|
|
self.maxDiff = None
|
|
cost, out = DESERIALIZE_MOD.run_with_cost(INFINITE_COST, [bytes(Program.to("hello"))])
|
|
assert out == Program.to("hello")
|
|
|
|
def test_deserialization_as_argument(self):
|
|
self.maxDiff = None
|
|
cost, out = TEST_GEN_DESERIALIZE.run_with_cost(
|
|
INFINITE_COST, [DESERIALIZE_MOD, Nil, bytes(Program.to("hello"))]
|
|
)
|
|
print(bytes(Program.to("hello")))
|
|
print()
|
|
print(out)
|
|
assert out == Program.to("hello")
|
|
|
|
def test_decompress_puzzle(self):
|
|
cost, out = DECOMPRESS_PUZZLE.run_with_cost(
|
|
INFINITE_COST, [DESERIALIZE_MOD, b"\xff", bytes(Program.to("pubkey")), b"\x80"]
|
|
)
|
|
|
|
print()
|
|
print(out)
|
|
|
|
# An empty CSE is invalid. (An empty CSE list may be okay)
|
|
# def test_decompress_empty_cse(self):
|
|
# cse0 = binutils.assemble("()")
|
|
# cost, out = DECOMPRESS_CSE.run_with_cost(INFINITE_COST, [DESERIALIZE_MOD, DECOMPRESS_PUZZLE, b"\xff", b"\x80", cse0])
|
|
# print()
|
|
# print(out)
|
|
|
|
def test_decompress_cse(self):
|
|
"""Decompress a single CSE / CoinSolutionEntry"""
|
|
cse0 = binutils.assemble(
|
|
"((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0) (0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3 (() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ())))"
|
|
) # noqa
|
|
cost, out = DECOMPRESS_CSE.run_with_cost(
|
|
INFINITE_COST, [DESERIALIZE_MOD, DECOMPRESS_PUZZLE, b"\xff", b"\x80", cse0]
|
|
)
|
|
|
|
print()
|
|
print(out)
|
|
|
|
def test_decompress_cse_with_prefix(self):
|
|
cse0 = binutils.assemble(
|
|
"((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0) (0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3 (() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ())))"
|
|
) # noqa
|
|
|
|
start = 2 + 44
|
|
end = start + 238
|
|
prefix = original_generator[start:end]
|
|
# (deserialize decompress_puzzle puzzle_prefix cse)
|
|
cost, out = DECOMPRESS_CSE_WITH_PREFIX.run_with_cost(
|
|
INFINITE_COST, [DESERIALIZE_MOD, DECOMPRESS_PUZZLE, prefix, cse0]
|
|
)
|
|
|
|
print()
|
|
print(out)
|
|
|
|
def test_block_program_zero(self):
|
|
"Decompress a list of CSEs"
|
|
self.maxDiff = None
|
|
cse1 = binutils.assemble(
|
|
"(((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0) (0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3 (() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ()))))"
|
|
) # noqa
|
|
cse2 = binutils.assemble(
|
|
"""
|
|
(
|
|
((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0)
|
|
(0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3
|
|
(() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ()))
|
|
)
|
|
|
|
((0x0000000000000000000000000000000000000000000000000000000000000001 0x0186a0)
|
|
(0xb0a6207f5173ec41491d9f2c1b8fff5579e13703077e0eaca8fe587669dcccf51e9209a6b65576845ece5f7c2f3229e7e3
|
|
(() (q (51 0x24254a3efc3ebfac9979bbe0d615e2eda043aa329905f65b63846fa24149e2b6 0x0186a0)) ())))
|
|
|
|
)
|
|
"""
|
|
) # noqa
|
|
|
|
start = 2 + 44
|
|
end = start + 238
|
|
|
|
# (mod (decompress_puzzle decompress_coin_solution_entry start end compressed_cses deserialize generator_list reserved_arg)
|
|
# cost, out = DECOMPRESS_BLOCK.run_with_cost(INFINITE_COST, [DECOMPRESS_PUZZLE, DECOMPRESS_CSE, start, Program.to(end), cse0, DESERIALIZE_MOD, bytes(original_generator)])
|
|
cost, out = DECOMPRESS_BLOCK.run_with_cost(
|
|
INFINITE_COST,
|
|
[
|
|
DECOMPRESS_PUZZLE,
|
|
DECOMPRESS_CSE_WITH_PREFIX,
|
|
start,
|
|
Program.to(end),
|
|
cse2,
|
|
DESERIALIZE_MOD,
|
|
[bytes(original_generator)],
|
|
],
|
|
)
|
|
|
|
print()
|
|
print(out)
|
|
|
|
def test_block_program_zero_with_curry(self):
|
|
self.maxDiff = None
|
|
cse1 = binutils.assemble(
|
|
"(((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0) (0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3 (() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ()))))"
|
|
) # noqa
|
|
cse2 = binutils.assemble(
|
|
"""
|
|
(
|
|
((0x0000000000000000000000000000000000000000000000000000000000000000 0x0186a0)
|
|
(0xb081963921826355dcb6c355ccf9c2637c18adf7d38ee44d803ea9ca41587e48c913d8d46896eb830aeadfc13144a8eac3
|
|
(() (q (51 0x6b7a83babea1eec790c947db4464ab657dbe9b887fe9acc247062847b8c2a8a9 0x0186a0)) ()))
|
|
)
|
|
|
|
((0x0000000000000000000000000000000000000000000000000000000000000001 0x0186a0)
|
|
(0xb0a6207f5173ec41491d9f2c1b8fff5579e13703077e0eaca8fe587669dcccf51e9209a6b65576845ece5f7c2f3229e7e3
|
|
(() (q (51 0x24254a3efc3ebfac9979bbe0d615e2eda043aa329905f65b63846fa24149e2b6 0x0186a0)) ())))
|
|
|
|
)
|
|
"""
|
|
) # noqa
|
|
|
|
start = 2 + 44
|
|
end = start + 238
|
|
|
|
# (mod (decompress_puzzle decompress_coin_solution_entry start end compressed_cses deserialize generator_list reserved_arg)
|
|
# cost, out = DECOMPRESS_BLOCK.run_with_cost(INFINITE_COST, [DECOMPRESS_PUZZLE, DECOMPRESS_CSE, start, Program.to(end), cse0, DESERIALIZE_MOD, bytes(original_generator)])
|
|
p = DECOMPRESS_BLOCK.curry(DECOMPRESS_PUZZLE, DECOMPRESS_CSE_WITH_PREFIX, start, Program.to(end))
|
|
cost, out = p.run_with_cost(INFINITE_COST, [cse2, DESERIALIZE_MOD, [bytes(original_generator)]])
|
|
|
|
print()
|
|
print(p)
|
|
print(out)
|
|
|
|
p_with_cses = DECOMPRESS_BLOCK.curry(
|
|
DECOMPRESS_PUZZLE, DECOMPRESS_CSE_WITH_PREFIX, start, Program.to(end), cse2, DESERIALIZE_MOD
|
|
)
|
|
generator_args = create_generator_args([SerializedProgram.from_bytes(original_generator)])
|
|
cost, out = p_with_cses.run_with_cost(INFINITE_COST, generator_args)
|
|
|
|
print()
|
|
print(p_with_cses)
|
|
print(out)
|