import asyncio
import collections
import dataclasses
import logging
import time
from concurrent.futures.process import ProcessPoolExecutor
from typing import Dict, List, Optional, Set, Tuple
from blspy import AugSchemeMPL, G1Element
from chiabip158 import PyBIP158

from chia.consensus.block_record import BlockRecord
from chia.consensus.constants import ConsensusConstants
from chia.consensus.cost_calculator import NPCResult, calculate_cost_of_program
from chia.full_node.bundle_tools import simple_solution_generator
from chia.full_node.coin_store import CoinStore
from chia.full_node.mempool import Mempool
from chia.full_node.mempool_check_conditions import mempool_check_conditions_dict, get_name_puzzle_conditions
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import SerializedProgram
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_record import CoinRecord
from chia.types.condition_opcodes import ConditionOpcode
from chia.types.condition_with_args import ConditionWithArgs
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.mempool_item import MempoolItem
from chia.types.spend_bundle import SpendBundle
from chia.util.clvm import int_from_bytes
from chia.util.condition_tools import (
    pkm_pairs_for_conditions_dict,
    coin_announcements_names_for_npc,
    puzzle_announcements_names_for_npc,
)
from chia.util.errors import Err
from chia.util.generator_tools import additions_for_npc
from chia.util.ints import uint32, uint64
from chia.util.streamable import recurse_jsonify

log = logging.getLogger(__name__)


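# Helper executed in a worker process (see MempoolManager.pre_validate_spendbundle). It takes and returns
# plain bytes so that both the SpendBundle and the resulting NPCResult can be passed across the
# ProcessPoolExecutor boundary.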
def get_npc_multiprocess(spend_bundle_bytes: bytes, max_cost: int) -> bytes:
    program = simple_solution_generator(SpendBundle.from_bytes(spend_bundle_bytes))
    # npc contains names of the coins removed, puzzle_hashes and their spend conditions
    return bytes(get_name_puzzle_conditions(program, max_cost, True))


class MempoolManager:
    def __init__(self, coin_store: CoinStore, consensus_constants: ConsensusConstants):
        self.constants: ConsensusConstants = consensus_constants
        self.constants_json = recurse_jsonify(dataclasses.asdict(self.constants))

        # Transactions that could not be added to the mempool yet (e.g. pending on a mempool conflict or
        # an unmet height condition); kept here so they can be retried later.
        self.potential_txs: Dict[bytes32, MempoolItem] = {}
        # Keep track of seen spend_bundles
        self.seen_bundle_hashes: Dict[bytes32, bytes32] = {}

        self.coin_store = coin_store

        # The fee per cost must be above this amount to consider the fee "nonzero", and thus able to kick out
        # other transactions. This prevents spam. This is equivalent to 0.055 XCH per block, or about
        # 0.00005 XCH for two spends.
        self.nonzero_fee_minimum_fpc = 5
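        # A quick sanity check of the figures above, assuming the mainnet constants
        # (MAX_BLOCK_COST_CLVM of 11,000,000,000 and 10^12 mojo per XCH):
        #   5 mojo/cost * 11,000,000,000 cost ~= 55,000,000,000 mojo  = 0.055 XCH for a full block
        #   5 mojo/cost * ~10,000,000 cost    ~= 50,000,000 mojo      = 0.00005 XCH for two typical spends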

        self.limit_factor = 0.5
        self.mempool_max_total_cost = int(self.constants.MAX_BLOCK_COST_CLVM * self.constants.MEMPOOL_BLOCK_BUFFER)
        self.potential_cache_max_total_cost = int(self.constants.MAX_BLOCK_COST_CLVM * 5)
        self.potential_cache_cost: int = 0
        self.seen_cache_size = 10000
        self.pool = ProcessPoolExecutor(max_workers=1)

        # The mempool will correspond to a certain peak
        self.peak: Optional[BlockRecord] = None
        self.mempool: Mempool = Mempool(self.mempool_max_total_cost)
        self.lock: asyncio.Lock = asyncio.Lock()

    def shut_down(self):
        self.pool.shutdown(wait=True)

    async def create_bundle_from_mempool(
        self, last_tb_header_hash: bytes32
    ) -> Optional[Tuple[SpendBundle, List[Coin], List[Coin]]]:
        """
        Returns an aggregated SpendBundle that can be used to create a new block, along with the
        additions and removals in that spend bundle.
        """
        if (
            self.peak is None
            or self.peak.header_hash != last_tb_header_hash
            or int(time.time()) <= self.constants.INITIAL_FREEZE_END_TIMESTAMP
        ):
            return None

        cost_sum = 0  # Checks that total cost does not exceed block maximum
        fee_sum = 0  # Checks that total fees don't exceed 64 bits
        spend_bundles: List[SpendBundle] = []
        removals = []
        additions = []
        broke_from_inner_loop = False
        log.info(f"Starting to make block, max cost: {self.constants.MAX_BLOCK_COST_CLVM}")
        for dic in reversed(self.mempool.sorted_spends.values()):
            if broke_from_inner_loop:
                break
            for item in dic.values():
                log.info(f"Cumulative cost: {cost_sum}, fee per cost: {item.fee / item.cost}")
                if (
                    item.cost + cost_sum <= self.limit_factor * self.constants.MAX_BLOCK_COST_CLVM
                    and item.fee + fee_sum <= self.constants.MAX_COIN_AMOUNT
                ):
                    spend_bundles.append(item.spend_bundle)
                    cost_sum += item.cost
                    fee_sum += item.fee
                    removals.extend(item.removals)
                    additions.extend(item.additions)
                else:
                    broke_from_inner_loop = True
                    break
        if len(spend_bundles) > 0:
            log.info(
                f"Cumulative cost of block (real cost should be less) {cost_sum}. Proportion "
                f"full: {cost_sum / self.constants.MAX_BLOCK_COST_CLVM}"
            )
            agg = SpendBundle.aggregate(spend_bundles)
            assert set(agg.additions()) == set(additions)
            assert set(agg.removals()) == set(removals)
            return agg, additions, removals
        else:
            return None

    def get_filter(self) -> bytes:
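        """
        Builds a BIP158 filter over the names of all spend bundles currently in the mempool; peers can use
        it to work out which of our transactions they have not seen yet (see get_items_not_in_filter).
        """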
        all_transactions: Set[bytes32] = set()
        byte_array_list = []
        for key, _ in self.mempool.spends.items():
            if key not in all_transactions:
                all_transactions.add(key)
                byte_array_list.append(bytearray(key))

        tx_filter: PyBIP158 = PyBIP158(byte_array_list)
        return bytes(tx_filter.GetEncoded())

    def is_fee_enough(self, fees: uint64, cost: uint64) -> bool:
        """
        Determines whether the mempool can accept a transaction with the given fees and cost.
        """
        if cost == 0:
            return False
        fees_per_cost = fees / cost
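        # Accept if the mempool still has room for this cost, or if the fee rate both clears the anti-spam
        # minimum and beats the minimum fee rate currently needed to displace other items.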
        if not self.mempool.at_full_capacity(cost) or (
            fees_per_cost >= self.nonzero_fee_minimum_fpc and fees_per_cost > self.mempool.get_min_fee_rate(cost)
        ):
            return True
        return False

    def add_and_maybe_pop_seen(self, spend_name: bytes32):
        self.seen_bundle_hashes[spend_name] = spend_name
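        # Dicts preserve insertion order, so popping the first key below evicts the oldest entry (FIFO).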
        while len(self.seen_bundle_hashes) > self.seen_cache_size:
            first_in = list(self.seen_bundle_hashes.keys())[0]
            self.seen_bundle_hashes.pop(first_in)

    def seen(self, bundle_hash: bytes32) -> bool:
        """Return true if we saw this spendbundle recently"""
        return bundle_hash in self.seen_bundle_hashes

    def remove_seen(self, bundle_hash: bytes32):
        if bundle_hash in self.seen_bundle_hashes:
            self.seen_bundle_hashes.pop(bundle_hash)

    @staticmethod
    def get_min_fee_increase() -> int:
        # 0.00001 XCH
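        # (10,000,000 mojo; 1 XCH = 10^12 mojo, so this is 10^-5 XCH)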
        return 10000000

    def can_replace(
        self,
        conflicting_items: Dict[bytes32, MempoolItem],
        removals: Dict[bytes32, CoinRecord],
        fees: uint64,
        fees_per_cost: float,
    ) -> bool:
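        """
        Replace-by-fee check: the new item must spend every coin spent by the conflicting mempool items,
        must offer a strictly higher fee per cost, and must raise the total fee by at least
        get_min_fee_increase().
        """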
        conflicting_fees = 0
        conflicting_cost = 0
        for item in conflicting_items.values():
            conflicting_fees += item.fee
            conflicting_cost += item.cost

            # All coins spent in all conflicting items must also be spent in
            # the new item
            for coin in item.removals:
                if coin.name() not in removals:
                    log.debug(f"Rejecting conflicting tx as it does not spend conflicting coin {coin.name()}")
                    return False

        # New item must have higher fee per cost
        conflicting_fees_per_cost = conflicting_fees / conflicting_cost
        if fees_per_cost <= conflicting_fees_per_cost:
            log.debug(
                f"Rejecting conflicting tx due to not increasing fees per cost "
                f"({fees_per_cost} <= {conflicting_fees_per_cost})"
            )
            return False

        # New item must increase the total fee at least by a certain amount
        fee_increase = fees - conflicting_fees
        if fee_increase < self.get_min_fee_increase():
            log.debug(f"Rejecting conflicting tx due to low fee increase ({fee_increase})")
            return False

        log.info(f"Replacing conflicting tx in mempool. New tx fee: {fees}, old tx fees: {conflicting_fees}")
        return True

    async def pre_validate_spendbundle(self, new_spend: SpendBundle) -> NPCResult:
        """
        Errors are included within the returned NPCResult.
        This runs in another process so we don't block the main thread.
        """
        start_time = time.time()
        cached_result_bytes = await asyncio.get_running_loop().run_in_executor(
            self.pool, get_npc_multiprocess, bytes(new_spend), self.constants.MAX_BLOCK_COST_CLVM
        )
        end_time = time.time()
        log.info(f"It took {end_time - start_time} seconds to pre validate transaction")
        return NPCResult.from_bytes(cached_result_bytes)

    async def add_spendbundle(
        self,
        new_spend: SpendBundle,
        npc_result: NPCResult,
        spend_name: bytes32,
        validate_signature=True,
        program: Optional[SerializedProgram] = None,
    ) -> Tuple[Optional[uint64], MempoolInclusionStatus, Optional[Err]]:
        """
        Tries to add spend bundle to the mempool
        Returns the cost (if SUCCESS), the result (MempoolInclusion status), and an optional error
        """
        start_time = time.time()
        if self.peak is None:
            return None, MempoolInclusionStatus.FAILED, Err.MEMPOOL_NOT_INITIALIZED

        npc_list = npc_result.npc_list
        if program is None:
            program = simple_solution_generator(new_spend).program
        cost = calculate_cost_of_program(program, npc_result, self.constants.COST_PER_BYTE)
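        # The cost combines the CLVM execution and condition costs reported in npc_result with (we assume)
        # a COST_PER_BYTE charge for each byte of the serialized generator program.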

        log.debug(f"Cost: {cost}")

        if cost > int(self.limit_factor * self.constants.MAX_BLOCK_COST_CLVM):
            return None, MempoolInclusionStatus.FAILED, Err.BLOCK_COST_EXCEEDS_MAX

        if npc_result.error is not None:
            return None, MempoolInclusionStatus.FAILED, Err(npc_result.error)
        # build removal list
        removal_names: List[bytes32] = [npc.coin_name for npc in npc_list]

        additions = additions_for_npc(npc_list)

        additions_dict: Dict[bytes32, Coin] = {}
        for add in additions:
            additions_dict[add.name()] = add

        addition_amount = uint64(0)
        # Check additions for max coin amount
        for coin in additions:
            if coin.amount < 0:
                return (
                    None,
                    MempoolInclusionStatus.FAILED,
                    Err.COIN_AMOUNT_NEGATIVE,
                )
            if coin.amount > self.constants.MAX_COIN_AMOUNT:
                return (
                    None,
                    MempoolInclusionStatus.FAILED,
                    Err.COIN_AMOUNT_EXCEEDS_MAXIMUM,
                )
            addition_amount = uint64(addition_amount + coin.amount)
        # Check for duplicate outputs
        addition_counter = collections.Counter(_.name() for _ in additions)
        for k, v in addition_counter.items():
            if v > 1:
                return None, MempoolInclusionStatus.FAILED, Err.DUPLICATE_OUTPUT
        # Check for duplicate inputs
        removal_counter = collections.Counter(name for name in removal_names)
        for k, v in removal_counter.items():
            if v > 1:
                return None, MempoolInclusionStatus.FAILED, Err.DOUBLE_SPEND
        # Skip if already added
        if spend_name in self.mempool.spends:
            return uint64(cost), MempoolInclusionStatus.SUCCESS, None

        removal_record_dict: Dict[bytes32, CoinRecord] = {}
        removal_coin_dict: Dict[bytes32, Coin] = {}
        removal_amount = uint64(0)
        for name in removal_names:
            removal_record = await self.coin_store.get_coin_record(name)
            if removal_record is None and name not in additions_dict:
                return None, MempoolInclusionStatus.FAILED, Err.UNKNOWN_UNSPENT
            elif name in additions_dict:
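                # This coin is created by the same bundle (an ephemeral spend), so there is no record for it
                # in the coin store yet; synthesize one as if it were confirmed at the next block height.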
                removal_coin = additions_dict[name]
                # TODO(straya): what timestamp to use here?
                assert self.peak.timestamp is not None
                removal_record = CoinRecord(
                    removal_coin,
                    uint32(self.peak.height + 1),  # In mempool, so will be included in next height
                    uint32(0),
                    False,
                    False,
                    uint64(self.peak.timestamp + 1),
                )

            assert removal_record is not None
            removal_amount = uint64(removal_amount + removal_record.coin.amount)
            removal_record_dict[name] = removal_record
            removal_coin_dict[name] = removal_record.coin

        removals: List[Coin] = [coin for coin in removal_coin_dict.values()]

        if addition_amount > removal_amount:
            log.debug(f"Rejecting transaction that mints coins: {addition_amount} > {removal_amount}")
            return None, MempoolInclusionStatus.FAILED, Err.MINTING_COIN

        fees = uint64(removal_amount - addition_amount)
        assert_fee_sum: uint64 = uint64(0)
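        # Each RESERVE_FEE condition asserts that the bundle reserves at least that many mojos as fees; sum
        # them up and make sure the actual fee (removals minus additions) covers the total.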

        for npc in npc_list:
            if ConditionOpcode.RESERVE_FEE in npc.condition_dict:
                fee_list: List[ConditionWithArgs] = npc.condition_dict[ConditionOpcode.RESERVE_FEE]
                for cvp in fee_list:
                    fee = int_from_bytes(cvp.vars[0])
                    if fee < 0:
                        return None, MempoolInclusionStatus.FAILED, Err.RESERVE_FEE_CONDITION_FAILED
                    assert_fee_sum = assert_fee_sum + fee
        if fees < assert_fee_sum:
            return (
                None,
                MempoolInclusionStatus.FAILED,
                Err.RESERVE_FEE_CONDITION_FAILED,
            )

        if cost == 0:
            return None, MempoolInclusionStatus.FAILED, Err.UNKNOWN

        fees_per_cost: float = fees / cost
        # If pool is at capacity check the fee, if not then accept even without the fee
        if self.mempool.at_full_capacity(cost):
            if fees_per_cost < self.nonzero_fee_minimum_fpc:
                return None, MempoolInclusionStatus.FAILED, Err.INVALID_FEE_TOO_CLOSE_TO_ZERO
            if fees_per_cost <= self.mempool.get_min_fee_rate(cost):
                return None, MempoolInclusionStatus.FAILED, Err.INVALID_FEE_LOW_FEE
        # Check removals against UnspentDB + DiffStore + Mempool + SpendBundle
        # Use this information later when constructing a block
        fail_reason, conflicts = await self.check_removals(removal_record_dict)
        # If there is a mempool conflict check if this spendbundle has a higher fee per cost than all others
        tmp_error: Optional[Err] = None
        conflicting_pool_items: Dict[bytes32, MempoolItem] = {}
        if fail_reason is Err.MEMPOOL_CONFLICT:
            for conflicting in conflicts:
                sb: MempoolItem = self.mempool.removals[conflicting.name()]
                conflicting_pool_items[sb.name] = sb
            if not self.can_replace(conflicting_pool_items, removal_record_dict, fees, fees_per_cost):
                potential = MempoolItem(
                    new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program
                )
                self.add_to_potential_tx_set(potential)
                return (
                    uint64(cost),
                    MempoolInclusionStatus.PENDING,
                    Err.MEMPOOL_CONFLICT,
                )

        elif fail_reason:
            return None, MempoolInclusionStatus.FAILED, fail_reason

        if tmp_error:
            return None, MempoolInclusionStatus.FAILED, tmp_error

        # Verify conditions, create hash_key list for aggsig check
        pks: List[G1Element] = []
        msgs: List[bytes32] = []
        error: Optional[Err] = None
        coin_announcements_in_spend: Set[bytes32] = coin_announcements_names_for_npc(npc_list)
        puzzle_announcements_in_spend: Set[bytes32] = puzzle_announcements_names_for_npc(npc_list)
        for npc in npc_list:
            coin_record: CoinRecord = removal_record_dict[npc.coin_name]
            # Check that the revealed removal puzzles actually match the puzzle hash
            if npc.puzzle_hash != coin_record.coin.puzzle_hash:
                log.warning("Mempool rejecting transaction because of wrong puzzle_hash")
                log.warning(f"{npc.puzzle_hash} != {coin_record.coin.puzzle_hash}")
                return None, MempoolInclusionStatus.FAILED, Err.WRONG_PUZZLE_HASH

            chialisp_height = (
                self.peak.prev_transaction_block_height if not self.peak.is_transaction_block else self.peak.height
            )
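            # Height conditions are evaluated against the last transaction block: if the peak is not itself
            # a transaction block, fall back to the previous transaction block's height.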
            assert self.peak.timestamp is not None
            error = mempool_check_conditions_dict(
                coin_record,
                coin_announcements_in_spend,
                puzzle_announcements_in_spend,
                npc.condition_dict,
                uint32(chialisp_height),
                self.peak.timestamp,
            )

            if error:
                if error is Err.ASSERT_HEIGHT_ABSOLUTE_FAILED or error is Err.ASSERT_HEIGHT_RELATIVE_FAILED:
                    potential = MempoolItem(
                        new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program
                    )
                    self.add_to_potential_tx_set(potential)
                    return uint64(cost), MempoolInclusionStatus.PENDING, error
                break

            if validate_signature:
                for pk, message in pkm_pairs_for_conditions_dict(
                    npc.condition_dict, npc.coin_name, self.constants.AGG_SIG_ME_ADDITIONAL_DATA
                ):
                    pks.append(pk)
                    msgs.append(message)
        if error:
            return None, MempoolInclusionStatus.FAILED, error

        if validate_signature:
            # Verify aggregated signature
            if not AugSchemeMPL.aggregate_verify(pks, msgs, new_spend.aggregated_signature):
                log.warning(f"Aggsig validation error {pks} {msgs} {new_spend}")
                return None, MempoolInclusionStatus.FAILED, Err.BAD_AGGREGATE_SIGNATURE
        # Remove all conflicting Coins and SpendBundles
        if fail_reason:
            mempool_item: MempoolItem
            for mempool_item in conflicting_pool_items.values():
                self.mempool.remove_from_pool(mempool_item)

        new_item = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program)
        self.mempool.add_to_pool(new_item, additions, removal_coin_dict)
        log.info(
            f"add_spendbundle took {time.time() - start_time} seconds, cost {cost} "
            f"({round(100.0 * cost/self.constants.MAX_BLOCK_COST_CLVM, 3)}%)"
        )
        return uint64(cost), MempoolInclusionStatus.SUCCESS, None

    async def check_removals(self, removals: Dict[bytes32, CoinRecord]) -> Tuple[Optional[Err], List[Coin]]:
        """
        This function checks for double spends, unknown spends and conflicting transactions in mempool.
        Returns an error (if any) and the list of coins that conflict with items already in the mempool.
        Note that additions are not checked for duplicates, because having duplicate additions requires also
        having duplicate removals.
        """
        assert self.peak is not None
        conflicts: List[Coin] = []

        for record in removals.values():
            removal = record.coin
            # 1. Check if it has already been spent
            if record.spent == 1:
                return Err.DOUBLE_SPEND, []
            # 2. Check if there is a mempool conflict
            if removal.name() in self.mempool.removals:
                conflicts.append(removal)

        if len(conflicts) > 0:
            return Err.MEMPOOL_CONFLICT, conflicts
        # 3. If all removals can be spent, there is no error and no conflicts
        return None, []

    def add_to_potential_tx_set(self, item: MempoolItem):
        """
        Adds a SpendBundle that could not be added to the mempool to the potential tx set,
        so that it can be retried later.
        """
        if item.spend_bundle_name in self.potential_txs:
            return None

        self.potential_txs[item.spend_bundle_name] = item
        self.potential_cache_cost += item.cost

        while self.potential_cache_cost > self.potential_cache_max_total_cost:
            first_in = list(self.potential_txs.keys())[0]
            self.potential_cache_cost -= self.potential_txs[first_in].cost
            self.potential_txs.pop(first_in)

    def get_spendbundle(self, bundle_hash: bytes32) -> Optional[SpendBundle]:
        """Returns a full SpendBundle if it's inside the mempool"""
        if bundle_hash in self.mempool.spends:
            return self.mempool.spends[bundle_hash].spend_bundle
        return None

    def get_mempool_item(self, bundle_hash: bytes32) -> Optional[MempoolItem]:
        """Returns a MempoolItem if it's inside the mempool"""
        if bundle_hash in self.mempool.spends:
            return self.mempool.spends[bundle_hash]
        return None

    async def new_peak(self, new_peak: Optional[BlockRecord]) -> List[Tuple[SpendBundle, NPCResult, bytes32]]:
        """
        Called when a new peak is available; we try to recreate the mempool for the new tip.
        """
        if new_peak is None:
            return []
        if new_peak.is_transaction_block is False:
            return []
        if self.peak == new_peak:
            return []
        assert new_peak.timestamp is not None
        if new_peak.timestamp <= self.constants.INITIAL_FREEZE_END_TIMESTAMP:
            return []

        self.peak = new_peak
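
        # Rebuild the mempool from scratch against the new peak: every previously accepted item is re-added
        # via add_spendbundle (with signature validation skipped, since it already passed on first entry),
        # and anything that no longer fits is not re-added and is removed from the seen cache.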
        old_pool = self.mempool
        async with self.lock:
            self.mempool = Mempool(self.mempool_max_total_cost)

            for item in old_pool.spends.values():
                _, result, _ = await self.add_spendbundle(
                    item.spend_bundle, item.npc_result, item.spend_bundle_name, False, item.program
                )
                # If the spend bundle was confirmed or conflicting (can no longer be in mempool), it won't be
                # successfully added to the new mempool. In this case, remove it from seen, so in the case of
                # a reorg, it can be resubmitted
                if result != MempoolInclusionStatus.SUCCESS:
                    self.remove_seen(item.spend_bundle_name)

            potential_txs_copy = self.potential_txs.copy()
            self.potential_txs = {}
            txs_added = []
            for item in potential_txs_copy.values():
                cost, status, error = await self.add_spendbundle(
                    item.spend_bundle, item.npc_result, item.spend_bundle_name, program=item.program
                )
                if status == MempoolInclusionStatus.SUCCESS:
                    txs_added.append((item.spend_bundle, item.npc_result, item.spend_bundle_name))
        log.info(
            f"Size of mempool: {len(self.mempool.spends)} spends, cost: {self.mempool.total_mempool_cost} "
            f"minimum fee to get in: {self.mempool.get_min_fee_rate(100000)}"
        )
        return txs_added

    async def get_items_not_in_filter(self, mempool_filter: PyBIP158, limit: int = 100) -> List[MempoolItem]:
        items: List[MempoolItem] = []
        counter = 0
        broke_from_inner_loop = False

        # Send up to `limit` items with the highest fee per cost
        for dic in self.mempool.sorted_spends.values():
            if broke_from_inner_loop:
                break
            for item in dic.values():
                if counter == limit:
                    broke_from_inner_loop = True
                    break
                if mempool_filter.Match(bytearray(item.spend_bundle_name)):
                    continue
                items.append(item)
                counter += 1

        return items