chia-blockchain/chia/full_node/mempool_manager.py

557 lines
24 KiB
Python

import asyncio
import collections
import dataclasses
import logging
import time
from concurrent.futures.process import ProcessPoolExecutor
from typing import Dict, List, Optional, Set, Tuple
from blspy import AugSchemeMPL, G1Element
from chiabip158 import PyBIP158
from chia.consensus.block_record import BlockRecord
from chia.consensus.constants import ConsensusConstants
from chia.consensus.cost_calculator import NPCResult, calculate_cost_of_program
from chia.full_node.bundle_tools import simple_solution_generator
from chia.full_node.coin_store import CoinStore
from chia.full_node.mempool import Mempool
from chia.full_node.mempool_check_conditions import mempool_check_conditions_dict, get_name_puzzle_conditions
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import SerializedProgram
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_record import CoinRecord
from chia.types.condition_opcodes import ConditionOpcode
from chia.types.condition_with_args import ConditionWithArgs
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.mempool_item import MempoolItem
from chia.types.spend_bundle import SpendBundle
from chia.util.clvm import int_from_bytes
from chia.util.condition_tools import (
pkm_pairs_for_conditions_dict,
coin_announcements_names_for_npc,
puzzle_announcements_names_for_npc,
)
from chia.util.errors import Err
from chia.util.generator_tools import additions_for_npc
from chia.util.ints import uint32, uint64
from chia.util.streamable import recurse_jsonify
log = logging.getLogger(__name__)
def get_npc_multiprocess(spend_bundle_bytes: bytes, max_cost: int) -> bytes:
program = simple_solution_generator(SpendBundle.from_bytes(spend_bundle_bytes))
# npc contains names of the coins removed, puzzle_hashes and their spend conditions
return bytes(get_name_puzzle_conditions(program, max_cost, True))
class MempoolManager:
def __init__(self, coin_store: CoinStore, consensus_constants: ConsensusConstants):
self.constants: ConsensusConstants = consensus_constants
self.constants_json = recurse_jsonify(dataclasses.asdict(self.constants))
# Transactions that were unable to enter mempool, used for retry. (they were invalid)
self.potential_txs: Dict[bytes32, MempoolItem] = {}
# Keep track of seen spend_bundles
self.seen_bundle_hashes: Dict[bytes32, bytes32] = {}
self.coin_store = coin_store
# The fee per cost must be above this amount to consider the fee "nonzero", and thus able to kick out other
# transactions. This prevents spam. This is equivalent to 0.055 XCH per block, or about 0.00005 XCH for two
# spends.
self.nonzero_fee_minimum_fpc = 5
self.limit_factor = 0.5
self.mempool_max_total_cost = int(self.constants.MAX_BLOCK_COST_CLVM * self.constants.MEMPOOL_BLOCK_BUFFER)
self.potential_cache_max_total_cost = int(self.constants.MAX_BLOCK_COST_CLVM * 5)
self.potential_cache_cost: int = 0
self.seen_cache_size = 10000
self.pool = ProcessPoolExecutor(max_workers=1)
# The mempool will correspond to a certain peak
self.peak: Optional[BlockRecord] = None
self.mempool: Mempool = Mempool(self.mempool_max_total_cost)
self.lock: asyncio.Lock = asyncio.Lock()
def shut_down(self):
self.pool.shutdown(wait=True)
async def create_bundle_from_mempool(
self, last_tb_header_hash: bytes32
) -> Optional[Tuple[SpendBundle, List[Coin], List[Coin]]]:
"""
Returns aggregated spendbundle that can be used for creating new block,
additions and removals in that spend_bundle
"""
if (
self.peak is None
or self.peak.header_hash != last_tb_header_hash
or int(time.time()) <= self.constants.INITIAL_FREEZE_END_TIMESTAMP
):
return None
cost_sum = 0 # Checks that total cost does not exceed block maximum
fee_sum = 0 # Checks that total fees don't exceed 64 bits
spend_bundles: List[SpendBundle] = []
removals = []
additions = []
broke_from_inner_loop = False
log.info(f"Starting to make block, max cost: {self.constants.MAX_BLOCK_COST_CLVM}")
for dic in reversed(self.mempool.sorted_spends.values()):
if broke_from_inner_loop:
break
for item in dic.values():
log.info(f"Cumulative cost: {cost_sum}, fee per cost: {item.fee / item.cost}")
if (
item.cost + cost_sum <= self.limit_factor * self.constants.MAX_BLOCK_COST_CLVM
and item.fee + fee_sum <= self.constants.MAX_COIN_AMOUNT
):
spend_bundles.append(item.spend_bundle)
cost_sum += item.cost
fee_sum += item.fee
removals.extend(item.removals)
additions.extend(item.additions)
else:
broke_from_inner_loop = True
break
if len(spend_bundles) > 0:
log.info(
f"Cumulative cost of block (real cost should be less) {cost_sum}. Proportion "
f"full: {cost_sum / self.constants.MAX_BLOCK_COST_CLVM}"
)
agg = SpendBundle.aggregate(spend_bundles)
assert set(agg.additions()) == set(additions)
assert set(agg.removals()) == set(removals)
return agg, additions, removals
else:
return None
def get_filter(self) -> bytes:
all_transactions: Set[bytes32] = set()
byte_array_list = []
for key, _ in self.mempool.spends.items():
if key not in all_transactions:
all_transactions.add(key)
byte_array_list.append(bytearray(key))
tx_filter: PyBIP158 = PyBIP158(byte_array_list)
return bytes(tx_filter.GetEncoded())
def is_fee_enough(self, fees: uint64, cost: uint64) -> bool:
"""
Determines whether any of the pools can accept a transaction with a given fees
and cost.
"""
if cost == 0:
return False
fees_per_cost = fees / cost
if not self.mempool.at_full_capacity(cost) or (
fees_per_cost >= self.nonzero_fee_minimum_fpc and fees_per_cost > self.mempool.get_min_fee_rate(cost)
):
return True
return False
def add_and_maybe_pop_seen(self, spend_name: bytes32):
self.seen_bundle_hashes[spend_name] = spend_name
while len(self.seen_bundle_hashes) > self.seen_cache_size:
first_in = list(self.seen_bundle_hashes.keys())[0]
self.seen_bundle_hashes.pop(first_in)
def seen(self, bundle_hash: bytes32) -> bool:
"""Return true if we saw this spendbundle recently"""
return bundle_hash in self.seen_bundle_hashes
def remove_seen(self, bundle_hash: bytes32):
if bundle_hash in self.seen_bundle_hashes:
self.seen_bundle_hashes.pop(bundle_hash)
@staticmethod
def get_min_fee_increase() -> int:
# 0.00001 XCH
return 10000000
def can_replace(
self,
conflicting_items: Dict[bytes32, MempoolItem],
removals: Dict[bytes32, CoinRecord],
fees: uint64,
fees_per_cost: float,
) -> bool:
conflicting_fees = 0
conflicting_cost = 0
for item in conflicting_items.values():
conflicting_fees += item.fee
conflicting_cost += item.cost
# All coins spent in all conflicting items must also be spent in
# the new item
for coin in item.removals:
if coin.name() not in removals:
log.debug(f"Rejecting conflicting tx as it does not spend conflicting coin {coin.name()}")
return False
# New item must have higher fee per cost
conflicting_fees_per_cost = conflicting_fees / conflicting_cost
if fees_per_cost <= conflicting_fees_per_cost:
log.debug(
f"Rejecting conflicting tx due to not increasing fees per cost "
f"({fees_per_cost} <= {conflicting_fees_per_cost})"
)
return False
# New item must increase the total fee at least by a certain amount
fee_increase = fees - conflicting_fees
if fee_increase < self.get_min_fee_increase():
log.debug(f"Rejecting conflicting tx due to low fee increase ({fee_increase})")
return False
log.info(f"Replacing conflicting tx in mempool. New tx fee: {fees}, old tx fees: {conflicting_fees}")
return True
async def pre_validate_spendbundle(self, new_spend: SpendBundle) -> NPCResult:
"""
Errors are included within the cached_result.
This runs in another process so we don't block the main thread
"""
start_time = time.time()
cached_result_bytes = await asyncio.get_running_loop().run_in_executor(
self.pool, get_npc_multiprocess, bytes(new_spend), self.constants.MAX_BLOCK_COST_CLVM
)
end_time = time.time()
log.info(f"It took {end_time - start_time} to pre validate transaction")
return NPCResult.from_bytes(cached_result_bytes)
async def add_spendbundle(
self,
new_spend: SpendBundle,
npc_result: NPCResult,
spend_name: bytes32,
validate_signature=True,
program: Optional[SerializedProgram] = None,
) -> Tuple[Optional[uint64], MempoolInclusionStatus, Optional[Err]]:
"""
Tries to add spend bundle to the mempool
Returns the cost (if SUCCESS), the result (MempoolInclusion status), and an optional error
"""
start_time = time.time()
if self.peak is None:
return None, MempoolInclusionStatus.FAILED, Err.MEMPOOL_NOT_INITIALIZED
npc_list = npc_result.npc_list
if program is None:
program = simple_solution_generator(new_spend).program
cost = calculate_cost_of_program(program, npc_result, self.constants.COST_PER_BYTE)
log.debug(f"Cost: {cost}")
if cost > int(self.limit_factor * self.constants.MAX_BLOCK_COST_CLVM):
return None, MempoolInclusionStatus.FAILED, Err.BLOCK_COST_EXCEEDS_MAX
if npc_result.error is not None:
return None, MempoolInclusionStatus.FAILED, Err(npc_result.error)
# build removal list
removal_names: List[bytes32] = [npc.coin_name for npc in npc_list]
additions = additions_for_npc(npc_list)
additions_dict: Dict[bytes32, Coin] = {}
for add in additions:
additions_dict[add.name()] = add
addition_amount = uint64(0)
# Check additions for max coin amount
for coin in additions:
if coin.amount < 0:
return (
None,
MempoolInclusionStatus.FAILED,
Err.COIN_AMOUNT_NEGATIVE,
)
if coin.amount > self.constants.MAX_COIN_AMOUNT:
return (
None,
MempoolInclusionStatus.FAILED,
Err.COIN_AMOUNT_EXCEEDS_MAXIMUM,
)
addition_amount = uint64(addition_amount + coin.amount)
# Check for duplicate outputs
addition_counter = collections.Counter(_.name() for _ in additions)
for k, v in addition_counter.items():
if v > 1:
return None, MempoolInclusionStatus.FAILED, Err.DUPLICATE_OUTPUT
# Check for duplicate inputs
removal_counter = collections.Counter(name for name in removal_names)
for k, v in removal_counter.items():
if v > 1:
return None, MempoolInclusionStatus.FAILED, Err.DOUBLE_SPEND
# Skip if already added
if spend_name in self.mempool.spends:
return uint64(cost), MempoolInclusionStatus.SUCCESS, None
removal_record_dict: Dict[bytes32, CoinRecord] = {}
removal_coin_dict: Dict[bytes32, Coin] = {}
removal_amount = uint64(0)
for name in removal_names:
removal_record = await self.coin_store.get_coin_record(name)
if removal_record is None and name not in additions_dict:
return None, MempoolInclusionStatus.FAILED, Err.UNKNOWN_UNSPENT
elif name in additions_dict:
removal_coin = additions_dict[name]
# TODO(straya): what timestamp to use here?
assert self.peak.timestamp is not None
removal_record = CoinRecord(
removal_coin,
uint32(self.peak.height + 1), # In mempool, so will be included in next height
uint32(0),
False,
False,
uint64(self.peak.timestamp + 1),
)
assert removal_record is not None
removal_amount = uint64(removal_amount + removal_record.coin.amount)
removal_record_dict[name] = removal_record
removal_coin_dict[name] = removal_record.coin
removals: List[Coin] = [coin for coin in removal_coin_dict.values()]
if addition_amount > removal_amount:
print(addition_amount, removal_amount)
return None, MempoolInclusionStatus.FAILED, Err.MINTING_COIN
fees = uint64(removal_amount - addition_amount)
assert_fee_sum: uint64 = uint64(0)
for npc in npc_list:
if ConditionOpcode.RESERVE_FEE in npc.condition_dict:
fee_list: List[ConditionWithArgs] = npc.condition_dict[ConditionOpcode.RESERVE_FEE]
for cvp in fee_list:
fee = int_from_bytes(cvp.vars[0])
if fee < 0:
return None, MempoolInclusionStatus.FAILED, Err.RESERVE_FEE_CONDITION_FAILED
assert_fee_sum = assert_fee_sum + fee
if fees < assert_fee_sum:
return (
None,
MempoolInclusionStatus.FAILED,
Err.RESERVE_FEE_CONDITION_FAILED,
)
if cost == 0:
return None, MempoolInclusionStatus.FAILED, Err.UNKNOWN
fees_per_cost: float = fees / cost
# If pool is at capacity check the fee, if not then accept even without the fee
if self.mempool.at_full_capacity(cost):
if fees_per_cost < self.nonzero_fee_minimum_fpc:
return None, MempoolInclusionStatus.FAILED, Err.INVALID_FEE_TOO_CLOSE_TO_ZERO
if fees_per_cost <= self.mempool.get_min_fee_rate(cost):
return None, MempoolInclusionStatus.FAILED, Err.INVALID_FEE_LOW_FEE
# Check removals against UnspentDB + DiffStore + Mempool + SpendBundle
# Use this information later when constructing a block
fail_reason, conflicts = await self.check_removals(removal_record_dict)
# If there is a mempool conflict check if this spendbundle has a higher fee per cost than all others
tmp_error: Optional[Err] = None
conflicting_pool_items: Dict[bytes32, MempoolItem] = {}
if fail_reason is Err.MEMPOOL_CONFLICT:
for conflicting in conflicts:
sb: MempoolItem = self.mempool.removals[conflicting.name()]
conflicting_pool_items[sb.name] = sb
if not self.can_replace(conflicting_pool_items, removal_record_dict, fees, fees_per_cost):
potential = MempoolItem(
new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program
)
self.add_to_potential_tx_set(potential)
return (
uint64(cost),
MempoolInclusionStatus.PENDING,
Err.MEMPOOL_CONFLICT,
)
elif fail_reason:
return None, MempoolInclusionStatus.FAILED, fail_reason
if tmp_error:
return None, MempoolInclusionStatus.FAILED, tmp_error
# Verify conditions, create hash_key list for aggsig check
pks: List[G1Element] = []
msgs: List[bytes32] = []
error: Optional[Err] = None
coin_announcements_in_spend: Set[bytes32] = coin_announcements_names_for_npc(npc_list)
puzzle_announcements_in_spend: Set[bytes32] = puzzle_announcements_names_for_npc(npc_list)
for npc in npc_list:
coin_record: CoinRecord = removal_record_dict[npc.coin_name]
# Check that the revealed removal puzzles actually match the puzzle hash
if npc.puzzle_hash != coin_record.coin.puzzle_hash:
log.warning("Mempool rejecting transaction because of wrong puzzle_hash")
log.warning(f"{npc.puzzle_hash} != {coin_record.coin.puzzle_hash}")
return None, MempoolInclusionStatus.FAILED, Err.WRONG_PUZZLE_HASH
chialisp_height = (
self.peak.prev_transaction_block_height if not self.peak.is_transaction_block else self.peak.height
)
assert self.peak.timestamp is not None
error = mempool_check_conditions_dict(
coin_record,
coin_announcements_in_spend,
puzzle_announcements_in_spend,
npc.condition_dict,
uint32(chialisp_height),
self.peak.timestamp,
)
if error:
if error is Err.ASSERT_HEIGHT_ABSOLUTE_FAILED or error is Err.ASSERT_HEIGHT_RELATIVE_FAILED:
potential = MempoolItem(
new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program
)
self.add_to_potential_tx_set(potential)
return uint64(cost), MempoolInclusionStatus.PENDING, error
break
if validate_signature:
for pk, message in pkm_pairs_for_conditions_dict(
npc.condition_dict, npc.coin_name, self.constants.AGG_SIG_ME_ADDITIONAL_DATA
):
pks.append(pk)
msgs.append(message)
if error:
return None, MempoolInclusionStatus.FAILED, error
if validate_signature:
# Verify aggregated signature
if not AugSchemeMPL.aggregate_verify(pks, msgs, new_spend.aggregated_signature):
log.warning(f"Aggsig validation error {pks} {msgs} {new_spend}")
return None, MempoolInclusionStatus.FAILED, Err.BAD_AGGREGATE_SIGNATURE
# Remove all conflicting Coins and SpendBundles
if fail_reason:
mempool_item: MempoolItem
for mempool_item in conflicting_pool_items.values():
self.mempool.remove_from_pool(mempool_item)
new_item = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program)
self.mempool.add_to_pool(new_item, additions, removal_coin_dict)
log.info(
f"add_spendbundle took {time.time() - start_time} seconds, cost {cost} "
f"({round(100.0 * cost/self.constants.MAX_BLOCK_COST_CLVM, 3)}%)"
)
return uint64(cost), MempoolInclusionStatus.SUCCESS, None
async def check_removals(self, removals: Dict[bytes32, CoinRecord]) -> Tuple[Optional[Err], List[Coin]]:
"""
This function checks for double spends, unknown spends and conflicting transactions in mempool.
Returns Error (if any), dictionary of Unspents, list of coins with conflict errors (if any any).
Note that additions are not checked for duplicates, because having duplicate additions requires also
having duplicate removals.
"""
assert self.peak is not None
conflicts: List[Coin] = []
for record in removals.values():
removal = record.coin
# 1. Checks if it's been spent already
if record.spent == 1:
return Err.DOUBLE_SPEND, []
# 2. Checks if there's a mempool conflict
if removal.name() in self.mempool.removals:
conflicts.append(removal)
if len(conflicts) > 0:
return Err.MEMPOOL_CONFLICT, conflicts
# 5. If coins can be spent return list of unspents as we see them in local storage
return None, []
def add_to_potential_tx_set(self, item: MempoolItem):
"""
Adds SpendBundles that have failed to be added to the pool in potential tx set.
This is later used to retry to add them.
"""
if item.spend_bundle_name in self.potential_txs:
return None
self.potential_txs[item.spend_bundle_name] = item
self.potential_cache_cost += item.cost
while self.potential_cache_cost > self.potential_cache_max_total_cost:
first_in = list(self.potential_txs.keys())[0]
self.potential_cache_max_total_cost -= self.potential_txs[first_in].cost
self.potential_txs.pop(first_in)
def get_spendbundle(self, bundle_hash: bytes32) -> Optional[SpendBundle]:
"""Returns a full SpendBundle if it's inside one the mempools"""
if bundle_hash in self.mempool.spends:
return self.mempool.spends[bundle_hash].spend_bundle
return None
def get_mempool_item(self, bundle_hash: bytes32) -> Optional[MempoolItem]:
"""Returns a MempoolItem if it's inside one the mempools"""
if bundle_hash in self.mempool.spends:
return self.mempool.spends[bundle_hash]
return None
async def new_peak(self, new_peak: Optional[BlockRecord]) -> List[Tuple[SpendBundle, NPCResult, bytes32]]:
"""
Called when a new peak is available, we try to recreate a mempool for the new tip.
"""
if new_peak is None:
return []
if new_peak.is_transaction_block is False:
return []
if self.peak == new_peak:
return []
assert new_peak.timestamp is not None
if new_peak.timestamp <= self.constants.INITIAL_FREEZE_END_TIMESTAMP:
return []
self.peak = new_peak
old_pool = self.mempool
async with self.lock:
self.mempool = Mempool(self.mempool_max_total_cost)
for item in old_pool.spends.values():
_, result, _ = await self.add_spendbundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name, False, item.program
)
# If the spend bundle was confirmed or conflicting (can no longer be in mempool), it won't be
# successfully added to the new mempool. In this case, remove it from seen, so in the case of a reorg,
# it can be resubmitted
if result != MempoolInclusionStatus.SUCCESS:
self.remove_seen(item.spend_bundle_name)
potential_txs_copy = self.potential_txs.copy()
self.potential_txs = {}
txs_added = []
for item in potential_txs_copy.values():
cost, status, error = await self.add_spendbundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name, program=item.program
)
if status == MempoolInclusionStatus.SUCCESS:
txs_added.append((item.spend_bundle, item.npc_result, item.spend_bundle_name))
log.info(
f"Size of mempool: {len(self.mempool.spends)} spends, cost: {self.mempool.total_mempool_cost} "
f"minimum fee to get in: {self.mempool.get_min_fee_rate(100000)}"
)
return txs_added
async def get_items_not_in_filter(self, mempool_filter: PyBIP158, limit: int = 100) -> List[MempoolItem]:
items: List[MempoolItem] = []
counter = 0
broke_from_inner_loop = False
# Send 100 with highest fee per cost
for dic in self.mempool.sorted_spends.values():
if broke_from_inner_loop:
break
for item in dic.values():
if counter == limit:
broke_from_inner_loop = True
break
if mempool_filter.Match(bytearray(item.spend_bundle_name)):
continue
items.append(item)
counter += 1
return items