Merged
Changes from 8 commits
77 changes: 77 additions & 0 deletions chia/_tests/core/data_layer/test_data_store.py
@@ -2258,3 +2258,80 @@ async def test_get_keys_values_both_disk_and_db(
retrieved_keys_values = {node.key: node.value for node in terminal_nodes}

assert retrieved_keys_values == inserted_keys_values


@pytest.mark.anyio
@boolean_datacases(name="success", false="invalid file", true="valid file")
async def test_db_data_insert_from_file(
data_store: DataStore,
store_id: bytes32,
tmp_path: Path,
seeded_random: random.Random,
success: bool,
) -> None:
num_keys = 1000
db_uri = generate_in_memory_db_uri()

async with DataStore.managed(
database=db_uri,
uri=True,
merkle_blobs_path=tmp_path.joinpath("merkle-blobs-tmp"),
key_value_blobs_path=tmp_path.joinpath("key-value-blobs-tmp"),
) as tmp_data_store:
await tmp_data_store.create_tree(store_id, status=Status.COMMITTED)
changelist: list[dict[str, Any]] = []
for _ in range(num_keys):
use_file = seeded_random.choice([True, False])
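# Blobs longer than prefer_db_kv_blob_length are stored as files on disk; the short 8-byte blobs stay in the database.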
assert tmp_data_store.prefer_db_kv_blob_length > 7
size = tmp_data_store.prefer_db_kv_blob_length + 1 if use_file else 8
key = bytes(seeded_random.getrandbits(8) for _ in range(size))
value = bytes(seeded_random.getrandbits(8) for _ in range(size))
changelist.append({"action": "insert", "key": key, "value": value})

await tmp_data_store.insert_batch(store_id, changelist, status=Status.COMMITTED)
root = await tmp_data_store.get_tree_root(store_id)
files_path = tmp_path.joinpath("files")
await write_files_for_root(tmp_data_store, store_id, root, files_path, 1000)
assert root.node_hash is not None
filename = get_delta_filename_path(files_path, store_id, root.node_hash, 1)
assert filename.exists()

root_hash = bytes32([0] * 31 + [1]) if not success else root.node_hash
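# For the failure case, the requested root hash will not match the contents of the delta file copied below, so the insert should fail.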
sinfo = ServerInfo("http://127.0.0.1/8003", 0, 0)

if not success:
target_filename_path = get_delta_filename_path(files_path, store_id, root_hash, 1)
shutil.copyfile(filename, target_filename_path)
assert target_filename_path.exists()

keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())
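# No key/value blob files should exist for this store before the insert runs.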
assert sum(1 for _ in keys_value_path.rglob("*") if _.is_file()) == 0

is_success = await insert_from_delta_file(
data_store=data_store,
store_id=store_id,
existing_generation=0,
target_generation=1,
root_hashes=[root_hash],
server_info=sinfo,
client_foldername=files_path,
timeout=aiohttp.ClientTimeout(total=15, sock_connect=5),
log=log,
proxy_url="",
downloader=None,
)
assert is_success == success

async with data_store.db_wrapper.reader() as reader:
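# On failure, the surrounding writer transaction rolls back, so no rows should remain in the ids table and no blob files on disk.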
async with reader.execute("SELECT COUNT(*) FROM ids") as cursor:
row_count = await cursor.fetchone()
assert row_count is not None
if success:
assert row_count[0] > 0
else:
assert row_count[0] == 0

if success:
assert sum(1 for _ in keys_value_path.rglob("*") if _.is_file()) > 0
else:
assert sum(1 for _ in keys_value_path.rglob("*") if _.is_file()) == 0
37 changes: 32 additions & 5 deletions chia/data_layer/data_store.py
@@ -82,6 +82,7 @@ class DataStore:
recent_merkle_blobs: LRUCache[bytes32, MerkleBlob]
merkle_blobs_path: Path
key_value_blobs_path: Path
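# Per-store hashes of file-backed blobs written to disk whose root has not yet been committed.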
unconfirmed_keys_values: defaultdict[bytes32, list[bytes32]]
prefer_db_kv_blob_length: int = default_prefer_file_kv_blob_length

@classmethod
@@ -110,11 +111,14 @@ async def managed(
log_path=sql_log_path,
) as db_wrapper:
recent_merkle_blobs: LRUCache[bytes32, MerkleBlob] = LRUCache(capacity=cache_capacity)
unconfirmed_keys_values: defaultdict[bytes32, list[bytes32]] = defaultdict(list)

self = cls(
db_wrapper=db_wrapper,
recent_merkle_blobs=recent_merkle_blobs,
merkle_blobs_path=merkle_blobs_path,
key_value_blobs_path=key_value_blobs_path,
unconfirmed_keys_values=unconfirmed_keys_values,
prefer_db_kv_blob_length=prefer_db_kv_blob_length,
)

@@ -204,6 +208,9 @@ async def insert_into_data_store_from_file(
filename: Path,
delta_reader: Optional[DeltaReader] = None,
) -> Optional[DeltaReader]:
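# Leftover entries here mean a previous insert was neither confirmed nor cleaned up.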
if self.unconfirmed_keys_values[store_id]:
raise Exception("Internal error: unconfirmed keys values cache not cleaned")

if root_hash is None:
merkle_blob = MerkleBlob(b"")
else:
@@ -236,6 +243,7 @@

# Don't store these blob objects into cache, since their data structures are not calculated yet.
await self.insert_root_from_merkle_blob(merkle_blob, store_id, Status.COMMITTED, update_cache=False)
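# The new root is committed, so the file-backed blobs recorded above no longer need to be tracked as unconfirmed.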
await self.confirm_all_kvids(store_id=store_id)
return delta_reader

async def build_merkle_blob_queries_for_missing_hashes(
@@ -380,6 +388,7 @@ async def read_from_file(
serialized_node.value2,
store_id,
writer=writer,
confirmed=False,
)
node_hash = leaf_hash(serialized_node.value1, serialized_node.value2)
terminal_nodes[node_hash] = (kid, vid)
@@ -468,13 +477,15 @@ async def migrate_db(self, server_files_location: Path) -> None:
break

try:
await self.insert_into_data_store_from_file(store_id, root.node_hash, recovery_filename)
async with self.db_wrapper.writer():
await self.insert_into_data_store_from_file(store_id, root.node_hash, recovery_filename)
synced_generations += 1
log.info(
f"Successfully recovered root from {filename}. "
f"Total roots processed: {(synced_generations / total_generations * 100):.2f}%"
)
except Exception as e:
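# Remove any file-backed blobs written by the failed insert; the writer transaction reverts the database rows.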
await self.delete_unconfirmed_kvids(store_id)
log.error(f"Cannot recover data from {filename}: {e}")
break

@@ -625,7 +636,9 @@ async def get_terminal_node(self, kid: KeyId, vid: ValueId, store_id: bytes32) -

return TerminalNode(hash=leaf_hash(key, value), key=key, value=value)

async def add_kvid(self, blob: bytes, store_id: bytes32, writer: aiosqlite.Connection) -> KeyOrValueId:
async def add_kvid(
self, blob: bytes, store_id: bytes32, writer: aiosqlite.Connection, confirmed: bool = True
) -> KeyOrValueId:
use_file = self._use_file_for_new_kv_blob(blob)
blob_hash = bytes32(sha256(blob).digest())
if use_file:
@@ -641,6 +654,8 @@ async def add_kvid(self, blob: bytes, store_id: bytes32, writer: aiosqlite.Conne
store_id,
),
)
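# Track file-backed blobs that are not yet confirmed so they can be deleted if the insert fails.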
if not confirmed and use_file:
self.unconfirmed_keys_values[store_id].append(blob_hash)
except sqlite3.IntegrityError as e:
if "UNIQUE constraint failed" in str(e):
kv_id = await self.get_kvid(blob, store_id)
@@ -659,11 +674,23 @@ async def add_kvid(self, blob: bytes, store_id: bytes32, writer: aiosqlite.Conne
path.write_bytes(zstd.compress(blob))
return KeyOrValueId(row[0])

async def delete_unconfirmed_kvids(self, store_id: bytes32) -> None:
for blob_hash in self.unconfirmed_keys_values[store_id]:
path = self.get_key_value_path(store_id=store_id, blob_hash=blob_hash)
try:
path.unlink()
except FileNotFoundError:
log.error(f"Cannot find key/value path {path} for hash {blob_hash}")
self.unconfirmed_keys_values[store_id].clear()

async def confirm_all_kvids(self, store_id: bytes32) -> None:
self.unconfirmed_keys_values[store_id].clear()

async def add_key_value(
self, key: bytes, value: bytes, store_id: bytes32, writer: aiosqlite.Connection
self, key: bytes, value: bytes, store_id: bytes32, writer: aiosqlite.Connection, confirmed: bool = True
) -> tuple[KeyId, ValueId]:
kid = KeyId(await self.add_kvid(key, store_id, writer=writer))
vid = ValueId(await self.add_kvid(value, store_id, writer=writer))
kid = KeyId(await self.add_kvid(key, store_id, writer=writer, confirmed=confirmed))
vid = ValueId(await self.add_kvid(value, store_id, writer=writer, confirmed=confirmed))

return (kid, vid)

15 changes: 8 additions & 7 deletions chia/data_layer/download_data.py
@@ -218,12 +218,13 @@ async def insert_from_delta_file(
existing_generation,
group_files_by_store,
)
delta_reader = await data_store.insert_into_data_store_from_file(
store_id,
None if root_hash == bytes32.zeros else root_hash,
target_filename_path,
delta_reader=delta_reader,
)
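# Running the insert inside a single writer transaction lets a failure roll back all database changes at once.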
async with data_store.db_wrapper.writer():
delta_reader = await data_store.insert_into_data_store_from_file(
store_id,
None if root_hash == bytes32.zeros else root_hash,
target_filename_path,
delta_reader=delta_reader,
)
log.info(
f"Successfully inserted hash {root_hash} from delta file. "
f"Generation: {existing_generation}. Store id: {store_id}."
@@ -256,7 +257,7 @@ async def insert_from_delta_file(
if not filename_exists:
# Don't penalize this server if we didn't download the file from it.
await data_store.server_misses_file(store_id, server_info, timestamp)
await data_store.rollback_to_generation(store_id, existing_generation - 1)
await data_store.delete_unconfirmed_kvids(store_id)
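# The writer transaction already reverted the database changes; only the unconfirmed on-disk blobs need explicit removal.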
return False

return True