From bda9f37e8bb4d364f1071c8aac3a1a2f826d0a07 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Wed, 30 Jul 2025 14:13:33 -0700 Subject: [PATCH 1/2] Simplify `BlockStore` --- .../blockchain/blockchain_test_utils.py | 2 +- chia/_tests/blockchain/test_blockchain.py | 2 +- chia/consensus/blockchain.py | 19 +++-------- chia/full_node/block_store.py | 33 ++++++++++++------- chia/full_node/full_node_api.py | 23 ++++--------- chia/simulator/full_node_simulator.py | 2 +- 6 files changed, 36 insertions(+), 45 deletions(-) diff --git a/chia/_tests/blockchain/blockchain_test_utils.py b/chia/_tests/blockchain/blockchain_test_utils.py index 50df50e209ef..3de9ff90553d 100644 --- a/chia/_tests/blockchain/blockchain_test_utils.py +++ b/chia/_tests/blockchain/blockchain_test_utils.py @@ -22,7 +22,7 @@ async def check_block_store_invariant(bc: Blockchain): in_chain = set() max_height = -1 - async with db_wrapper.writer_maybe_transaction() as conn: + async with bc.block_store.transaction() as conn: async with conn.execute("SELECT height, in_main_chain FROM full_blocks") as cursor: rows = await cursor.fetchall() for row in rows: diff --git a/chia/_tests/blockchain/test_blockchain.py b/chia/_tests/blockchain/test_blockchain.py index 3e1042e58cbb..0b8d639a6d87 100644 --- a/chia/_tests/blockchain/test_blockchain.py +++ b/chia/_tests/blockchain/test_blockchain.py @@ -3647,7 +3647,7 @@ async def test_get_blocks_at(self, empty_blockchain: Blockchain, default_1000_bl heights.append(block.height) await _validate_and_add_block(b, block) - blocks = await b.get_block_records_at(heights, batch_size=2) + blocks = await b.get_block_records_at(heights) assert blocks assert len(blocks) == 200 assert blocks[-1].height == 199 diff --git a/chia/consensus/blockchain.py b/chia/consensus/blockchain.py index 47f08242991b..2117f963f308 100644 --- a/chia/consensus/blockchain.py +++ b/chia/consensus/blockchain.py @@ -421,7 +421,7 @@ async def add_block( try: # Always add the block to the database - async with self.block_store.db_wrapper.writer(): + async with self.block_store.transaction(): # Perform the DB operations to update the state, and rollback if something goes wrong await self.block_store.add_full_block(header_hash, block, block_record) records, state_change_summary = await self._reconsider_peak(block_record, genesis, fork_info) @@ -883,7 +883,7 @@ async def get_header_blocks_in_range( blocks: list[FullBlock] = [] for hash in hashes.copy(): - block = self.block_store.block_cache.get(hash) + block = self.block_store.get_block_from_cache(hash) if block is not None: blocks.append(block) hashes.remove(hash) @@ -926,27 +926,18 @@ async def get_header_block_by_height( return None return header_dict[header_hash] - async def get_block_records_at(self, heights: list[uint32], batch_size: int = 900) -> list[BlockRecord]: + async def get_block_records_at(self, heights: list[uint32]) -> list[BlockRecord]: """ gets block records by height (only blocks that are part of the chain) """ - records: list[BlockRecord] = [] hashes: list[bytes32] = [] - assert batch_size < self.block_store.db_wrapper.host_parameter_limit for height in heights: header_hash: Optional[bytes32] = self.height_to_hash(height) if header_hash is None: raise ValueError(f"Do not have block at height {height}") hashes.append(header_hash) - if len(hashes) > batch_size: - res = await self.block_store.get_block_records_by_hash(hashes) - records.extend(res) - hashes = [] - - if len(hashes) > 0: - res = await self.block_store.get_block_records_by_hash(hashes) - records.extend(res) - return records + + return await self.block_store.get_block_records_by_hash(hashes) def try_block_record(self, header_hash: bytes32) -> Optional[BlockRecord]: if header_hash in self.__block_records: diff --git a/chia/full_node/block_store.py b/chia/full_node/block_store.py index e16473675620..23edf6ca4017 100644 --- a/chia/full_node/block_store.py +++ b/chia/full_node/block_store.py @@ -3,8 +3,10 @@ import dataclasses import logging import sqlite3 +from contextlib import AbstractAsyncContextManager from typing import Optional +import aiosqlite import typing_extensions import zstd from chia_rs import BlockRecord, FullBlock, SubEpochChallengeSegment, SubEpochSegments @@ -12,6 +14,7 @@ from chia_rs.sized_ints import uint32 from chia.full_node.full_block_utils import GeneratorBlockInfo, block_info_from_block, generator_from_block +from chia.util.batches import to_batches from chia.util.db_wrapper import DBWrapper2, execute_fetchone from chia.util.errors import Err from chia.util.lru_cache import LRUCache @@ -189,6 +192,12 @@ async def get_sub_epoch_challenge_segments( return challenge_segments return None + def transaction(self) -> AbstractAsyncContextManager[aiosqlite.Connection]: + return self.db_wrapper.writer() + + def get_block_from_cache(self, header_hash: bytes32) -> Optional[FullBlock]: + return self.block_cache.get(header_hash) + def rollback_cache_block(self, header_hash: bytes32) -> None: try: self.block_cache.remove(header_hash) @@ -322,20 +331,21 @@ async def get_block_records_by_hash(self, header_hashes: list[bytes32]) -> list[ Returns a list of Block Records, ordered by the same order in which header_hashes are passed in. Throws an exception if the blocks are not present """ + if len(header_hashes) == 0: return [] all_blocks: dict[bytes32, BlockRecord] = {} - async with self.db_wrapper.reader_no_transaction() as conn: - async with conn.execute( - "SELECT header_hash,block_record " - "FROM full_blocks " - f"WHERE header_hash in ({'?,' * (len(header_hashes) - 1)}?)", - header_hashes, - ) as cursor: - for row in await cursor.fetchall(): - block_rec = BlockRecord.from_bytes(row[1]) - all_blocks[block_rec.header_hash] = block_rec + for batch in to_batches(header_hashes, self.db_wrapper.host_parameter_limit): + async with self.db_wrapper.reader_no_transaction() as conn: + async with conn.execute( + "SELECT header_hash,block_record FROM full_blocks " + f"WHERE header_hash in ({'?,' * (len(batch.entries) - 1)}?)", + batch.entries, + ) as cursor: + for row in await cursor.fetchall(): + block_rec = BlockRecord.from_bytes(row[1]) + all_blocks[block_rec.header_hash] = block_rec ret: list[BlockRecord] = [] for hh in header_hashes: @@ -468,7 +478,8 @@ async def get_block_bytes_in_range( No orphan blocks. """ - assert self.db_wrapper.db_version == 2 + if self.db_wrapper.db_version != 2: + raise RuntimeError("get_block_bytes_in_range requires DB version 2") async with self.db_wrapper.reader_no_transaction() as conn: async with conn.execute( "SELECT block FROM full_blocks WHERE height >= ? AND height <= ? AND in_main_chain=1", diff --git a/chia/full_node/full_node_api.py b/chia/full_node/full_node_api.py index 3c9da355cafe..dc435c436ced 100644 --- a/chia/full_node/full_node_api.py +++ b/chia/full_node/full_node_api.py @@ -1453,24 +1453,13 @@ async def request_block_headers(self, request: wallet_protocol.RequestBlockHeade if request.end_height < request.start_height or request.end_height - request.start_height > 128: return make_msg(ProtocolMessageTypes.reject_block_headers, reject) - if self.full_node.block_store.db_wrapper.db_version == 2: - try: - blocks_bytes = await self.full_node.block_store.get_block_bytes_in_range( - request.start_height, request.end_height - ) - except ValueError: - return make_msg(ProtocolMessageTypes.reject_block_headers, reject) - - else: - height_to_hash = self.full_node.blockchain.height_to_hash - header_hashes: list[bytes32] = [] - for i in range(request.start_height, request.end_height + 1): - header_hash: Optional[bytes32] = height_to_hash(uint32(i)) - if header_hash is None: - return make_msg(ProtocolMessageTypes.reject_header_blocks, reject) - header_hashes.append(header_hash) + try: + blocks_bytes = await self.full_node.block_store.get_block_bytes_in_range( + request.start_height, request.end_height + ) + except ValueError: + return make_msg(ProtocolMessageTypes.reject_block_headers, reject) - blocks_bytes = await self.full_node.block_store.get_block_bytes_by_hash(header_hashes) if len(blocks_bytes) != (request.end_height - request.start_height + 1): # +1 because interval is inclusive return make_msg(ProtocolMessageTypes.reject_block_headers, reject) return_filter = request.return_filter diff --git a/chia/simulator/full_node_simulator.py b/chia/simulator/full_node_simulator.py index 7ecc43d5d652..652cbd6ce4c2 100644 --- a/chia/simulator/full_node_simulator.py +++ b/chia/simulator/full_node_simulator.py @@ -167,7 +167,7 @@ async def revert_block_height(self, new_height: uint32) -> None: raise ValueError("Cannot revert to a height less than 1.") block_record: BlockRecord = self.full_node.blockchain.height_to_block_record(new_height) # remove enough data to allow a bunch of blocks to be wiped. - async with self.full_node.block_store.db_wrapper.writer(): + async with self.full_node.block_store.transaction(): # set coinstore await self.full_node.coin_store.rollback_to_block(new_height) # set blockstore to new height From aec98d06b242191c6c8fdc915c33d19f300357af Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Thu, 31 Jul 2025 17:15:54 -0700 Subject: [PATCH 2/2] Update chia/full_node/block_store.py Co-authored-by: Arvid Norberg --- chia/full_node/block_store.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/chia/full_node/block_store.py b/chia/full_node/block_store.py index 23edf6ca4017..4fef5b9a4006 100644 --- a/chia/full_node/block_store.py +++ b/chia/full_node/block_store.py @@ -478,8 +478,7 @@ async def get_block_bytes_in_range( No orphan blocks. """ - if self.db_wrapper.db_version != 2: - raise RuntimeError("get_block_bytes_in_range requires DB version 2") + assert self.db_wrapper.db_version == 2 async with self.db_wrapper.reader_no_transaction() as conn: async with conn.execute( "SELECT block FROM full_blocks WHERE height >= ? AND height <= ? AND in_main_chain=1",