357 lines
14 KiB
Python
357 lines
14 KiB
Python
import logging
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
import aiosqlite
|
|
|
|
from chia.consensus.block_record import BlockRecord
|
|
from chia.types.blockchain_format.sized_bytes import bytes32
|
|
from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary
|
|
from chia.types.full_block import FullBlock
|
|
from chia.types.weight_proof import SubEpochChallengeSegment, SubEpochSegments
|
|
from chia.util.db_wrapper import DBWrapper
|
|
from chia.util.ints import uint32
|
|
from chia.util.lru_cache import LRUCache
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class BlockStore:
|
|
db: aiosqlite.Connection
|
|
block_cache: LRUCache
|
|
db_wrapper: DBWrapper
|
|
ses_challenge_cache: LRUCache
|
|
|
|
@classmethod
|
|
async def create(cls, db_wrapper: DBWrapper):
|
|
self = cls()
|
|
|
|
# All full blocks which have been added to the blockchain. Header_hash -> block
|
|
self.db_wrapper = db_wrapper
|
|
self.db = db_wrapper.db
|
|
await self.db.execute("pragma journal_mode=wal")
|
|
await self.db.execute("pragma synchronous=2")
|
|
await self.db.execute(
|
|
"CREATE TABLE IF NOT EXISTS full_blocks(header_hash text PRIMARY KEY, height bigint,"
|
|
" is_block tinyint, is_fully_compactified tinyint, block blob)"
|
|
)
|
|
|
|
# Block records
|
|
await self.db.execute(
|
|
"CREATE TABLE IF NOT EXISTS block_records(header_hash "
|
|
"text PRIMARY KEY, prev_hash text, height bigint,"
|
|
"block blob, sub_epoch_summary blob, is_peak tinyint, is_block tinyint)"
|
|
)
|
|
|
|
# todo remove in v1.2
|
|
await self.db.execute("DROP TABLE IF EXISTS sub_epoch_segments_v2")
|
|
|
|
# Sub epoch segments for weight proofs
|
|
await self.db.execute(
|
|
"CREATE TABLE IF NOT EXISTS sub_epoch_segments_v3(ses_block_hash text PRIMARY KEY, challenge_segments blob)"
|
|
)
|
|
|
|
# Height index so we can look up in order of height for sync purposes
|
|
await self.db.execute("CREATE INDEX IF NOT EXISTS full_block_height on full_blocks(height)")
|
|
await self.db.execute("CREATE INDEX IF NOT EXISTS is_block on full_blocks(is_block)")
|
|
await self.db.execute("CREATE INDEX IF NOT EXISTS is_fully_compactified on full_blocks(is_fully_compactified)")
|
|
|
|
await self.db.execute("CREATE INDEX IF NOT EXISTS height on block_records(height)")
|
|
|
|
await self.db.execute("CREATE INDEX IF NOT EXISTS hh on block_records(header_hash)")
|
|
await self.db.execute("CREATE INDEX IF NOT EXISTS peak on block_records(is_peak)")
|
|
await self.db.execute("CREATE INDEX IF NOT EXISTS is_block on block_records(is_block)")
|
|
|
|
await self.db.commit()
|
|
self.block_cache = LRUCache(1000)
|
|
self.ses_challenge_cache = LRUCache(50)
|
|
return self
|
|
|
|
async def add_full_block(self, header_hash: bytes32, block: FullBlock, block_record: BlockRecord) -> None:
|
|
self.block_cache.put(header_hash, block)
|
|
cursor_1 = await self.db.execute(
|
|
"INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?)",
|
|
(
|
|
header_hash.hex(),
|
|
block.height,
|
|
int(block.is_transaction_block()),
|
|
int(block.is_fully_compactified()),
|
|
bytes(block),
|
|
),
|
|
)
|
|
|
|
await cursor_1.close()
|
|
|
|
cursor_2 = await self.db.execute(
|
|
"INSERT OR REPLACE INTO block_records VALUES(?, ?, ?, ?,?, ?, ?)",
|
|
(
|
|
header_hash.hex(),
|
|
block.prev_header_hash.hex(),
|
|
block.height,
|
|
bytes(block_record),
|
|
None
|
|
if block_record.sub_epoch_summary_included is None
|
|
else bytes(block_record.sub_epoch_summary_included),
|
|
False,
|
|
block.is_transaction_block(),
|
|
),
|
|
)
|
|
await cursor_2.close()
|
|
|
|
async def persist_sub_epoch_challenge_segments(
|
|
self, ses_block_hash: bytes32, segments: List[SubEpochChallengeSegment]
|
|
) -> None:
|
|
async with self.db_wrapper.lock:
|
|
cursor_1 = await self.db.execute(
|
|
"INSERT OR REPLACE INTO sub_epoch_segments_v3 VALUES(?, ?)",
|
|
(ses_block_hash.hex(), bytes(SubEpochSegments(segments))),
|
|
)
|
|
await cursor_1.close()
|
|
await self.db.commit()
|
|
|
|
async def get_sub_epoch_challenge_segments(
|
|
self,
|
|
ses_block_hash: bytes32,
|
|
) -> Optional[List[SubEpochChallengeSegment]]:
|
|
cached = self.ses_challenge_cache.get(ses_block_hash)
|
|
if cached is not None:
|
|
return cached
|
|
cursor = await self.db.execute(
|
|
"SELECT challenge_segments from sub_epoch_segments_v3 WHERE ses_block_hash=?", (ses_block_hash.hex(),)
|
|
)
|
|
row = await cursor.fetchone()
|
|
await cursor.close()
|
|
if row is not None:
|
|
challenge_segments = SubEpochSegments.from_bytes(row[0]).challenge_segments
|
|
self.ses_challenge_cache.put(ses_block_hash, challenge_segments)
|
|
return challenge_segments
|
|
return None
|
|
|
|
def rollback_cache_block(self, header_hash: bytes32):
|
|
try:
|
|
self.block_cache.remove(header_hash)
|
|
except KeyError:
|
|
# this is best effort. When rolling back, we may not have added the
|
|
# block to the cache yet
|
|
pass
|
|
|
|
async def get_full_block(self, header_hash: bytes32) -> Optional[FullBlock]:
|
|
cached = self.block_cache.get(header_hash)
|
|
if cached is not None:
|
|
log.debug(f"cache hit for block {header_hash.hex()}")
|
|
return cached
|
|
log.debug(f"cache miss for block {header_hash.hex()}")
|
|
cursor = await self.db.execute("SELECT block from full_blocks WHERE header_hash=?", (header_hash.hex(),))
|
|
row = await cursor.fetchone()
|
|
await cursor.close()
|
|
if row is not None:
|
|
block = FullBlock.from_bytes(row[0])
|
|
self.block_cache.put(header_hash, block)
|
|
return block
|
|
return None
|
|
|
|
async def get_full_block_bytes(self, header_hash: bytes32) -> Optional[bytes]:
|
|
cached = self.block_cache.get(header_hash)
|
|
if cached is not None:
|
|
log.debug(f"cache hit for block {header_hash.hex()}")
|
|
return bytes(cached)
|
|
log.debug(f"cache miss for block {header_hash.hex()}")
|
|
cursor = await self.db.execute("SELECT block from full_blocks WHERE header_hash=?", (header_hash.hex(),))
|
|
row = await cursor.fetchone()
|
|
await cursor.close()
|
|
if row is not None:
|
|
return row[0]
|
|
return None
|
|
|
|
async def get_full_blocks_at(self, heights: List[uint32]) -> List[FullBlock]:
|
|
if len(heights) == 0:
|
|
return []
|
|
|
|
heights_db = tuple(heights)
|
|
formatted_str = f'SELECT block from full_blocks WHERE height in ({"?," * (len(heights_db) - 1)}?)'
|
|
cursor = await self.db.execute(formatted_str, heights_db)
|
|
rows = await cursor.fetchall()
|
|
await cursor.close()
|
|
return [FullBlock.from_bytes(row[0]) for row in rows]
|
|
|
|
async def get_block_records_by_hash(self, header_hashes: List[bytes32]):
|
|
"""
|
|
Returns a list of Block Records, ordered by the same order in which header_hashes are passed in.
|
|
Throws an exception if the blocks are not present
|
|
"""
|
|
if len(header_hashes) == 0:
|
|
return []
|
|
|
|
header_hashes_db = tuple([hh.hex() for hh in header_hashes])
|
|
formatted_str = f'SELECT block from block_records WHERE header_hash in ({"?," * (len(header_hashes_db) - 1)}?)'
|
|
cursor = await self.db.execute(formatted_str, header_hashes_db)
|
|
rows = await cursor.fetchall()
|
|
await cursor.close()
|
|
all_blocks: Dict[bytes32, BlockRecord] = {}
|
|
for row in rows:
|
|
block_rec: BlockRecord = BlockRecord.from_bytes(row[0])
|
|
all_blocks[block_rec.header_hash] = block_rec
|
|
ret: List[BlockRecord] = []
|
|
for hh in header_hashes:
|
|
if hh not in all_blocks:
|
|
raise ValueError(f"Header hash {hh} not in the blockchain")
|
|
ret.append(all_blocks[hh])
|
|
return ret
|
|
|
|
async def get_blocks_by_hash(self, header_hashes: List[bytes32]) -> List[FullBlock]:
|
|
"""
|
|
Returns a list of Full Blocks blocks, ordered by the same order in which header_hashes are passed in.
|
|
Throws an exception if the blocks are not present
|
|
"""
|
|
|
|
if len(header_hashes) == 0:
|
|
return []
|
|
|
|
header_hashes_db = tuple([hh.hex() for hh in header_hashes])
|
|
formatted_str = (
|
|
f'SELECT header_hash, block from full_blocks WHERE header_hash in ({"?," * (len(header_hashes_db) - 1)}?)'
|
|
)
|
|
cursor = await self.db.execute(formatted_str, header_hashes_db)
|
|
rows = await cursor.fetchall()
|
|
await cursor.close()
|
|
all_blocks: Dict[bytes32, FullBlock] = {}
|
|
for row in rows:
|
|
header_hash = bytes.fromhex(row[0])
|
|
full_block: FullBlock = FullBlock.from_bytes(row[1])
|
|
all_blocks[header_hash] = full_block
|
|
self.block_cache.put(header_hash, full_block)
|
|
ret: List[FullBlock] = []
|
|
for hh in header_hashes:
|
|
if hh not in all_blocks:
|
|
raise ValueError(f"Header hash {hh} not in the blockchain")
|
|
ret.append(all_blocks[hh])
|
|
return ret
|
|
|
|
async def get_block_record(self, header_hash: bytes32) -> Optional[BlockRecord]:
|
|
cursor = await self.db.execute(
|
|
"SELECT block from block_records WHERE header_hash=?",
|
|
(header_hash.hex(),),
|
|
)
|
|
row = await cursor.fetchone()
|
|
await cursor.close()
|
|
if row is not None:
|
|
return BlockRecord.from_bytes(row[0])
|
|
return None
|
|
|
|
async def get_block_records_in_range(
|
|
self,
|
|
start: int,
|
|
stop: int,
|
|
) -> Dict[bytes32, BlockRecord]:
|
|
"""
|
|
Returns a dictionary with all blocks in range between start and stop
|
|
if present.
|
|
"""
|
|
|
|
formatted_str = f"SELECT header_hash, block from block_records WHERE height >= {start} and height <= {stop}"
|
|
|
|
cursor = await self.db.execute(formatted_str)
|
|
rows = await cursor.fetchall()
|
|
await cursor.close()
|
|
ret: Dict[bytes32, BlockRecord] = {}
|
|
for row in rows:
|
|
header_hash = bytes.fromhex(row[0])
|
|
ret[header_hash] = BlockRecord.from_bytes(row[1])
|
|
|
|
return ret
|
|
|
|
async def get_block_records_close_to_peak(
|
|
self, blocks_n: int
|
|
) -> Tuple[Dict[bytes32, BlockRecord], Optional[bytes32]]:
|
|
"""
|
|
Returns a dictionary with all blocks that have height >= peak height - blocks_n, as well as the
|
|
peak header hash.
|
|
"""
|
|
|
|
res = await self.db.execute("SELECT * from block_records WHERE is_peak = 1")
|
|
peak_row = await res.fetchone()
|
|
await res.close()
|
|
if peak_row is None:
|
|
return {}, None
|
|
|
|
formatted_str = f"SELECT header_hash, block from block_records WHERE height >= {peak_row[2] - blocks_n}"
|
|
cursor = await self.db.execute(formatted_str)
|
|
rows = await cursor.fetchall()
|
|
await cursor.close()
|
|
ret: Dict[bytes32, BlockRecord] = {}
|
|
for row in rows:
|
|
header_hash = bytes.fromhex(row[0])
|
|
ret[header_hash] = BlockRecord.from_bytes(row[1])
|
|
return ret, bytes.fromhex(peak_row[0])
|
|
|
|
async def get_peak_height_dicts(self) -> Tuple[Dict[uint32, bytes32], Dict[uint32, SubEpochSummary]]:
|
|
"""
|
|
Returns a dictionary with all blocks, as well as the header hash of the peak,
|
|
if present.
|
|
"""
|
|
|
|
res = await self.db.execute("SELECT * from block_records WHERE is_peak = 1")
|
|
row = await res.fetchone()
|
|
await res.close()
|
|
if row is None:
|
|
return {}, {}
|
|
|
|
peak: bytes32 = bytes.fromhex(row[0])
|
|
cursor = await self.db.execute("SELECT header_hash,prev_hash,height,sub_epoch_summary from block_records")
|
|
rows = await cursor.fetchall()
|
|
await cursor.close()
|
|
hash_to_prev_hash: Dict[bytes32, bytes32] = {}
|
|
hash_to_height: Dict[bytes32, uint32] = {}
|
|
hash_to_summary: Dict[bytes32, SubEpochSummary] = {}
|
|
|
|
for row in rows:
|
|
hash_to_prev_hash[bytes.fromhex(row[0])] = bytes.fromhex(row[1])
|
|
hash_to_height[bytes.fromhex(row[0])] = row[2]
|
|
if row[3] is not None:
|
|
hash_to_summary[bytes.fromhex(row[0])] = SubEpochSummary.from_bytes(row[3])
|
|
|
|
height_to_hash: Dict[uint32, bytes32] = {}
|
|
sub_epoch_summaries: Dict[uint32, SubEpochSummary] = {}
|
|
|
|
curr_header_hash = peak
|
|
curr_height = hash_to_height[curr_header_hash]
|
|
while True:
|
|
height_to_hash[curr_height] = curr_header_hash
|
|
if curr_header_hash in hash_to_summary:
|
|
sub_epoch_summaries[curr_height] = hash_to_summary[curr_header_hash]
|
|
if curr_height == 0:
|
|
break
|
|
curr_header_hash = hash_to_prev_hash[curr_header_hash]
|
|
curr_height = hash_to_height[curr_header_hash]
|
|
return height_to_hash, sub_epoch_summaries
|
|
|
|
async def set_peak(self, header_hash: bytes32) -> None:
|
|
# We need to be in a sqlite transaction here.
|
|
# Note: we do not commit this to the database yet, as we need to also change the coin store
|
|
cursor_1 = await self.db.execute("UPDATE block_records SET is_peak=0 WHERE is_peak=1")
|
|
await cursor_1.close()
|
|
cursor_2 = await self.db.execute(
|
|
"UPDATE block_records SET is_peak=1 WHERE header_hash=?",
|
|
(header_hash.hex(),),
|
|
)
|
|
await cursor_2.close()
|
|
|
|
async def is_fully_compactified(self, header_hash: bytes32) -> Optional[bool]:
|
|
cursor = await self.db.execute(
|
|
"SELECT is_fully_compactified from full_blocks WHERE header_hash=?", (header_hash.hex(),)
|
|
)
|
|
row = await cursor.fetchone()
|
|
await cursor.close()
|
|
if row is None:
|
|
return None
|
|
return bool(row[0])
|
|
|
|
async def get_first_not_compactified(self, min_height: int) -> Optional[int]:
|
|
cursor = await self.db.execute(
|
|
"SELECT MIN(height) from full_blocks WHERE is_fully_compactified=0 AND height>=?", (min_height,)
|
|
)
|
|
row = await cursor.fetchone()
|
|
await cursor.close()
|
|
if row is None:
|
|
return None
|
|
return int(row[0])
|