diff --git a/chia/_tests/core/data_layer/test_data_store.py b/chia/_tests/core/data_layer/test_data_store.py
index 0c6725c70657..cf1a2c68349f 100644
--- a/chia/_tests/core/data_layer/test_data_store.py
+++ b/chia/_tests/core/data_layer/test_data_store.py
@@ -2176,7 +2176,8 @@ async def test_basic_key_value_db_vs_disk_cutoff(
     blob = bytes(seeded_random.getrandbits(8) for _ in range(size))
     blob_hash = bytes32(sha256(blob).digest())
     async with data_store.db_wrapper.writer() as writer:
-        await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
+        with data_store.manage_kv_files(store_id):
+            await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
 
     file_exists = data_store.get_key_value_path(store_id=store_id, blob_hash=blob_hash).exists()
     async with data_store.db_wrapper.writer() as writer:
@@ -2210,7 +2211,8 @@ async def test_changing_key_value_db_vs_disk_cutoff(
     blob = bytes(seeded_random.getrandbits(8) for _ in range(size))
 
     async with data_store.db_wrapper.writer() as writer:
-        kv_id = await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
+        with data_store.manage_kv_files(store_id):
+            kv_id = await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
 
     data_store.prefer_db_kv_blob_length += limit_change
     retrieved_blob = await data_store.get_blob_from_kvid(kv_id=kv_id, store_id=store_id)
@@ -2258,3 +2260,109 @@ async def test_get_keys_values_both_disk_and_db(
 
     retrieved_keys_values = {node.key: node.value for node in terminal_nodes}
     assert retrieved_keys_values == inserted_keys_values
+
+
+@pytest.mark.anyio
+@boolean_datacases(name="success", false="invalid file", true="valid file")
+async def test_db_data_insert_from_file(
+    data_store: DataStore,
+    store_id: bytes32,
+    tmp_path: Path,
+    seeded_random: random.Random,
+    success: bool,
+) -> None:
+    num_keys = 1000
+    db_uri = generate_in_memory_db_uri()
+
+    async with DataStore.managed(
+        database=db_uri,
+        uri=True,
+        merkle_blobs_path=tmp_path.joinpath("merkle-blobs-tmp"),
+        key_value_blobs_path=tmp_path.joinpath("key-value-blobs-tmp"),
+    ) as tmp_data_store:
+        await tmp_data_store.create_tree(store_id, status=Status.COMMITTED)
+        changelist: list[dict[str, Any]] = []
+        for _ in range(num_keys):
+            use_file = seeded_random.choice([True, False])
+            assert tmp_data_store.prefer_db_kv_blob_length > 7
+            size = tmp_data_store.prefer_db_kv_blob_length + 1 if use_file else 8
+            key = seeded_random.randbytes(size)
+            value = seeded_random.randbytes(size)
+            changelist.append({"action": "insert", "key": key, "value": value})
+
+        await tmp_data_store.insert_batch(store_id, changelist, status=Status.COMMITTED)
+        root = await tmp_data_store.get_tree_root(store_id)
+        files_path = tmp_path.joinpath("files")
+        await write_files_for_root(tmp_data_store, store_id, root, files_path, 1000)
+        assert root.node_hash is not None
+        filename = get_delta_filename_path(files_path, store_id, root.node_hash, 1)
+        assert filename.exists()
+
+    root_hash = bytes32([0] * 31 + [1]) if not success else root.node_hash
+    sinfo = ServerInfo("http://127.0.0.1/8003", 0, 0)
+
+    if not success:
+        target_filename_path = get_delta_filename_path(files_path, store_id, root_hash, 1)
+        shutil.copyfile(filename, target_filename_path)
+        assert target_filename_path.exists()
+
+    keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())
+    assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0
+
+    is_success = await insert_from_delta_file(
+        data_store=data_store,
+        store_id=store_id,
+        existing_generation=0,
+        target_generation=1,
+        root_hashes=[root_hash],
+        server_info=sinfo,
+        client_foldername=files_path,
+        timeout=aiohttp.ClientTimeout(total=15, sock_connect=5),
+        log=log,
+        proxy_url="",
+        downloader=None,
+    )
+    assert is_success == success
+
+    async with data_store.db_wrapper.reader() as reader:
+        async with reader.execute("SELECT COUNT(*) FROM ids") as cursor:
+            row_count = await cursor.fetchone()
+            assert row_count is not None
+            if success:
+                assert row_count[0] > 0
+            else:
+                assert row_count[0] == 0
+
+    if success:
+        assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) > 0
+    else:
+        assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0
+
+
+@pytest.mark.anyio
+async def test_manage_kv_files(
+    data_store: DataStore,
+    store_id: bytes32,
+    seeded_random: random.Random,
+) -> None:
+    num_keys = 1000
+    num_files = 0
+    keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())
+
+    with pytest.raises(Exception, match="Test exception"):
+        async with data_store.db_wrapper.writer() as writer:
+            with data_store.manage_kv_files(store_id):
+                for _ in range(num_keys):
+                    use_file = seeded_random.choice([True, False])
+                    assert data_store.prefer_db_kv_blob_length > 7
+                    size = data_store.prefer_db_kv_blob_length + 1 if use_file else 8
+                    key = seeded_random.randbytes(size)
+                    value = seeded_random.randbytes(size)
+                    await data_store.add_key_value(key, value, store_id, writer)
+                    num_files += 2 * use_file
+
+                assert num_files > 0
+                assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == num_files
+                raise Exception("Test exception")
+
+    assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0
diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py
index 963f30ea14d1..f7f17ee641c8 100644
--- a/chia/data_layer/data_store.py
+++ b/chia/data_layer/data_store.py
@@ -7,9 +7,9 @@
 import shutil
 import sqlite3
 from collections import defaultdict
-from collections.abc import AsyncIterator, Awaitable, Iterable, Sequence
-from contextlib import asynccontextmanager
-from dataclasses import dataclass, replace
+from collections.abc import AsyncIterator, Awaitable, Iterable, Iterator, Sequence
+from contextlib import asynccontextmanager, contextmanager
+from dataclasses import dataclass, field, replace
 from hashlib import sha256
 from pathlib import Path
 from typing import Any, BinaryIO, Callable, Optional, Union
@@ -82,6 +82,7 @@ class DataStore:
     recent_merkle_blobs: LRUCache[bytes32, MerkleBlob]
     merkle_blobs_path: Path
     key_value_blobs_path: Path
+    unconfirmed_keys_values: dict[bytes32, list[bytes32]] = field(default_factory=dict)
     prefer_db_kv_blob_length: int = default_prefer_file_kv_blob_length
 
     @classmethod
@@ -204,39 +205,44 @@ async def insert_into_data_store_from_file(
         filename: Path,
         delta_reader: Optional[DeltaReader] = None,
     ) -> Optional[DeltaReader]:
-        if root_hash is None:
-            merkle_blob = MerkleBlob(b"")
-        else:
-            root = await self.get_tree_root(store_id=store_id)
-            if delta_reader is None:
-                delta_reader = DeltaReader(internal_nodes={}, leaf_nodes={})
-            if root.node_hash is not None:
-                delta_reader.collect_from_merkle_blob(
-                    self.get_merkle_path(store_id=store_id, root_hash=root.node_hash), indexes=[TreeIndex(0)]
-                )
-
-            internal_nodes, terminal_nodes = await self.read_from_file(filename, store_id)
-            delta_reader.add_internal_nodes(internal_nodes)
-            delta_reader.add_leaf_nodes(terminal_nodes)
-
-            missing_hashes = await anyio.to_thread.run_sync(delta_reader.get_missing_hashes, root_hash)
-
-            if len(missing_hashes) > 0:
-                # TODO: consider adding transactions around this code
-                merkle_blob_queries = await self.build_merkle_blob_queries_for_missing_hashes(missing_hashes, store_id)
-                if len(merkle_blob_queries) > 0:
-                    jobs = [
-                        (self.get_merkle_path(store_id=store_id, root_hash=old_root_hash), indexes)
-                        for old_root_hash, indexes in merkle_blob_queries.items()
-                    ]
-                    await anyio.to_thread.run_sync(delta_reader.collect_from_merkle_blobs, jobs)
-                    await self.build_cache_and_collect_missing_hashes(root, root_hash, store_id, delta_reader)
+        async with self.db_wrapper.writer():
+            with self.manage_kv_files(store_id):
+                if root_hash is None:
+                    merkle_blob = MerkleBlob(b"")
+                else:
+                    root = await self.get_tree_root(store_id=store_id)
+                    if delta_reader is None:
+                        delta_reader = DeltaReader(internal_nodes={}, leaf_nodes={})
+                    if root.node_hash is not None:
+                        delta_reader.collect_from_merkle_blob(
+                            self.get_merkle_path(store_id=store_id, root_hash=root.node_hash),
+                            indexes=[TreeIndex(0)],
+                        )
+
+                    internal_nodes, terminal_nodes = await self.read_from_file(filename, store_id)
+                    delta_reader.add_internal_nodes(internal_nodes)
+                    delta_reader.add_leaf_nodes(terminal_nodes)
+
+                    missing_hashes = await anyio.to_thread.run_sync(delta_reader.get_missing_hashes, root_hash)
+
+                    if len(missing_hashes) > 0:
+                        # TODO: consider adding transactions around this code
+                        merkle_blob_queries = await self.build_merkle_blob_queries_for_missing_hashes(
+                            missing_hashes, store_id
+                        )
+                        if len(merkle_blob_queries) > 0:
+                            jobs = [
+                                (self.get_merkle_path(store_id=store_id, root_hash=old_root_hash), indexes)
+                                for old_root_hash, indexes in merkle_blob_queries.items()
+                            ]
+                            await anyio.to_thread.run_sync(delta_reader.collect_from_merkle_blobs, jobs)
+                            await self.build_cache_and_collect_missing_hashes(root, root_hash, store_id, delta_reader)
 
-            merkle_blob = delta_reader.create_merkle_blob_and_filter_unused_nodes(root_hash, set())
+                    merkle_blob = delta_reader.create_merkle_blob_and_filter_unused_nodes(root_hash, set())
 
-        # Don't store these blob objects into cache, since their data structures are not calculated yet.
-        await self.insert_root_from_merkle_blob(merkle_blob, store_id, Status.COMMITTED, update_cache=False)
-        return delta_reader
+                # Don't store these blob objects into cache, since their data structures are not calculated yet.
+                await self.insert_root_from_merkle_blob(merkle_blob, store_id, Status.COMMITTED, update_cache=False)
+                return delta_reader
 
     async def build_merkle_blob_queries_for_missing_hashes(
         self,
@@ -655,10 +661,39 @@ async def add_kvid(self, blob: bytes, store_id: bytes32, writer: aiosqlite.Conne
         if use_file:
             path = self.get_key_value_path(store_id=store_id, blob_hash=blob_hash)
             path.parent.mkdir(parents=True, exist_ok=True)
+            self.unconfirmed_keys_values[store_id].append(blob_hash)
             # TODO: consider file-system based locking of either the file or the store directory
             path.write_bytes(zstd.compress(blob))
         return KeyOrValueId(row[0])
 
+    def delete_unconfirmed_kvids(self, store_id: bytes32) -> None:
+        for blob_hash in self.unconfirmed_keys_values[store_id]:
+            with log_exceptions(log=log, consume=True):
+                path = self.get_key_value_path(store_id=store_id, blob_hash=blob_hash)
+                try:
+                    path.unlink()
+                except FileNotFoundError:
+                    log.error(f"Cannot find key/value path {path} for hash {blob_hash}")
+        del self.unconfirmed_keys_values[store_id]
+
+    def confirm_all_kvids(self, store_id: bytes32) -> None:
+        del self.unconfirmed_keys_values[store_id]
+
+    @contextmanager
+    def manage_kv_files(self, store_id: bytes32) -> Iterator[None]:
+        if store_id not in self.unconfirmed_keys_values:
+            self.unconfirmed_keys_values[store_id] = []
+        else:
+            raise Exception("Internal error: unconfirmed keys values cache not cleaned")
+
+        try:
+            yield
+        except:
+            self.delete_unconfirmed_kvids(store_id)
+            raise
+        else:
+            self.confirm_all_kvids(store_id)
+
     async def add_key_value(
         self, key: bytes, value: bytes, store_id: bytes32, writer: aiosqlite.Connection
     ) -> tuple[KeyId, ValueId]:
@@ -1251,28 +1286,29 @@ async def insert(
         root: Optional[Root] = None,
     ) -> InsertResult:
         async with self.db_wrapper.writer() as writer:
-            if root is None:
-                root = await self.get_tree_root(store_id=store_id)
-            merkle_blob = await self.get_merkle_blob(store_id=store_id, root_hash=root.node_hash)
+            with self.manage_kv_files(store_id):
+                if root is None:
+                    root = await self.get_tree_root(store_id=store_id)
+                merkle_blob = await self.get_merkle_blob(store_id=store_id, root_hash=root.node_hash)
 
-            kid, vid = await self.add_key_value(key, value, store_id, writer=writer)
-            hash = leaf_hash(key, value)
-            reference_kid = None
-            if reference_node_hash is not None:
-                reference_kid, _ = merkle_blob.get_node_by_hash(reference_node_hash)
+                kid, vid = await self.add_key_value(key, value, store_id, writer=writer)
+                hash = leaf_hash(key, value)
+                reference_kid = None
+                if reference_node_hash is not None:
+                    reference_kid, _ = merkle_blob.get_node_by_hash(reference_node_hash)
 
-            was_empty = root.node_hash is None
-            if not was_empty and reference_kid is None:
-                if side is not None:
-                    raise Exception("Side specified without reference node hash")
+                was_empty = root.node_hash is None
+                if not was_empty and reference_kid is None:
+                    if side is not None:
+                        raise Exception("Side specified without reference node hash")
 
-                seed = leaf_hash(key=key, value=value)
-                reference_kid, side = self.get_reference_kid_side(merkle_blob, seed)
+                    seed = leaf_hash(key=key, value=value)
+                    reference_kid, side = self.get_reference_kid_side(merkle_blob, seed)
 
-            merkle_blob.insert(kid, vid, hash, reference_kid, side)
+                merkle_blob.insert(kid, vid, hash, reference_kid, side)
 
-            new_root = await self.insert_root_from_merkle_blob(merkle_blob, store_id, status)
-            return InsertResult(node_hash=hash, root=new_root)
+                new_root = await self.insert_root_from_merkle_blob(merkle_blob, store_id, status)
+                return InsertResult(node_hash=hash, root=new_root)
 
     async def delete(
         self,
@@ -1303,16 +1339,17 @@ async def upsert(
         root: Optional[Root] = None,
    ) -> InsertResult:
         async with self.db_wrapper.writer() as writer:
-            if root is None:
-                root = await self.get_tree_root(store_id=store_id)
-            merkle_blob = await self.get_merkle_blob(store_id=store_id, root_hash=root.node_hash)
+            with self.manage_kv_files(store_id):
+                if root is None:
+                    root = await self.get_tree_root(store_id=store_id)
+                merkle_blob = await self.get_merkle_blob(store_id=store_id, root_hash=root.node_hash)
 
-            kid, vid = await self.add_key_value(key, new_value, store_id, writer=writer)
-            hash = leaf_hash(key, new_value)
-            merkle_blob.upsert(kid, vid, hash)
+                kid, vid = await self.add_key_value(key, new_value, store_id, writer=writer)
+                hash = leaf_hash(key, new_value)
+                merkle_blob.upsert(kid, vid, hash)
 
-            new_root = await self.insert_root_from_merkle_blob(merkle_blob, store_id, status)
-            return InsertResult(node_hash=hash, root=new_root)
+                new_root = await self.insert_root_from_merkle_blob(merkle_blob, store_id, status)
+                return InsertResult(node_hash=hash, root=new_root)
 
     async def insert_batch(
         self,
@@ -1322,88 +1359,89 @@ async def insert_batch(
         enable_batch_autoinsert: bool = True,
     ) -> Optional[bytes32]:
         async with self.db_wrapper.writer() as writer:
-            old_root = await self.get_tree_root(store_id=store_id)
-            pending_root = await self.get_pending_root(store_id=store_id)
-            if pending_root is not None:
-                if pending_root.status == Status.PENDING_BATCH:
-                    # We have an unfinished batch, continue the current batch on top of it.
-                    if pending_root.generation != old_root.generation + 1:
+            with self.manage_kv_files(store_id):
+                old_root = await self.get_tree_root(store_id=store_id)
+                pending_root = await self.get_pending_root(store_id=store_id)
+                if pending_root is not None:
+                    if pending_root.status == Status.PENDING_BATCH:
+                        # We have an unfinished batch, continue the current batch on top of it.
+                        if pending_root.generation != old_root.generation + 1:
+                            raise Exception("Internal error")
+                        old_root = pending_root
+                        await self.clear_pending_roots(store_id)
+                    else:
                         raise Exception("Internal error")
-                    old_root = pending_root
-                    await self.clear_pending_roots(store_id)
-                else:
-                    raise Exception("Internal error")
-
-            merkle_blob = await self.get_merkle_blob(store_id=store_id, root_hash=old_root.node_hash)
-
-            key_hash_frequency: dict[bytes32, int] = {}
-            first_action: dict[bytes32, str] = {}
-            last_action: dict[bytes32, str] = {}
-            for change in changelist:
-                key = change["key"]
-                hash = key_hash(key)
-                key_hash_frequency[hash] = key_hash_frequency.get(hash, 0) + 1
-                if hash not in first_action:
-                    first_action[hash] = change["action"]
-                last_action[hash] = change["action"]
+                merkle_blob = await self.get_merkle_blob(store_id=store_id, root_hash=old_root.node_hash)
 
-            batch_keys_values: list[tuple[KeyId, ValueId]] = []
-            batch_hashes: list[bytes32] = []
+                key_hash_frequency: dict[bytes32, int] = {}
+                first_action: dict[bytes32, str] = {}
+                last_action: dict[bytes32, str] = {}
 
-            for change in changelist:
-                if change["action"] == "insert":
+                for change in changelist:
                     key = change["key"]
-                    value = change["value"]
-
-                    reference_node_hash = change.get("reference_node_hash", None)
-                    side = change.get("side", None)
-                    reference_kid: Optional[KeyId] = None
-                    if reference_node_hash is not None:
-                        reference_kid, _ = merkle_blob.get_node_by_hash(reference_node_hash)
-
-                    key_hashed = key_hash(key)
-                    kid, vid = await self.add_key_value(key, value, store_id, writer=writer)
-                    try:
-                        merkle_blob.get_key_index(kid)
-                    except chia_rs.datalayer.UnknownKeyError:
-                        pass
+                    hash = key_hash(key)
+                    key_hash_frequency[hash] = key_hash_frequency.get(hash, 0) + 1
+                    if hash not in first_action:
+                        first_action[hash] = change["action"]
+                    last_action[hash] = change["action"]
+
+                batch_keys_values: list[tuple[KeyId, ValueId]] = []
+                batch_hashes: list[bytes32] = []
+
+                for change in changelist:
+                    if change["action"] == "insert":
+                        key = change["key"]
+                        value = change["value"]
+
+                        reference_node_hash = change.get("reference_node_hash", None)
+                        side = change.get("side", None)
+                        reference_kid: Optional[KeyId] = None
+                        if reference_node_hash is not None:
+                            reference_kid, _ = merkle_blob.get_node_by_hash(reference_node_hash)
+
+                        key_hashed = key_hash(key)
+                        kid, vid = await self.add_key_value(key, value, store_id, writer=writer)
+                        try:
+                            merkle_blob.get_key_index(kid)
+                        except chia_rs.datalayer.UnknownKeyError:
+                            pass
+                        else:
+                            raise KeyAlreadyPresentError(kid)
+                        hash = leaf_hash(key, value)
+
+                        if reference_node_hash is None and side is None:
+                            if enable_batch_autoinsert and reference_kid is None:
+                                if key_hash_frequency[key_hashed] == 1 or (
+                                    key_hash_frequency[key_hashed] == 2 and first_action[key_hashed] == "delete"
+                                ):
+                                    batch_keys_values.append((kid, vid))
+                                    batch_hashes.append(hash)
+                                    continue
+                            if not merkle_blob.empty():
+                                seed = leaf_hash(key=key, value=value)
+                                reference_kid, side = self.get_reference_kid_side(merkle_blob, seed)
+
+                        merkle_blob.insert(kid, vid, hash, reference_kid, side)
+                    elif change["action"] == "delete":
+                        key = change["key"]
+                        deletion_kid = await self.get_kvid(key, store_id)
+                        if deletion_kid is not None:
+                            merkle_blob.delete(KeyId(deletion_kid))
+                    elif change["action"] == "upsert":
+                        key = change["key"]
+                        new_value = change["value"]
+                        kid, vid = await self.add_key_value(key, new_value, store_id, writer=writer)
+                        hash = leaf_hash(key, new_value)
+                        merkle_blob.upsert(kid, vid, hash)
                     else:
-                        raise KeyAlreadyPresentError(kid)
-                    hash = leaf_hash(key, value)
-
-                    if reference_node_hash is None and side is None:
-                        if enable_batch_autoinsert and reference_kid is None:
-                            if key_hash_frequency[key_hashed] == 1 or (
-                                key_hash_frequency[key_hashed] == 2 and first_action[key_hashed] == "delete"
-                            ):
-                                batch_keys_values.append((kid, vid))
-                                batch_hashes.append(hash)
-                                continue
-                        if not merkle_blob.empty():
-                            seed = leaf_hash(key=key, value=value)
-                            reference_kid, side = self.get_reference_kid_side(merkle_blob, seed)
-
-                    merkle_blob.insert(kid, vid, hash, reference_kid, side)
-                elif change["action"] == "delete":
-                    key = change["key"]
-                    deletion_kid = await self.get_kvid(key, store_id)
-                    if deletion_kid is not None:
-                        merkle_blob.delete(KeyId(deletion_kid))
-                elif change["action"] == "upsert":
-                    key = change["key"]
-                    new_value = change["value"]
-                    kid, vid = await self.add_key_value(key, new_value, store_id, writer=writer)
-                    hash = leaf_hash(key, new_value)
-                    merkle_blob.upsert(kid, vid, hash)
-                else:
-                    raise Exception(f"Operation in batch is not insert or delete: {change}")
+                        raise Exception(f"Operation in batch is not insert or delete: {change}")
 
-            if len(batch_keys_values) > 0:
-                merkle_blob.batch_insert(batch_keys_values, batch_hashes)
+                if len(batch_keys_values) > 0:
+                    merkle_blob.batch_insert(batch_keys_values, batch_hashes)
 
-            new_root = await self.insert_root_from_merkle_blob(merkle_blob, store_id, status, old_root)
-            return new_root.node_hash
+                new_root = await self.insert_root_from_merkle_blob(merkle_blob, store_id, status, old_root)
+                return new_root.node_hash
 
     async def get_node_by_key(
         self,
diff --git a/chia/data_layer/download_data.py b/chia/data_layer/download_data.py
index 8a0580d91033..3e89684eb4fb 100644
--- a/chia/data_layer/download_data.py
+++ b/chia/data_layer/download_data.py
@@ -256,7 +256,6 @@ async def insert_from_delta_file(
             if not filename_exists:
                 # Don't penalize this server if we didn't download the file from it.
                 await data_store.server_misses_file(store_id, server_info, timestamp)
-            await data_store.rollback_to_generation(store_id, existing_generation - 1)
             return False
 
     return True
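
Reviewer note: the sketch below is not part of the patch. It isolates the cleanup pattern that manage_kv_files introduces: remember every blob file written while a writer transaction is open, and unlink those files if the surrounding block raises, so the on-disk key/value blobs never drift from a rolled-back database. The FileTracker class, its method names, and the paths are hypothetical stand-ins for illustration, not the DataStore API.

from __future__ import annotations

from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


class FileTracker:
    """Hypothetical stand-in for DataStore's per-store bookkeeping of unconfirmed blob files."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.unconfirmed: list[Path] = []

    def write_blob(self, name: str, blob: bytes) -> None:
        # Record the path before writing, mirroring add_kvid's append-then-write order.
        path = self.root.joinpath(name)
        path.parent.mkdir(parents=True, exist_ok=True)
        self.unconfirmed.append(path)
        path.write_bytes(blob)

    @contextmanager
    def manage_files(self) -> Iterator[None]:
        # Success: forget the tracked paths (they are now confirmed).
        # Failure: unlink everything written inside the block, then re-raise.
        try:
            yield
        except BaseException:
            for path in self.unconfirmed:
                path.unlink(missing_ok=True)
            raise
        finally:
            self.unconfirmed.clear()


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        tracker = FileTracker(Path(tmp))
        try:
            with tracker.manage_files():
                tracker.write_blob("kv/deadbeef", b"compressed blob would go here")
                raise RuntimeError("simulated failure inside the writer transaction")
        except RuntimeError:
            pass
        # The orphaned blob was removed together with the failed "transaction".
        assert not Path(tmp).joinpath("kv", "deadbeef").exists()

The real implementation additionally keys the bookkeeping by store_id and raises if manage_kv_files is entered while a previous tracking list is still pending, but the success/failure handling has the same shape.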