Skip to content

Commit 52c49e1

Browse files
committed
BlockStoreProtocol
1 parent 018c3fe commit 52c49e1

File tree

8 files changed

+82
-45
lines changed

8 files changed

+82
-45
lines changed

chia/_tests/blockchain/blockchain_test_utils.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from typing import Optional
3+
from typing import Optional, cast
44

55
from chia_rs import FullBlock, SpendBundleConditions
66
from chia_rs.sized_ints import uint32, uint64
@@ -10,19 +10,20 @@
1010
from chia.consensus.blockchain import AddBlockResult, Blockchain
1111
from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty
1212
from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_block
13+
from chia.full_node.block_store import BlockStore
1314
from chia.types.validation_state import ValidationState
1415
from chia.util.errors import Err
1516

1617

1718
async def check_block_store_invariant(bc: Blockchain):
18-
db_wrapper = bc.block_store.db_wrapper
19+
block_store = cast(BlockStore, bc.block_store)
1920

20-
if db_wrapper.db_version == 1:
21+
if block_store.db_wrapper.db_version == 1:
2122
return
2223

2324
in_chain = set()
2425
max_height = -1
25-
async with db_wrapper.writer_maybe_transaction() as conn:
26+
async with block_store.transaction() as conn:
2627
async with conn.execute("SELECT height, in_main_chain FROM full_blocks") as cursor:
2728
rows = await cursor.fetchall()
2829
for row in rows:

chia/_tests/blockchain/test_blockchain.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3647,7 +3647,7 @@ async def test_get_blocks_at(self, empty_blockchain: Blockchain, default_1000_bl
36473647
heights.append(block.height)
36483648
await _validate_and_add_block(b, block)
36493649

3650-
blocks = await b.get_block_records_at(heights, batch_size=2)
3650+
blocks = await b.get_block_records_at(heights)
36513651
assert blocks
36523652
assert len(blocks) == 200
36533653
assert blocks[-1].height == 199
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from __future__ import annotations
2+
3+
from contextlib import AbstractAsyncContextManager
4+
from typing import Any, Optional, Protocol
5+
6+
from chia_rs import BlockRecord, FullBlock, SubEpochChallengeSegment
7+
from chia_rs.sized_bytes import bytes32
8+
from chia_rs.sized_ints import uint32
9+
10+
11+
class BlockStoreProtocol(Protocol):
12+
async def add_full_block(self, header_hash: bytes32, block: FullBlock, block_record: BlockRecord) -> None: ...
13+
def get_block_from_cache(self, header_hash: bytes32) -> Optional[FullBlock]: ...
14+
async def get_full_block(self, header_hash: bytes32) -> Optional[FullBlock]: ...
15+
async def get_generator(self, header_hash: bytes32) -> Optional[bytes]: ...
16+
async def get_generators_at(self, heights: set[uint32]) -> dict[uint32, bytes]: ...
17+
async def get_block_record(self, header_hash: bytes32) -> Optional[BlockRecord]: ...
18+
async def get_block_records_in_range(self, start: int, stop: int) -> dict[bytes32, BlockRecord]: ...
19+
async def get_block_records_by_hash(self, header_hashes: list[bytes32]) -> list[BlockRecord]: ...
20+
async def get_blocks_by_hash(self, header_hashes: list[bytes32]) -> list[FullBlock]: ...
21+
async def persist_sub_epoch_challenge_segments(
22+
self, ses_block_hash: bytes32, segments: list[SubEpochChallengeSegment]
23+
) -> None: ...
24+
async def get_sub_epoch_challenge_segments(
25+
self,
26+
ses_block_hash: bytes32,
27+
) -> Optional[list[SubEpochChallengeSegment]]: ...
28+
async def rollback(self, height: int) -> None: ...
29+
def rollback_cache_block(self, header_hash: bytes32) -> None: ...
30+
async def set_in_chain(self, header_hashes: list[tuple[bytes32]]) -> None: ...
31+
async def set_peak(self, header_hash: bytes32) -> None: ...
32+
async def get_block_records_close_to_peak(
33+
self, blocks_n: int
34+
) -> tuple[dict[bytes32, BlockRecord], Optional[bytes32]]: ...
35+
async def get_prev_hash(self, header_hash: bytes32) -> bytes32: ...
36+
def transaction(self) -> AbstractAsyncContextManager[Any]: ...

chia/consensus/blockchain.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from chia.consensus.block_body_validation import ForkInfo, validate_block_body
2828
from chia.consensus.block_header_validation import validate_unfinished_header_block
2929
from chia.consensus.block_height_map import BlockHeightMap
30+
from chia.consensus.block_store_protocol import BlockStoreProtocol
3031
from chia.consensus.coin_store_protocol import CoinStoreProtocol
3132
from chia.consensus.cost_calculator import NPCResult
3233
from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty
@@ -35,7 +36,6 @@
3536
from chia.consensus.generator_tools import get_block_header
3637
from chia.consensus.get_block_generator import get_block_generator
3738
from chia.consensus.multiprocess_validation import PreValidationResult
38-
from chia.full_node.block_store import BlockStore
3939
from chia.types.blockchain_format.coin import Coin
4040
from chia.types.blockchain_format.vdf import VDFInfo
4141
from chia.types.coin_record import CoinRecord
@@ -104,7 +104,7 @@ class Blockchain:
104104
# Unspent Store
105105
coin_store: CoinStoreProtocol
106106
# Store
107-
block_store: BlockStore
107+
block_store: BlockStoreProtocol
108108
# Used to verify blocks in parallel
109109
pool: Executor
110110
# Set holding seen compact proofs, in order to avoid duplicates.
@@ -122,7 +122,7 @@ class Blockchain:
122122
@staticmethod
123123
async def create(
124124
coin_store: CoinStoreProtocol,
125-
block_store: BlockStore,
125+
block_store: BlockStoreProtocol,
126126
height_map: BlockHeightMap,
127127
consensus_constants: ConsensusConstants,
128128
reserved_cores: int,
@@ -421,7 +421,7 @@ async def add_block(
421421

422422
try:
423423
# Always add the block to the database
424-
async with self.block_store.db_wrapper.writer():
424+
async with self.block_store.transaction():
425425
# Perform the DB operations to update the state, and rollback if something goes wrong
426426
await self.block_store.add_full_block(header_hash, block, block_record)
427427
records, state_change_summary = await self._reconsider_peak(block_record, genesis, fork_info)
@@ -883,7 +883,7 @@ async def get_header_blocks_in_range(
883883

884884
blocks: list[FullBlock] = []
885885
for hash in hashes.copy():
886-
block = self.block_store.block_cache.get(hash)
886+
block = self.block_store.get_block_from_cache(hash)
887887
if block is not None:
888888
blocks.append(block)
889889
hashes.remove(hash)
@@ -926,27 +926,18 @@ async def get_header_block_by_height(
926926
return None
927927
return header_dict[header_hash]
928928

929-
async def get_block_records_at(self, heights: list[uint32], batch_size: int = 900) -> list[BlockRecord]:
929+
async def get_block_records_at(self, heights: list[uint32]) -> list[BlockRecord]:
930930
"""
931931
gets block records by height (only blocks that are part of the chain)
932932
"""
933-
records: list[BlockRecord] = []
934933
hashes: list[bytes32] = []
935-
assert batch_size < self.block_store.db_wrapper.host_parameter_limit
936934
for height in heights:
937935
header_hash: Optional[bytes32] = self.height_to_hash(height)
938936
if header_hash is None:
939937
raise ValueError(f"Do not have block at height {height}")
940938
hashes.append(header_hash)
941-
if len(hashes) > batch_size:
942-
res = await self.block_store.get_block_records_by_hash(hashes)
943-
records.extend(res)
944-
hashes = []
945-
946-
if len(hashes) > 0:
947-
res = await self.block_store.get_block_records_by_hash(hashes)
948-
records.extend(res)
949-
return records
939+
940+
return await self.block_store.get_block_records_by_hash(hashes)
950941

951942
def try_block_record(self, header_hash: bytes32) -> Optional[BlockRecord]:
952943
if header_hash in self.__block_records:

chia/full_node/block_store.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import dataclasses
44
import logging
55
import sqlite3
6-
from typing import Optional
6+
from contextlib import AbstractAsyncContextManager
7+
from typing import Any, Optional
78

89
import typing_extensions
910
import zstd
@@ -12,6 +13,7 @@
1213
from chia_rs.sized_ints import uint32
1314

1415
from chia.full_node.full_block_utils import GeneratorBlockInfo, block_info_from_block, generator_from_block
16+
from chia.util.batches import to_batches
1517
from chia.util.db_wrapper import DBWrapper2, execute_fetchone
1618
from chia.util.errors import Err
1719
from chia.util.lru_cache import LRUCache
@@ -189,6 +191,12 @@ async def get_sub_epoch_challenge_segments(
189191
return challenge_segments
190192
return None
191193

194+
def transaction(self) -> AbstractAsyncContextManager[Any]:
195+
return self.db_wrapper.writer()
196+
197+
def get_block_from_cache(self, header_hash: bytes32) -> Optional[FullBlock]:
198+
return self.block_cache.get(header_hash)
199+
192200
def rollback_cache_block(self, header_hash: bytes32) -> None:
193201
try:
194202
self.block_cache.remove(header_hash)
@@ -322,20 +330,21 @@ async def get_block_records_by_hash(self, header_hashes: list[bytes32]) -> list[
322330
Returns a list of Block Records, ordered by the same order in which header_hashes are passed in.
323331
Throws an exception if the blocks are not present
324332
"""
333+
325334
if len(header_hashes) == 0:
326335
return []
327336

328337
all_blocks: dict[bytes32, BlockRecord] = {}
329-
async with self.db_wrapper.reader_no_transaction() as conn:
330-
async with conn.execute(
331-
"SELECT header_hash,block_record "
332-
"FROM full_blocks "
333-
f"WHERE header_hash in ({'?,' * (len(header_hashes) - 1)}?)",
334-
header_hashes,
335-
) as cursor:
336-
for row in await cursor.fetchall():
337-
block_rec = BlockRecord.from_bytes(row[1])
338-
all_blocks[block_rec.header_hash] = block_rec
338+
for batch in to_batches(header_hashes, self.db_wrapper.host_parameter_limit):
339+
async with self.db_wrapper.reader_no_transaction() as conn:
340+
async with conn.execute(
341+
"SELECT header_hash,block_record FROM full_blocks "
342+
f"WHERE header_hash in ({'?,' * (len(batch.entries) - 1)}?)",
343+
batch.entries,
344+
) as cursor:
345+
for row in await cursor.fetchall():
346+
block_rec = BlockRecord.from_bytes(row[1])
347+
all_blocks[block_rec.header_hash] = block_rec
339348

340349
ret: list[BlockRecord] = []
341350
for hh in header_hashes:
@@ -468,7 +477,8 @@ async def get_block_bytes_in_range(
468477
No orphan blocks.
469478
"""
470479

471-
assert self.db_wrapper.db_version == 2
480+
if self.db_wrapper.db_version != 2:
481+
raise NotImplementedError("get_block_bytes_in_range requires DB version 2")
472482
async with self.db_wrapper.reader_no_transaction() as conn:
473483
async with conn.execute(
474484
"SELECT block FROM full_blocks WHERE height >= ? AND height <= ? AND in_main_chain=1",

chia/full_node/full_node_api.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,15 +1453,13 @@ async def request_block_headers(self, request: wallet_protocol.RequestBlockHeade
14531453

14541454
if request.end_height < request.start_height or request.end_height - request.start_height > 128:
14551455
return make_msg(ProtocolMessageTypes.reject_block_headers, reject)
1456-
if self.full_node.block_store.db_wrapper.db_version == 2:
1457-
try:
1458-
blocks_bytes = await self.full_node.block_store.get_block_bytes_in_range(
1459-
request.start_height, request.end_height
1460-
)
1461-
except ValueError:
1462-
return make_msg(ProtocolMessageTypes.reject_block_headers, reject)
1463-
1464-
else:
1456+
try:
1457+
blocks_bytes = await self.full_node.block_store.get_block_bytes_in_range(
1458+
request.start_height, request.end_height
1459+
)
1460+
except NotImplementedError:
1461+
# The underlying block store may not support this optimized call
1462+
# (e.g. v1 DB). In this case, we fall back to the legacy approach
14651463
height_to_hash = self.full_node.blockchain.height_to_hash
14661464
header_hashes: list[bytes32] = []
14671465
for i in range(request.start_height, request.end_height + 1):
@@ -1471,6 +1469,8 @@ async def request_block_headers(self, request: wallet_protocol.RequestBlockHeade
14711469
header_hashes.append(header_hash)
14721470

14731471
blocks_bytes = await self.full_node.block_store.get_block_bytes_by_hash(header_hashes)
1472+
except ValueError:
1473+
return make_msg(ProtocolMessageTypes.reject_block_headers, reject)
14741474
if len(blocks_bytes) != (request.end_height - request.start_height + 1): # +1 because interval is inclusive
14751475
return make_msg(ProtocolMessageTypes.reject_block_headers, reject)
14761476
return_filter = request.return_filter

chia/simulator/full_node_simulator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ async def revert_block_height(self, new_height: uint32) -> None:
167167
raise ValueError("Cannot revert to a height less than 1.")
168168
block_record: BlockRecord = self.full_node.blockchain.height_to_block_record(new_height)
169169
# remove enough data to allow a bunch of blocks to be wiped.
170-
async with self.full_node.block_store.db_wrapper.writer():
170+
async with self.full_node.block_store.transaction():
171171
# set coinstore
172172
await self.full_node.coin_store.rollback_to_block(new_height)
173173
# set blockstore to new height

tach.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ path = "chia.consensus"
2222
depends_on = [
2323
"chia.types",
2424
"chia.util",
25-
{ path = "chia.full_node", deprecated = false },
2625
]
2726

2827
[[modules]]

0 commit comments

Comments
 (0)