241 lines
11 KiB
Python
241 lines
11 KiB
Python
from typing import Dict, List, Optional
|
|
|
|
from blspy import AugSchemeMPL, G2Element, PrivateKey
|
|
|
|
from chia.consensus.constants import ConsensusConstants
|
|
from chia.util.hash import std_hash
|
|
from chia.types.announcement import Announcement
|
|
from chia.types.blockchain_format.coin import Coin
|
|
from chia.types.blockchain_format.program import Program
|
|
from chia.types.blockchain_format.sized_bytes import bytes32
|
|
from chia.types.coin_solution import CoinSolution
|
|
from chia.types.condition_opcodes import ConditionOpcode
|
|
from chia.types.condition_with_args import ConditionWithArgs
|
|
from chia.types.spend_bundle import SpendBundle
|
|
from chia.util.clvm import int_from_bytes, int_to_bytes
|
|
from chia.util.condition_tools import conditions_by_opcode, conditions_for_solution, pkm_pairs_for_conditions_dict
|
|
from chia.util.ints import uint32, uint64
|
|
from chia.wallet.derive_keys import master_sk_to_wallet_sk
|
|
from chia.wallet.puzzles.p2_delegated_puzzle_or_hidden_puzzle import (
|
|
DEFAULT_HIDDEN_PUZZLE_HASH,
|
|
calculate_synthetic_secret_key,
|
|
puzzle_for_pk,
|
|
solution_for_conditions,
|
|
)
|
|
from chia.wallet.puzzles.puzzle_utils import (
|
|
make_assert_aggsig_condition,
|
|
make_assert_coin_announcement,
|
|
make_assert_puzzle_announcement,
|
|
make_assert_relative_height_exceeds_condition,
|
|
make_assert_absolute_height_exceeds_condition,
|
|
make_assert_my_coin_id_condition,
|
|
make_assert_absolute_seconds_exceeds_condition,
|
|
make_assert_relative_seconds_exceeds_condition,
|
|
make_create_coin_announcement,
|
|
make_create_puzzle_announcement,
|
|
make_create_coin_condition,
|
|
make_reserve_fee_condition,
|
|
make_assert_my_parent_id,
|
|
make_assert_my_puzzlehash,
|
|
make_assert_my_amount,
|
|
)
|
|
|
|
DEFAULT_SEED = b"seed" * 8
|
|
assert len(DEFAULT_SEED) == 32
|
|
|
|
|
|
class WalletTool:
|
|
next_address = 0
|
|
pubkey_num_lookup: Dict[bytes, uint32] = {}
|
|
|
|
def __init__(self, constants: ConsensusConstants, sk: Optional[PrivateKey] = None):
|
|
self.constants = constants
|
|
self.current_balance = 0
|
|
self.my_utxos: set = set()
|
|
if sk is not None:
|
|
self.private_key = sk
|
|
else:
|
|
self.private_key = AugSchemeMPL.key_gen(DEFAULT_SEED)
|
|
self.generator_lookups: Dict = {}
|
|
self.puzzle_pk_cache: Dict = {}
|
|
self.get_new_puzzle()
|
|
|
|
def get_next_address_index(self) -> uint32:
|
|
self.next_address = uint32(self.next_address + 1)
|
|
return self.next_address
|
|
|
|
def get_private_key_for_puzzle_hash(self, puzzle_hash: bytes32) -> PrivateKey:
|
|
if puzzle_hash in self.puzzle_pk_cache:
|
|
child = self.puzzle_pk_cache[puzzle_hash]
|
|
private = master_sk_to_wallet_sk(self.private_key, uint32(child))
|
|
# pubkey = private.get_g1()
|
|
return private
|
|
else:
|
|
for child in range(self.next_address):
|
|
pubkey = master_sk_to_wallet_sk(self.private_key, uint32(child)).get_g1()
|
|
if puzzle_hash == puzzle_for_pk(bytes(pubkey)).get_tree_hash():
|
|
return master_sk_to_wallet_sk(self.private_key, uint32(child))
|
|
raise ValueError(f"Do not have the keys for puzzle hash {puzzle_hash}")
|
|
|
|
def puzzle_for_pk(self, pubkey: bytes) -> Program:
|
|
return puzzle_for_pk(pubkey)
|
|
|
|
def get_new_puzzle(self) -> bytes32:
|
|
next_address_index: uint32 = self.get_next_address_index()
|
|
pubkey = master_sk_to_wallet_sk(self.private_key, next_address_index).get_g1()
|
|
self.pubkey_num_lookup[bytes(pubkey)] = next_address_index
|
|
|
|
puzzle = puzzle_for_pk(bytes(pubkey))
|
|
|
|
self.puzzle_pk_cache[puzzle.get_tree_hash()] = next_address_index
|
|
return puzzle
|
|
|
|
def get_new_puzzlehash(self) -> bytes32:
|
|
puzzle = self.get_new_puzzle()
|
|
return puzzle.get_tree_hash()
|
|
|
|
def sign(self, value: bytes, pubkey: bytes) -> G2Element:
|
|
privatekey: PrivateKey = master_sk_to_wallet_sk(self.private_key, self.pubkey_num_lookup[pubkey])
|
|
return AugSchemeMPL.sign(privatekey, value)
|
|
|
|
def make_solution(self, condition_dic: Dict[ConditionOpcode, List[ConditionWithArgs]]) -> Program:
|
|
ret = []
|
|
|
|
for con_list in condition_dic.values():
|
|
for cvp in con_list:
|
|
if cvp.opcode == ConditionOpcode.CREATE_COIN:
|
|
ret.append(make_create_coin_condition(cvp.vars[0], cvp.vars[1]))
|
|
if cvp.opcode == ConditionOpcode.CREATE_COIN_ANNOUNCEMENT:
|
|
ret.append(make_create_coin_announcement(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.CREATE_PUZZLE_ANNOUNCEMENT:
|
|
ret.append(make_create_puzzle_announcement(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.AGG_SIG_UNSAFE:
|
|
ret.append(make_assert_aggsig_condition(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT:
|
|
ret.append(make_assert_coin_announcement(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_PUZZLE_ANNOUNCEMENT:
|
|
ret.append(make_assert_puzzle_announcement(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_SECONDS_ABSOLUTE:
|
|
ret.append(make_assert_absolute_seconds_exceeds_condition(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_SECONDS_RELATIVE:
|
|
ret.append(make_assert_relative_seconds_exceeds_condition(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_MY_COIN_ID:
|
|
ret.append(make_assert_my_coin_id_condition(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_HEIGHT_ABSOLUTE:
|
|
ret.append(make_assert_absolute_height_exceeds_condition(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_HEIGHT_RELATIVE:
|
|
ret.append(make_assert_relative_height_exceeds_condition(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.RESERVE_FEE:
|
|
ret.append(make_reserve_fee_condition(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_MY_PARENT_ID:
|
|
ret.append(make_assert_my_parent_id(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_MY_PUZZLEHASH:
|
|
ret.append(make_assert_my_puzzlehash(cvp.vars[0]))
|
|
if cvp.opcode == ConditionOpcode.ASSERT_MY_AMOUNT:
|
|
ret.append(make_assert_my_amount(cvp.vars[0]))
|
|
return solution_for_conditions(Program.to(ret))
|
|
|
|
def generate_unsigned_transaction(
|
|
self,
|
|
amount: uint64,
|
|
new_puzzle_hash: bytes32,
|
|
coins: List[Coin],
|
|
condition_dic: Dict[ConditionOpcode, List[ConditionWithArgs]],
|
|
fee: int = 0,
|
|
secret_key: Optional[PrivateKey] = None,
|
|
) -> List[CoinSolution]:
|
|
spends = []
|
|
|
|
spend_value = sum([c.amount for c in coins])
|
|
|
|
if ConditionOpcode.CREATE_COIN not in condition_dic:
|
|
condition_dic[ConditionOpcode.CREATE_COIN] = []
|
|
if ConditionOpcode.CREATE_COIN_ANNOUNCEMENT not in condition_dic:
|
|
condition_dic[ConditionOpcode.CREATE_COIN_ANNOUNCEMENT] = []
|
|
|
|
output = ConditionWithArgs(ConditionOpcode.CREATE_COIN, [new_puzzle_hash, int_to_bytes(amount)])
|
|
condition_dic[output.opcode].append(output)
|
|
amount_total = sum(int_from_bytes(cvp.vars[1]) for cvp in condition_dic[ConditionOpcode.CREATE_COIN])
|
|
change = spend_value - amount_total - fee
|
|
if change > 0:
|
|
change_puzzle_hash = self.get_new_puzzlehash()
|
|
change_output = ConditionWithArgs(ConditionOpcode.CREATE_COIN, [change_puzzle_hash, int_to_bytes(change)])
|
|
condition_dic[output.opcode].append(change_output)
|
|
|
|
secondary_coins_cond_dic: Dict[ConditionOpcode, List[ConditionWithArgs]] = dict()
|
|
secondary_coins_cond_dic[ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT] = []
|
|
for n, coin in enumerate(coins):
|
|
puzzle_hash = coin.puzzle_hash
|
|
if secret_key is None:
|
|
secret_key = self.get_private_key_for_puzzle_hash(puzzle_hash)
|
|
pubkey = secret_key.get_g1()
|
|
puzzle = puzzle_for_pk(bytes(pubkey))
|
|
if n == 0:
|
|
message_list = [c.name() for c in coins]
|
|
for outputs in condition_dic[ConditionOpcode.CREATE_COIN]:
|
|
message_list.append(Coin(coin.name(), outputs.vars[0], int_from_bytes(outputs.vars[1])).name())
|
|
message = std_hash(b"".join(message_list))
|
|
condition_dic[ConditionOpcode.CREATE_COIN_ANNOUNCEMENT].append(
|
|
ConditionWithArgs(ConditionOpcode.CREATE_COIN_ANNOUNCEMENT, [message])
|
|
)
|
|
primary_announcement_hash = Announcement(coin.name(), message).name()
|
|
secondary_coins_cond_dic[ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT].append(
|
|
ConditionWithArgs(ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT, [primary_announcement_hash])
|
|
)
|
|
main_solution = self.make_solution(condition_dic)
|
|
spends.append(CoinSolution(coin, puzzle, main_solution))
|
|
else:
|
|
spends.append(CoinSolution(coin, puzzle, self.make_solution(secondary_coins_cond_dic)))
|
|
return spends
|
|
|
|
def sign_transaction(self, coin_solutions: List[CoinSolution]) -> SpendBundle:
|
|
signatures = []
|
|
solution: Program
|
|
puzzle: Program
|
|
for coin_solution in coin_solutions: # type: ignore # noqa
|
|
secret_key = self.get_private_key_for_puzzle_hash(coin_solution.coin.puzzle_hash)
|
|
synthetic_secret_key = calculate_synthetic_secret_key(secret_key, DEFAULT_HIDDEN_PUZZLE_HASH)
|
|
err, con, cost = conditions_for_solution(
|
|
coin_solution.puzzle_reveal, coin_solution.solution, self.constants.MAX_BLOCK_COST_CLVM
|
|
)
|
|
if not con:
|
|
raise ValueError(err)
|
|
conditions_dict = conditions_by_opcode(con)
|
|
|
|
for _, msg in pkm_pairs_for_conditions_dict(
|
|
conditions_dict, bytes(coin_solution.coin.name()), self.constants.AGG_SIG_ME_ADDITIONAL_DATA
|
|
):
|
|
signature = AugSchemeMPL.sign(synthetic_secret_key, msg)
|
|
signatures.append(signature)
|
|
aggsig = AugSchemeMPL.aggregate(signatures)
|
|
spend_bundle = SpendBundle(coin_solutions, aggsig)
|
|
return spend_bundle
|
|
|
|
def generate_signed_transaction(
|
|
self,
|
|
amount: uint64,
|
|
new_puzzle_hash: bytes32,
|
|
coin: Coin,
|
|
condition_dic: Dict[ConditionOpcode, List[ConditionWithArgs]] = None,
|
|
fee: int = 0,
|
|
) -> SpendBundle:
|
|
if condition_dic is None:
|
|
condition_dic = {}
|
|
transaction = self.generate_unsigned_transaction(amount, new_puzzle_hash, [coin], condition_dic, fee)
|
|
assert transaction is not None
|
|
return self.sign_transaction(transaction)
|
|
|
|
def generate_signed_transaction_multiple_coins(
|
|
self,
|
|
amount: uint64,
|
|
new_puzzle_hash: bytes32,
|
|
coins: List[Coin],
|
|
condition_dic: Dict[ConditionOpcode, List[ConditionWithArgs]] = None,
|
|
fee: int = 0,
|
|
) -> SpendBundle:
|
|
if condition_dic is None:
|
|
condition_dic = {}
|
|
transaction = self.generate_unsigned_transaction(amount, new_puzzle_hash, coins, condition_dic, fee)
|
|
assert transaction is not None
|
|
return self.sign_transaction(transaction)
|