659 lines
26 KiB
Python
659 lines
26 KiB
Python
import logging
|
|
import math
|
|
import time
|
|
from asyncio import Lock
|
|
from random import choice, randrange
|
|
from secrets import randbits
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
|
|
from chia.types.peer_info import PeerInfo, TimestampedPeerInfo
|
|
from chia.util.hash import std_hash
|
|
from chia.util.ints import uint16, uint64
|
|
|
|
TRIED_BUCKETS_PER_GROUP = 8
|
|
NEW_BUCKETS_PER_SOURCE_GROUP = 64
|
|
TRIED_BUCKET_COUNT = 256
|
|
NEW_BUCKET_COUNT = 1024
|
|
BUCKET_SIZE = 64
|
|
TRIED_COLLISION_SIZE = 10
|
|
NEW_BUCKETS_PER_ADDRESS = 8
|
|
LOG_TRIED_BUCKET_COUNT = 3
|
|
LOG_NEW_BUCKET_COUNT = 10
|
|
LOG_BUCKET_SIZE = 6
|
|
HORIZON_DAYS = 30
|
|
MAX_RETRIES = 3
|
|
MIN_FAIL_DAYS = 7
|
|
MAX_FAILURES = 10
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
# This is a Python port from 'CAddrInfo' class from Bitcoin core code.
|
|
class ExtendedPeerInfo:
|
|
def __init__(
|
|
self,
|
|
addr: TimestampedPeerInfo,
|
|
src_peer: Optional[PeerInfo],
|
|
):
|
|
self.peer_info: PeerInfo = PeerInfo(
|
|
addr.host,
|
|
addr.port,
|
|
)
|
|
self.timestamp: int = addr.timestamp
|
|
self.src: Optional[PeerInfo] = src_peer
|
|
if src_peer is None:
|
|
self.src = self.peer_info
|
|
self.random_pos: Optional[int] = None
|
|
self.is_tried: bool = False
|
|
self.ref_count: int = 0
|
|
self.last_success: int = 0
|
|
self.last_try: int = 0
|
|
self.num_attempts: int = 0
|
|
self.last_count_attempt: int = 0
|
|
|
|
def to_string(self) -> str:
|
|
assert self.src is not None
|
|
out = (
|
|
self.peer_info.host
|
|
+ " "
|
|
+ str(int(self.peer_info.port))
|
|
+ " "
|
|
+ str(int(self.timestamp))
|
|
+ " "
|
|
+ self.src.host
|
|
+ " "
|
|
+ str(int(self.src.port))
|
|
)
|
|
return out
|
|
|
|
@classmethod
|
|
def from_string(cls, peer_str: str):
|
|
blobs = peer_str.split(" ")
|
|
assert len(blobs) == 5
|
|
peer_info = TimestampedPeerInfo(blobs[0], uint16(int(blobs[1])), uint64(int(blobs[2])))
|
|
src_peer = PeerInfo(blobs[3], uint16(int(blobs[4])))
|
|
return cls(peer_info, src_peer)
|
|
|
|
def get_tried_bucket(self, key: int) -> int:
|
|
hash1 = int.from_bytes(
|
|
bytes(std_hash(key.to_bytes(32, byteorder="big") + self.peer_info.get_key())[:8]),
|
|
byteorder="big",
|
|
)
|
|
hash1 = hash1 % TRIED_BUCKETS_PER_GROUP
|
|
hash2 = int.from_bytes(
|
|
bytes(std_hash(key.to_bytes(32, byteorder="big") + self.peer_info.get_group() + bytes([hash1]))[:8]),
|
|
byteorder="big",
|
|
)
|
|
return hash2 % TRIED_BUCKET_COUNT
|
|
|
|
def get_new_bucket(self, key: int, src_peer: Optional[PeerInfo] = None) -> int:
|
|
if src_peer is None:
|
|
src_peer = self.src
|
|
assert src_peer is not None
|
|
hash1 = int.from_bytes(
|
|
bytes(std_hash(key.to_bytes(32, byteorder="big") + self.peer_info.get_group() + src_peer.get_group())[:8]),
|
|
byteorder="big",
|
|
)
|
|
hash1 = hash1 % NEW_BUCKETS_PER_SOURCE_GROUP
|
|
hash2 = int.from_bytes(
|
|
bytes(std_hash(key.to_bytes(32, byteorder="big") + src_peer.get_group() + bytes([hash1]))[:8]),
|
|
byteorder="big",
|
|
)
|
|
return hash2 % NEW_BUCKET_COUNT
|
|
|
|
def get_bucket_position(self, key: int, is_new: bool, nBucket: int) -> int:
|
|
ch = "N" if is_new else "K"
|
|
hash1 = int.from_bytes(
|
|
bytes(
|
|
std_hash(
|
|
key.to_bytes(32, byteorder="big")
|
|
+ ch.encode()
|
|
+ nBucket.to_bytes(3, byteorder="big")
|
|
+ self.peer_info.get_key()
|
|
)[:8]
|
|
),
|
|
byteorder="big",
|
|
)
|
|
return hash1 % BUCKET_SIZE
|
|
|
|
def is_terrible(self, now: Optional[int] = None) -> bool:
|
|
if now is None:
|
|
now = int(math.floor(time.time()))
|
|
# never remove things tried in the last minute
|
|
if self.last_try > 0 and self.last_try >= now - 60:
|
|
return False
|
|
|
|
# came in a flying DeLorean
|
|
if self.timestamp > now + 10 * 60:
|
|
return True
|
|
|
|
# not seen in recent history
|
|
if self.timestamp == 0 or now - self.timestamp > HORIZON_DAYS * 24 * 60 * 60:
|
|
return True
|
|
|
|
# tried N times and never a success
|
|
if self.last_success == 0 and self.num_attempts >= MAX_RETRIES:
|
|
return True
|
|
|
|
# N successive failures in the last week
|
|
if now - self.last_success > MIN_FAIL_DAYS * 24 * 60 * 60 and self.num_attempts >= MAX_FAILURES:
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_selection_chance(self, now: Optional[int] = None):
|
|
if now is None:
|
|
now = int(math.floor(time.time()))
|
|
chance = 1.0
|
|
since_last_try = max(now - self.last_try, 0)
|
|
# deprioritize very recent attempts away
|
|
if since_last_try < 60 * 10:
|
|
chance *= 0.01
|
|
|
|
# deprioritize 66% after each failed attempt,
|
|
# but at most 1/28th to avoid the search taking forever or overly penalizing outages.
|
|
chance *= pow(0.66, min(self.num_attempts, 8))
|
|
return chance
|
|
|
|
|
|
# This is a Python port from 'CAddrMan' class from Bitcoin core code.
|
|
class AddressManager:
|
|
id_count: int
|
|
key: int
|
|
random_pos: List[int]
|
|
tried_matrix: List[List[int]]
|
|
new_matrix: List[List[int]]
|
|
tried_count: int
|
|
new_count: int
|
|
map_addr: Dict[str, int]
|
|
map_info: Dict[int, ExtendedPeerInfo]
|
|
last_good: int
|
|
tried_collisions: List[int]
|
|
used_new_matrix_positions: Set[Tuple[int, int]]
|
|
used_tried_matrix_positions: Set[Tuple[int, int]]
|
|
allow_private_subnets: bool
|
|
|
|
def __init__(self) -> None:
|
|
self.clear()
|
|
self.lock: Lock = Lock()
|
|
|
|
def clear(self) -> None:
|
|
self.id_count = 0
|
|
self.key = randbits(256)
|
|
self.random_pos = []
|
|
self.tried_matrix = [[-1 for x in range(BUCKET_SIZE)] for y in range(TRIED_BUCKET_COUNT)]
|
|
self.new_matrix = [[-1 for x in range(BUCKET_SIZE)] for y in range(NEW_BUCKET_COUNT)]
|
|
self.tried_count = 0
|
|
self.new_count = 0
|
|
self.map_addr = {}
|
|
self.map_info = {}
|
|
self.last_good = 1
|
|
self.tried_collisions = []
|
|
self.used_new_matrix_positions = set()
|
|
self.used_tried_matrix_positions = set()
|
|
self.allow_private_subnets = False
|
|
|
|
def make_private_subnets_valid(self) -> None:
|
|
self.allow_private_subnets = True
|
|
|
|
# Use only this method for modifying new matrix.
|
|
def _set_new_matrix(self, row: int, col: int, value: int) -> None:
|
|
self.new_matrix[row][col] = value
|
|
if value == -1:
|
|
if (row, col) in self.used_new_matrix_positions:
|
|
self.used_new_matrix_positions.remove((row, col))
|
|
else:
|
|
if (row, col) not in self.used_new_matrix_positions:
|
|
self.used_new_matrix_positions.add((row, col))
|
|
|
|
# Use only this method for modifying tried matrix.
|
|
def _set_tried_matrix(self, row: int, col: int, value: int) -> None:
|
|
self.tried_matrix[row][col] = value
|
|
if value == -1:
|
|
if (row, col) in self.used_tried_matrix_positions:
|
|
self.used_tried_matrix_positions.remove((row, col))
|
|
else:
|
|
if (row, col) not in self.used_tried_matrix_positions:
|
|
self.used_tried_matrix_positions.add((row, col))
|
|
|
|
def load_used_table_positions(self) -> None:
|
|
self.used_new_matrix_positions = set()
|
|
self.used_tried_matrix_positions = set()
|
|
for bucket in range(NEW_BUCKET_COUNT):
|
|
for pos in range(BUCKET_SIZE):
|
|
if self.new_matrix[bucket][pos] != -1:
|
|
self.used_new_matrix_positions.add((bucket, pos))
|
|
for bucket in range(TRIED_BUCKET_COUNT):
|
|
for pos in range(BUCKET_SIZE):
|
|
if self.tried_matrix[bucket][pos] != -1:
|
|
self.used_tried_matrix_positions.add((bucket, pos))
|
|
|
|
def create_(self, addr: TimestampedPeerInfo, addr_src: Optional[PeerInfo]) -> Tuple[ExtendedPeerInfo, int]:
|
|
self.id_count += 1
|
|
node_id = self.id_count
|
|
self.map_info[node_id] = ExtendedPeerInfo(addr, addr_src)
|
|
self.map_addr[addr.host] = node_id
|
|
self.map_info[node_id].random_pos = len(self.random_pos)
|
|
self.random_pos.append(node_id)
|
|
return (self.map_info[node_id], node_id)
|
|
|
|
def find_(self, addr: PeerInfo) -> Tuple[Optional[ExtendedPeerInfo], Optional[int]]:
|
|
if addr.host not in self.map_addr:
|
|
return (None, None)
|
|
node_id = self.map_addr[addr.host]
|
|
if node_id not in self.map_info:
|
|
return (None, node_id)
|
|
return (self.map_info[node_id], node_id)
|
|
|
|
def swap_random_(self, rand_pos_1: int, rand_pos_2: int) -> None:
|
|
if rand_pos_1 == rand_pos_2:
|
|
return None
|
|
assert rand_pos_1 < len(self.random_pos) and rand_pos_2 < len(self.random_pos)
|
|
node_id_1 = self.random_pos[rand_pos_1]
|
|
node_id_2 = self.random_pos[rand_pos_2]
|
|
self.map_info[node_id_1].random_pos = rand_pos_2
|
|
self.map_info[node_id_2].random_pos = rand_pos_1
|
|
self.random_pos[rand_pos_1] = node_id_2
|
|
self.random_pos[rand_pos_2] = node_id_1
|
|
|
|
def make_tried_(self, info: ExtendedPeerInfo, node_id: int) -> None:
|
|
for bucket in range(NEW_BUCKET_COUNT):
|
|
pos = info.get_bucket_position(self.key, True, bucket)
|
|
if self.new_matrix[bucket][pos] == node_id:
|
|
self._set_new_matrix(bucket, pos, -1)
|
|
info.ref_count -= 1
|
|
assert info.ref_count == 0
|
|
self.new_count -= 1
|
|
cur_bucket = info.get_tried_bucket(self.key)
|
|
cur_bucket_pos = info.get_bucket_position(self.key, False, cur_bucket)
|
|
if self.tried_matrix[cur_bucket][cur_bucket_pos] != -1:
|
|
# Evict the old node from the tried table.
|
|
node_id_evict = self.tried_matrix[cur_bucket][cur_bucket_pos]
|
|
assert node_id_evict in self.map_info
|
|
old_info = self.map_info[node_id_evict]
|
|
old_info.is_tried = False
|
|
self._set_tried_matrix(cur_bucket, cur_bucket_pos, -1)
|
|
self.tried_count -= 1
|
|
# Find its position into new table.
|
|
new_bucket = old_info.get_new_bucket(self.key)
|
|
new_bucket_pos = old_info.get_bucket_position(self.key, True, new_bucket)
|
|
self.clear_new_(new_bucket, new_bucket_pos)
|
|
old_info.ref_count = 1
|
|
self._set_new_matrix(new_bucket, new_bucket_pos, node_id_evict)
|
|
self.new_count += 1
|
|
self._set_tried_matrix(cur_bucket, cur_bucket_pos, node_id)
|
|
self.tried_count += 1
|
|
info.is_tried = True
|
|
|
|
def clear_new_(self, bucket: int, pos: int) -> None:
|
|
if self.new_matrix[bucket][pos] != -1:
|
|
delete_id = self.new_matrix[bucket][pos]
|
|
delete_info = self.map_info[delete_id]
|
|
assert delete_info.ref_count > 0
|
|
delete_info.ref_count -= 1
|
|
self._set_new_matrix(bucket, pos, -1)
|
|
if delete_info.ref_count == 0:
|
|
self.delete_new_entry_(delete_id)
|
|
|
|
def mark_good_(self, addr: PeerInfo, test_before_evict: bool, timestamp: int) -> None:
|
|
self.last_good = timestamp
|
|
(info, node_id) = self.find_(addr)
|
|
if not addr.is_valid(self.allow_private_subnets):
|
|
return None
|
|
if info is None:
|
|
return None
|
|
if node_id is None:
|
|
return None
|
|
|
|
if not (info.peer_info.host == addr.host and info.peer_info.port == addr.port):
|
|
return None
|
|
|
|
# update info
|
|
info.last_success = timestamp
|
|
info.last_try = timestamp
|
|
info.num_attempts = 0
|
|
# timestamp is not updated here, to avoid leaking information about
|
|
# currently-connected peers.
|
|
|
|
# if it is already in the tried set, don't do anything else
|
|
if info.is_tried:
|
|
return None
|
|
|
|
# find a bucket it is in now
|
|
bucket_rand = randrange(NEW_BUCKET_COUNT)
|
|
new_bucket = -1
|
|
for n in range(NEW_BUCKET_COUNT):
|
|
cur_new_bucket = (n + bucket_rand) % NEW_BUCKET_COUNT
|
|
cur_new_bucket_pos = info.get_bucket_position(self.key, True, cur_new_bucket)
|
|
if self.new_matrix[cur_new_bucket][cur_new_bucket_pos] == node_id:
|
|
new_bucket = cur_new_bucket
|
|
break
|
|
|
|
# if no bucket is found, something bad happened;
|
|
if new_bucket == -1:
|
|
return None
|
|
|
|
# NOTE(Florin): Double check this. It's not used anywhere else.
|
|
|
|
# which tried bucket to move the entry to
|
|
tried_bucket = info.get_tried_bucket(self.key)
|
|
tried_bucket_pos = info.get_bucket_position(self.key, False, tried_bucket)
|
|
|
|
# Will moving this address into tried evict another entry?
|
|
if test_before_evict and self.tried_matrix[tried_bucket][tried_bucket_pos] != -1:
|
|
if len(self.tried_collisions) < TRIED_COLLISION_SIZE:
|
|
if node_id not in self.tried_collisions:
|
|
self.tried_collisions.append(node_id)
|
|
else:
|
|
self.make_tried_(info, node_id)
|
|
|
|
def delete_new_entry_(self, node_id: int) -> None:
|
|
info = self.map_info[node_id]
|
|
if info is None or info.random_pos is None:
|
|
return None
|
|
self.swap_random_(info.random_pos, len(self.random_pos) - 1)
|
|
self.random_pos = self.random_pos[:-1]
|
|
del self.map_addr[info.peer_info.host]
|
|
del self.map_info[node_id]
|
|
self.new_count -= 1
|
|
|
|
def add_to_new_table_(self, addr: TimestampedPeerInfo, source: Optional[PeerInfo], penalty: int) -> bool:
|
|
is_unique = False
|
|
peer_info = PeerInfo(
|
|
addr.host,
|
|
addr.port,
|
|
)
|
|
if not peer_info.is_valid(self.allow_private_subnets):
|
|
return False
|
|
(info, node_id) = self.find_(peer_info)
|
|
if info is not None and info.peer_info.host == addr.host and info.peer_info.port == addr.port:
|
|
penalty = 0
|
|
|
|
if info is not None:
|
|
# periodically update timestamp
|
|
currently_online = time.time() - addr.timestamp < 24 * 60 * 60
|
|
update_interval = 60 * 60 if currently_online else 24 * 60 * 60
|
|
if addr.timestamp > 0 and (
|
|
info.timestamp > 0 or info.timestamp < addr.timestamp - update_interval - penalty
|
|
):
|
|
info.timestamp = max(0, addr.timestamp - penalty)
|
|
|
|
# do not update if no new information is present
|
|
if addr.timestamp == 0 or (info.timestamp > 0 and addr.timestamp <= info.timestamp):
|
|
return False
|
|
|
|
# do not update if the entry was already in the "tried" table
|
|
if info.is_tried:
|
|
return False
|
|
|
|
# do not update if the max reference count is reached
|
|
if info.ref_count == NEW_BUCKETS_PER_ADDRESS:
|
|
return False
|
|
|
|
# stochastic test: previous ref_count == N: 2^N times harder to increase it
|
|
factor = 1 << info.ref_count
|
|
if factor > 1 and randrange(factor) != 0:
|
|
return False
|
|
else:
|
|
(info, node_id) = self.create_(addr, source)
|
|
info.timestamp = max(0, info.timestamp - penalty)
|
|
self.new_count += 1
|
|
is_unique = True
|
|
|
|
new_bucket = info.get_new_bucket(self.key, source)
|
|
new_bucket_pos = info.get_bucket_position(self.key, True, new_bucket)
|
|
if self.new_matrix[new_bucket][new_bucket_pos] != node_id:
|
|
add_to_new = self.new_matrix[new_bucket][new_bucket_pos] == -1
|
|
if not add_to_new:
|
|
info_existing = self.map_info[self.new_matrix[new_bucket][new_bucket_pos]]
|
|
if info_existing.is_terrible() or (info_existing.ref_count > 1 and info.ref_count == 0):
|
|
add_to_new = True
|
|
if add_to_new:
|
|
self.clear_new_(new_bucket, new_bucket_pos)
|
|
info.ref_count += 1
|
|
if node_id is not None:
|
|
self._set_new_matrix(new_bucket, new_bucket_pos, node_id)
|
|
else:
|
|
if info.ref_count == 0:
|
|
if node_id is not None:
|
|
self.delete_new_entry_(node_id)
|
|
return is_unique
|
|
|
|
def attempt_(self, addr: PeerInfo, count_failures: bool, timestamp: int) -> None:
|
|
info, _ = self.find_(addr)
|
|
if info is None:
|
|
return None
|
|
|
|
if not (info.peer_info.host == addr.host and info.peer_info.port == addr.port):
|
|
return None
|
|
|
|
info.last_try = timestamp
|
|
if count_failures and info.last_count_attempt < self.last_good:
|
|
info.last_count_attempt = timestamp
|
|
info.num_attempts += 1
|
|
|
|
def select_peer_(self, new_only: bool) -> Optional[ExtendedPeerInfo]:
|
|
if len(self.random_pos) == 0:
|
|
return None
|
|
|
|
if new_only and self.new_count == 0:
|
|
return None
|
|
|
|
# Use a 50% chance for choosing between tried and new table entries.
|
|
if not new_only and self.tried_count > 0 and (self.new_count == 0 or randrange(2) == 0):
|
|
chance = 1.0
|
|
start = time.time()
|
|
cached_tried_matrix_positions: List[Tuple[int, int]] = []
|
|
if len(self.used_tried_matrix_positions) < math.sqrt(TRIED_BUCKET_COUNT * BUCKET_SIZE):
|
|
cached_tried_matrix_positions = list(self.used_tried_matrix_positions)
|
|
while True:
|
|
if len(self.used_tried_matrix_positions) < math.sqrt(TRIED_BUCKET_COUNT * BUCKET_SIZE):
|
|
if len(self.used_tried_matrix_positions) == 0:
|
|
log.error(f"Empty tried table, but tried_count shows {self.tried_count}.")
|
|
return None
|
|
# The table is sparse, randomly pick from positions list.
|
|
index = randrange(len(cached_tried_matrix_positions))
|
|
tried_bucket, tried_bucket_pos = cached_tried_matrix_positions[index]
|
|
else:
|
|
# The table is dense, randomly trying positions is faster than loading positions list.
|
|
tried_bucket = randrange(TRIED_BUCKET_COUNT)
|
|
tried_bucket_pos = randrange(BUCKET_SIZE)
|
|
while self.tried_matrix[tried_bucket][tried_bucket_pos] == -1:
|
|
tried_bucket = (tried_bucket + randbits(LOG_TRIED_BUCKET_COUNT)) % TRIED_BUCKET_COUNT
|
|
tried_bucket_pos = (tried_bucket_pos + randbits(LOG_BUCKET_SIZE)) % BUCKET_SIZE
|
|
|
|
node_id = self.tried_matrix[tried_bucket][tried_bucket_pos]
|
|
assert node_id != -1
|
|
info = self.map_info[node_id]
|
|
if randbits(30) < (chance * info.get_selection_chance() * (1 << 30)):
|
|
end = time.time()
|
|
log.debug(f"address_manager.select_peer took {(end - start):.2e} seconds in tried table.")
|
|
return info
|
|
chance *= 1.2
|
|
else:
|
|
chance = 1.0
|
|
start = time.time()
|
|
cached_new_matrix_positions: List[Tuple[int, int]] = []
|
|
if len(self.used_new_matrix_positions) < math.sqrt(NEW_BUCKET_COUNT * BUCKET_SIZE):
|
|
cached_new_matrix_positions = list(self.used_new_matrix_positions)
|
|
while True:
|
|
if len(self.used_new_matrix_positions) < math.sqrt(NEW_BUCKET_COUNT * BUCKET_SIZE):
|
|
if len(self.used_new_matrix_positions) == 0:
|
|
log.error(f"Empty new table, but new_count shows {self.new_count}.")
|
|
return None
|
|
index = randrange(len(cached_new_matrix_positions))
|
|
new_bucket, new_bucket_pos = cached_new_matrix_positions[index]
|
|
else:
|
|
new_bucket = randrange(NEW_BUCKET_COUNT)
|
|
new_bucket_pos = randrange(BUCKET_SIZE)
|
|
while self.new_matrix[new_bucket][new_bucket_pos] == -1:
|
|
new_bucket = (new_bucket + randbits(LOG_NEW_BUCKET_COUNT)) % NEW_BUCKET_COUNT
|
|
new_bucket_pos = (new_bucket_pos + randbits(LOG_BUCKET_SIZE)) % BUCKET_SIZE
|
|
node_id = self.new_matrix[new_bucket][new_bucket_pos]
|
|
assert node_id != -1
|
|
info = self.map_info[node_id]
|
|
if randbits(30) < chance * info.get_selection_chance() * (1 << 30):
|
|
end = time.time()
|
|
log.debug(f"address_manager.select_peer took {(end - start):.2e} seconds in new table.")
|
|
return info
|
|
chance *= 1.2
|
|
|
|
def resolve_tried_collisions_(self) -> None:
|
|
for node_id in self.tried_collisions[:]:
|
|
resolved = False
|
|
if node_id not in self.map_info:
|
|
resolved = True
|
|
else:
|
|
info = self.map_info[node_id]
|
|
peer = info.peer_info
|
|
tried_bucket = info.get_tried_bucket(self.key)
|
|
tried_bucket_pos = info.get_bucket_position(self.key, False, tried_bucket)
|
|
if self.tried_matrix[tried_bucket][tried_bucket_pos] != -1:
|
|
old_id = self.tried_matrix[tried_bucket][tried_bucket_pos]
|
|
old_info = self.map_info[old_id]
|
|
if time.time() - old_info.last_success < 4 * 60 * 60:
|
|
resolved = True
|
|
elif time.time() - old_info.last_try < 4 * 60 * 60:
|
|
if time.time() - old_info.last_try > 60:
|
|
self.mark_good_(peer, False, math.floor(time.time()))
|
|
resolved = True
|
|
elif time.time() - info.last_success > 40 * 60:
|
|
self.mark_good_(peer, False, math.floor(time.time()))
|
|
resolved = True
|
|
else:
|
|
self.mark_good_(peer, False, math.floor(time.time()))
|
|
resolved = True
|
|
if resolved:
|
|
self.tried_collisions.remove(node_id)
|
|
|
|
def select_tried_collision_(self) -> Optional[ExtendedPeerInfo]:
|
|
if len(self.tried_collisions) == 0:
|
|
return None
|
|
new_id = choice(self.tried_collisions)
|
|
if new_id not in self.map_info:
|
|
self.tried_collisions.remove(new_id)
|
|
return None
|
|
new_info = self.map_info[new_id]
|
|
tried_bucket = new_info.get_tried_bucket(self.key)
|
|
tried_bucket_pos = new_info.get_bucket_position(self.key, False, tried_bucket)
|
|
|
|
old_id = self.tried_matrix[tried_bucket][tried_bucket_pos]
|
|
return self.map_info[old_id]
|
|
|
|
def get_peers_(self) -> List[TimestampedPeerInfo]:
|
|
addr: List[TimestampedPeerInfo] = []
|
|
num_nodes = math.ceil(23 * len(self.random_pos) / 100)
|
|
if num_nodes > 1000:
|
|
num_nodes = 1000
|
|
for n in range(len(self.random_pos)):
|
|
if len(addr) >= num_nodes:
|
|
return addr
|
|
|
|
rand_pos = randrange(len(self.random_pos) - n) + n
|
|
self.swap_random_(n, rand_pos)
|
|
info = self.map_info[self.random_pos[n]]
|
|
if not info.peer_info.is_valid(self.allow_private_subnets):
|
|
continue
|
|
if not info.is_terrible():
|
|
cur_peer_info = TimestampedPeerInfo(
|
|
info.peer_info.host,
|
|
uint16(info.peer_info.port),
|
|
uint64(info.timestamp),
|
|
)
|
|
addr.append(cur_peer_info)
|
|
|
|
return addr
|
|
|
|
def cleanup(self, max_timestamp_difference: int, max_consecutive_failures: int):
|
|
now = int(math.floor(time.time()))
|
|
for bucket in range(NEW_BUCKET_COUNT):
|
|
for pos in range(BUCKET_SIZE):
|
|
if self.new_matrix[bucket][pos] != -1:
|
|
node_id = self.new_matrix[bucket][pos]
|
|
cur_info = self.map_info[node_id]
|
|
if (
|
|
cur_info.timestamp < now - max_timestamp_difference
|
|
and cur_info.num_attempts >= max_consecutive_failures
|
|
):
|
|
self.clear_new_(bucket, pos)
|
|
|
|
def connect_(self, addr: PeerInfo, timestamp: int):
|
|
info, _ = self.find_(addr)
|
|
if info is None:
|
|
return None
|
|
|
|
# check whether we are talking about the exact same peer
|
|
if not (info.peer_info.host == addr.host and info.peer_info.port == addr.port):
|
|
return None
|
|
|
|
update_interval = 20 * 60
|
|
if timestamp - info.timestamp > update_interval:
|
|
info.timestamp = timestamp
|
|
|
|
async def size(self) -> int:
|
|
async with self.lock:
|
|
return len(self.random_pos)
|
|
|
|
async def add_to_new_table(
|
|
self,
|
|
addresses: List[TimestampedPeerInfo],
|
|
source: Optional[PeerInfo] = None,
|
|
penalty: int = 0,
|
|
) -> bool:
|
|
is_added = False
|
|
async with self.lock:
|
|
for addr in addresses:
|
|
cur_peer_added = self.add_to_new_table_(addr, source, penalty)
|
|
is_added = is_added or cur_peer_added
|
|
return is_added
|
|
|
|
# Mark an entry as accesible.
|
|
async def mark_good(
|
|
self,
|
|
addr: PeerInfo,
|
|
test_before_evict: bool = True,
|
|
timestamp: int = -1,
|
|
):
|
|
if timestamp == -1:
|
|
timestamp = math.floor(time.time())
|
|
async with self.lock:
|
|
self.mark_good_(addr, test_before_evict, timestamp)
|
|
|
|
# Mark an entry as connection attempted to.
|
|
async def attempt(
|
|
self,
|
|
addr: PeerInfo,
|
|
count_failures: bool,
|
|
timestamp: int = -1,
|
|
):
|
|
if timestamp == -1:
|
|
timestamp = math.floor(time.time())
|
|
async with self.lock:
|
|
self.attempt_(addr, count_failures, timestamp)
|
|
|
|
# See if any to-be-evicted tried table entries have been tested and if so resolve the collisions.
|
|
async def resolve_tried_collisions(self):
|
|
async with self.lock:
|
|
self.resolve_tried_collisions_()
|
|
|
|
# Randomly select an address in tried that another address is attempting to evict.
|
|
async def select_tried_collision(self) -> Optional[ExtendedPeerInfo]:
|
|
async with self.lock:
|
|
return self.select_tried_collision_()
|
|
|
|
# Choose an address to connect to.
|
|
async def select_peer(self, new_only: bool = False) -> Optional[ExtendedPeerInfo]:
|
|
async with self.lock:
|
|
return self.select_peer_(new_only)
|
|
|
|
# Return a bunch of addresses, selected at random.
|
|
async def get_peers(self) -> List[TimestampedPeerInfo]:
|
|
async with self.lock:
|
|
return self.get_peers_()
|
|
|
|
async def connect(self, addr: PeerInfo, timestamp: int = -1):
|
|
if timestamp == -1:
|
|
timestamp = math.floor(time.time())
|
|
async with self.lock:
|
|
return self.connect_(addr, timestamp)
|