112 changes: 110 additions & 2 deletions chia/_tests/core/data_layer/test_data_store.py
@@ -2176,7 +2176,8 @@ async def test_basic_key_value_db_vs_disk_cutoff(
blob = bytes(seeded_random.getrandbits(8) for _ in range(size))
blob_hash = bytes32(sha256(blob).digest())
async with data_store.db_wrapper.writer() as writer:
-        await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
+        with data_store.manage_kv_files(store_id):
+            await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)

file_exists = data_store.get_key_value_path(store_id=store_id, blob_hash=blob_hash).exists()
async with data_store.db_wrapper.writer() as writer:
@@ -2210,7 +2211,8 @@ async def test_changing_key_value_db_vs_disk_cutoff(

blob = bytes(seeded_random.getrandbits(8) for _ in range(size))
async with data_store.db_wrapper.writer() as writer:
-        kv_id = await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
+        with data_store.manage_kv_files(store_id):
+            kv_id = await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)

data_store.prefer_db_kv_blob_length += limit_change
retrieved_blob = await data_store.get_blob_from_kvid(kv_id=kv_id, store_id=store_id)
@@ -2258,3 +2260,109 @@ async def test_get_keys_values_both_disk_and_db(
retrieved_keys_values = {node.key: node.value for node in terminal_nodes}

assert retrieved_keys_values == inserted_keys_values


@pytest.mark.anyio
@boolean_datacases(name="success", false="invalid file", true="valid file")
async def test_db_data_insert_from_file(
data_store: DataStore,
store_id: bytes32,
tmp_path: Path,
seeded_random: random.Random,
success: bool,
) -> None:
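    # Build a reference store in a temporary DataStore, write its first delta
    # file to disk, then import that file into `data_store` via
    # insert_from_delta_file. The failure case presents the same file under a
    # mismatched root hash, which must be rejected without leaving anything
    # behind.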
num_keys = 1000
db_uri = generate_in_memory_db_uri()

async with DataStore.managed(
database=db_uri,
uri=True,
merkle_blobs_path=tmp_path.joinpath("merkle-blobs-tmp"),
key_value_blobs_path=tmp_path.joinpath("key-value-blobs-tmp"),
) as tmp_data_store:
await tmp_data_store.create_tree(store_id, status=Status.COMMITTED)
changelist: list[dict[str, Any]] = []
for _ in range(num_keys):
use_file = seeded_random.choice([True, False])
assert tmp_data_store.prefer_db_kv_blob_length > 7
size = tmp_data_store.prefer_db_kv_blob_length + 1 if use_file else 8
key = seeded_random.randbytes(size)
value = seeded_random.randbytes(size)
changelist.append({"action": "insert", "key": key, "value": value})

await tmp_data_store.insert_batch(store_id, changelist, status=Status.COMMITTED)
root = await tmp_data_store.get_tree_root(store_id)
files_path = tmp_path.joinpath("files")
await write_files_for_root(tmp_data_store, store_id, root, files_path, 1000)
assert root.node_hash is not None
filename = get_delta_filename_path(files_path, store_id, root.node_hash, 1)
assert filename.exists()

        root_hash = root.node_hash if success else bytes32([0] * 31 + [1])
        sinfo = ServerInfo("http://127.0.0.1:8003", 0, 0)
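        # For the failure case, expose the valid delta file under a bogus root
        # hash; insert_from_delta_file must reject it when the recreated root
        # does not match the advertised hash.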

if not success:
target_filename_path = get_delta_filename_path(files_path, store_id, root_hash, 1)
shutil.copyfile(filename, target_filename_path)
assert target_filename_path.exists()

keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())
assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0

is_success = await insert_from_delta_file(
data_store=data_store,
store_id=store_id,
existing_generation=0,
target_generation=1,
root_hashes=[root_hash],
server_info=sinfo,
client_foldername=files_path,
timeout=aiohttp.ClientTimeout(total=15, sock_connect=5),
log=log,
proxy_url="",
downloader=None,
)
assert is_success == success
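        # A successful import populates the ids table; a failed import must
        # roll back completely, leaving the table and the blob directory empty.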

async with data_store.db_wrapper.reader() as reader:
async with reader.execute("SELECT COUNT(*) FROM ids") as cursor:
row_count = await cursor.fetchone()
assert row_count is not None
if success:
assert row_count[0] > 0
else:
assert row_count[0] == 0

if success:
assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) > 0
else:
assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0


@pytest.mark.anyio
async def test_manage_kv_files(
data_store: DataStore,
store_id: bytes32,
seeded_random: random.Random,
) -> None:
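    # manage_kv_files should make the key/value blob files transactional:
    # when the wrapped block raises, every file it created must be deleted.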
num_keys = 1000
num_files = 0
keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())

with pytest.raises(Exception, match="Test exception"):
async with data_store.db_wrapper.writer() as writer:
with data_store.manage_kv_files(store_id):
for _ in range(num_keys):
use_file = seeded_random.choice([True, False])
assert data_store.prefer_db_kv_blob_length > 7
size = data_store.prefer_db_kv_blob_length + 1 if use_file else 8
key = seeded_random.randbytes(size)
value = seeded_random.randbytes(size)
await data_store.add_key_value(key, value, store_id, writer)
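                    # when both blobs exceed the cutoff, the pair lands on disk
                    # as two files: one for the key and one for the value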
num_files += 2 * use_file

assert num_files > 0
assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == num_files
raise Exception("Test exception")

assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0
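
For context on what these tests assume: manage_kv_files is treated as a context manager that makes the on-disk key/value blob files transactional, removing anything written inside the block if the block raises. Below is a minimal sketch of that pattern, assuming a layout of key_value_blobs_path/&lt;store_id hex&gt;/... as in the tests; it is an illustration of the cleanup behavior being tested, not the actual DataStore implementation.

import contextlib
from pathlib import Path
from typing import Iterator


@contextlib.contextmanager
def manage_kv_files(key_value_blobs_path: Path, store_id_hex: str) -> Iterator[None]:
    # Snapshot the blob files that already exist for this store.
    store_path = key_value_blobs_path.joinpath(store_id_hex)
    before = {path for path in store_path.rglob("*") if path.is_file()} if store_path.exists() else set()
    try:
        yield
    except BaseException:
        # The wrapped block failed: delete any files it created so the
        # on-disk state matches the rolled-back database transaction.
        if store_path.exists():
            for path in store_path.rglob("*"):
                if path.is_file() and path not in before:
                    path.unlink()
        raise

Under that reading, test_manage_kv_files passes because the injected exception propagates out of manage_kv_files, which deletes all num_files blob files before re-raising, so the final assertion sees an empty blob directory.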