53 changes: 52 additions & 1 deletion chia/_tests/core/data_layer/test_data_store.py
@@ -60,7 +60,7 @@
"root": ["tree_id", "generation", "node_hash", "status"],
"subscriptions": ["tree_id", "url", "ignore_till", "num_consecutive_failures", "from_wallet"],
"schema": ["version_id", "applied_at"],
"ids": ["kv_id", "hash", "blob", "store_id"],
"ids": ["kv_id", "hash", "blob", "store_id", "confirmed"],
"nodes": ["store_id", "hash", "root_hash", "generation", "idx"],
}

@@ -2258,3 +2258,54 @@ async def test_get_keys_values_both_disk_and_db(
retrieved_keys_values = {node.key: node.value for node in terminal_nodes}

assert retrieved_keys_values == inserted_keys_values


@pytest.mark.anyio
@boolean_datacases(name="test_confirmation", true="confirm blobs", false="don't confirm blobs")
async def test_unconfirmed_keys_values(
data_store: DataStore,
store_id: bytes32,
seeded_random: random.Random,
test_confirmation: bool,
) -> None:
num_keys = 10000
num_confirmed_files = 0
num_files = 0
num_confirmed_blobs = 0

async with data_store.db_wrapper.writer() as writer:
for _ in range(num_keys):
confirmed = seeded_random.choice([True, False])
use_file = seeded_random.choice([True, False])
assert data_store.prefer_db_kv_blob_length > 7
size = data_store.prefer_db_kv_blob_length + 1 if use_file else 8
key = bytes(seeded_random.getrandbits(8) for _ in range(size))
value = bytes(seeded_random.getrandbits(8) for _ in range(size))
await data_store.add_key_value(key, value, store_id, writer, confirmed)
if use_file:
num_files += 1
if confirmed:
num_confirmed_files += confirmed
num_confirmed_blobs += confirmed

async with data_store.db_wrapper.reader() as reader:
async with reader.execute("SELECT COUNT(*) FROM ids WHERE confirmed = 0") as cursor:
row_count = await cursor.fetchone()
assert row_count is not None
assert row_count[0] == 2 * (num_keys - num_confirmed_blobs)

keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())
assert sum(1 for _ in keys_value_path.rglob("*") if _.is_file()) == 2 * num_files

if test_confirmation:
await data_store.confirm_all_kvids(store_id)
expected_num_files = 2 * num_files if test_confirmation else 2 * num_confirmed_files

await data_store.delete_unconfirmed_kvids(store_id)
assert sum(1 for _ in keys_value_path.rglob("*") if _.is_file()) == expected_num_files

async with data_store.db_wrapper.reader() as reader:
async with reader.execute("SELECT COUNT(*) FROM ids WHERE confirmed = 0") as cursor:
row_count = await cursor.fetchone()
assert row_count is not None
assert row_count[0] == 0
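Note that `add_key_value` stores the key and the value through two separate `add_kvid` calls, so every inserted pair contributes two rows to `ids` and, when the blobs exceed `prefer_db_kv_blob_length`, two files on disk; that is where the factor of two in the assertions above comes from.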
63 changes: 57 additions & 6 deletions chia/data_layer/data_store.py
@@ -163,7 +163,8 @@ async def managed(
kv_id INTEGER PRIMARY KEY,
hash BLOB NOT NULL CHECK(length(store_id) == 32),
blob BLOB,
store_id BLOB NOT NULL CHECK(length(store_id) == 32)
store_id BLOB NOT NULL CHECK(length(store_id) == 32),
confirmed tinyint CHECK(confirmed == 0 OR confirmed == 1)
)
"""
)
@@ -184,6 +185,11 @@ async def managed(
CREATE UNIQUE INDEX IF NOT EXISTS ids_hash_index ON ids(hash, store_id)
"""
)
await writer.execute(
"""
CREATE INDEX IF NOT EXISTS ids_confirmed_index ON ids(confirmed, store_id)
"""
)
await writer.execute(
"""
CREATE INDEX IF NOT EXISTS nodes_generation_index ON nodes(generation)
@@ -236,6 +242,7 @@ async def insert_into_data_store_from_file(

# Don't store these blob objects into cache, since their data structures are not calculated yet.
await self.insert_root_from_merkle_blob(merkle_blob, store_id, Status.COMMITTED, update_cache=False)
await self.confirm_all_kvids(store_id=store_id)
return delta_reader

async def build_merkle_blob_queries_for_missing_hashes(
@@ -380,6 +387,7 @@ async def read_from_file(
serialized_node.value2,
store_id,
writer=writer,
confirmed=False,
)
node_hash = leaf_hash(serialized_node.value1, serialized_node.value2)
terminal_nodes[node_hash] = (kid, vid)
@@ -625,7 +633,9 @@ async def get_terminal_node(self, kid: KeyId, vid: ValueId, store_id: bytes32) -

return TerminalNode(hash=leaf_hash(key, value), key=key, value=value)

async def add_kvid(self, blob: bytes, store_id: bytes32, writer: aiosqlite.Connection) -> KeyOrValueId:
async def add_kvid(
self, blob: bytes, store_id: bytes32, writer: aiosqlite.Connection, confirmed: bool = True
) -> KeyOrValueId:
use_file = self._use_file_for_new_kv_blob(blob)
blob_hash = bytes32(sha256(blob).digest())
if use_file:
@@ -634,11 +644,12 @@ async def add_kvid(self, blob: bytes, store_id: bytes32, writer: aiosqlite.Conne
table_blob = blob
try:
row = await writer.execute_insert(
"INSERT INTO ids (hash, blob, store_id) VALUES (?, ?, ?)",
"INSERT INTO ids (hash, blob, store_id, confirmed) VALUES (?, ?, ?, ?)",
(
blob_hash,
table_blob,
store_id,
confirmed,
),
)
except sqlite3.IntegrityError as e:
@@ -659,11 +670,51 @@ async def add_kvid(self, blob: bytes, store_id: bytes32, writer: aiosqlite.Conne
path.write_bytes(zstd.compress(blob))
return KeyOrValueId(row[0])

async def delete_unconfirmed_kvids(self, store_id: bytes32) -> None:
async with self.db_wrapper.reader() as reader:
cursor = await reader.execute(
"""
SELECT blob, hash FROM ids WHERE store_id == :store_id AND confirmed == 0
""",
{
"store_id": store_id,
},
)

rows = await cursor.fetchall()

for row in rows:
blob = row["blob"]
if blob is None:
blob_hash = row["hash"]
path = self.get_key_value_path(store_id=store_id, blob_hash=blob_hash)
try:
path.unlink()
except FileNotFoundError:
log.error(f"Cannot find key/value path {path} for hash {blob_hash}")

async with self.db_wrapper.writer() as writer:
await writer.execute(
"DELETE FROM ids WHERE store_id == :store_id AND confirmed == 0",
{
"store_id": store_id,
},
)

async def confirm_all_kvids(self, store_id: bytes32) -> None:
async with self.db_wrapper.writer() as writer:
await writer.execute(
"UPDATE ids SET confirmed = 1 WHERE store_id == :store_id and confirmed == 0",
{
"store_id": store_id,
},
)

async def add_key_value(
self, key: bytes, value: bytes, store_id: bytes32, writer: aiosqlite.Connection
self, key: bytes, value: bytes, store_id: bytes32, writer: aiosqlite.Connection, confirmed: bool = True
) -> tuple[KeyId, ValueId]:
kid = KeyId(await self.add_kvid(key, store_id, writer=writer))
vid = ValueId(await self.add_kvid(value, store_id, writer=writer))
kid = KeyId(await self.add_kvid(key, store_id, writer=writer, confirmed=confirmed))
vid = ValueId(await self.add_kvid(value, store_id, writer=writer, confirmed=confirmed))

return (kid, vid)
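
Taken together, the new `confirmed` column, the `(confirmed, store_id)` index, and the two maintenance methods implement a two-phase pattern: key/value blobs are written unconfirmed while a delta file is parsed, then either promoted in bulk by `confirm_all_kvids` or swept by `delete_unconfirmed_kvids`. A minimal standalone sketch of that pattern follows (plain `sqlite3` rather than the chia code, with a placeholder 32-byte `store_id` and made-up blobs):

```python
# Standalone sketch of the confirmed-flag pattern added in this diff; not the
# chia implementation. The table shape mirrors the ids table above, data is made up.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE ids(
        kv_id INTEGER PRIMARY KEY,
        hash BLOB NOT NULL,
        blob BLOB,
        store_id BLOB NOT NULL CHECK(length(store_id) == 32),
        confirmed tinyint CHECK(confirmed == 0 OR confirmed == 1)
    )
    """
)
conn.execute("CREATE INDEX ids_confirmed_index ON ids(confirmed, store_id)")

store_id = bytes(32)  # placeholder store id, 32 zero bytes

# Phase 1: blobs land as unconfirmed while the delta file is being parsed.
conn.executemany(
    "INSERT INTO ids (hash, blob, store_id, confirmed) VALUES (?, ?, ?, ?)",
    [(bytes([i]) * 32, b"some blob", store_id, 0) for i in range(3)],
)

# Phase 2a: on success, promote everything for this store in one statement ...
conn.execute("UPDATE ids SET confirmed = 1 WHERE store_id == ? AND confirmed == 0", (store_id,))
# Phase 2b: ... on failure, the same WHERE clause drives a DELETE instead
# (and file-backed blobs would also be unlinked from disk).
conn.execute("DELETE FROM ids WHERE store_id == ? AND confirmed == 0", (store_id,))

unconfirmed = conn.execute("SELECT COUNT(*) FROM ids WHERE confirmed = 0").fetchone()[0]
assert unconfirmed == 0
```

Running both phases here is only for illustration; in the real flow just one of them fires, depending on whether the delta file applied successfully.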

1 change: 1 addition & 0 deletions chia/data_layer/download_data.py
@@ -257,6 +257,7 @@ async def insert_from_delta_file(
# Don't penalize this server if we didn't download the file from it.
await data_store.server_misses_file(store_id, server_info, timestamp)
await data_store.rollback_to_generation(store_id, existing_generation - 1)
await data_store.delete_unconfirmed_kvids(store_id)
return False

return True
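
With the cleanup hooked in here, a delta file that fails to apply rolls the store back one generation and then discards whatever `read_from_file` had inserted with `confirmed=False`, so neither stray `ids` rows nor orphaned blob files survive the failed attempt.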