78 changes: 78 additions & 0 deletions in chia/_tests/core/data_layer/test_data_store.py
@@ -2255,3 +2255,81 @@ async def test_get_keys_values_both_disk_and_db(
    retrieved_keys_values = {node.key: node.value for node in terminal_nodes}

    assert retrieved_keys_values == inserted_keys_values


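# Generate delta files from a temporary store, then import them into data_store and verify the import succeeds only when the file matches the advertised root hash.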
@pytest.mark.skip(reason="cleanup not implemented")
@pytest.mark.anyio
@boolean_datacases(name="success", false="invalid file", true="valid file")
async def test_db_data_insert_from_file(
    data_store: DataStore,
    store_id: bytes32,
    tmp_path: Path,
    seeded_random: random.Random,
    success: bool,
) -> None:
    num_keys = 1000
    db_uri = generate_in_memory_db_uri()

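    # Temporary in-memory store used only to build the tree and write its delta file.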
    async with DataStore.managed(
        database=db_uri,
        uri=True,
        merkle_blobs_path=tmp_path.joinpath("merkle-blobs-tmp"),
        key_value_blobs_path=tmp_path.joinpath("key-value-blobs-tmp"),
    ) as tmp_data_store:
        await tmp_data_store.create_tree(store_id, status=Status.COMMITTED)
        changelist: list[dict[str, Any]] = []
        for _ in range(num_keys):
            use_file = seeded_random.choice([True, False])
            assert tmp_data_store.prefer_db_kv_blob_length > 7
            size = tmp_data_store.prefer_db_kv_blob_length + 1 if use_file else 8
            key = bytes(seeded_random.getrandbits(8) for _ in range(size))
            value = bytes(seeded_random.getrandbits(8) for _ in range(size))
            changelist.append({"action": "insert", "key": key, "value": value})

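        # Commit the batch and write out the files for the new root to files_path.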
        await tmp_data_store.insert_batch(store_id, changelist, status=Status.COMMITTED)
        root = await tmp_data_store.get_tree_root(store_id)
        files_path = tmp_path.joinpath("files")
        await write_files_for_root(tmp_data_store, store_id, root, files_path, 1000)
        assert root.node_hash is not None
        filename = get_delta_filename_path(files_path, store_id, root.node_hash, 1)
        assert filename.exists()

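    # The failure case advertises a root hash that does not match the generated file.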
    root_hash = bytes32([0] * 31 + [1]) if not success else root.node_hash
    sinfo = ServerInfo("http://127.0.0.1/8003", 0, 0)

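    # Place the valid file under the mismatched root hash so it is found but fails validation.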
    if not success:
        target_filename_path = get_delta_filename_path(files_path, store_id, root_hash, 1)
        shutil.copyfile(filename, target_filename_path)
        assert target_filename_path.exists()

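    # data_store starts with no key-value blob files on disk.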
    keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())
    assert sum(1 for _ in keys_value_path.rglob("*") if _.is_file()) == 0

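    # Attempt to sync data_store from the locally written delta file.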
    is_success = await insert_from_delta_file(
        data_store=data_store,
        store_id=store_id,
        existing_generation=0,
        target_generation=1,
        root_hashes=[root_hash],
        server_info=sinfo,
        client_foldername=files_path,
        timeout=aiohttp.ClientTimeout(total=15, sock_connect=5),
        log=log,
        proxy_url="",
        downloader=None,
    )
    assert is_success == success

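    # A successful import populates the ids table; a failed one must leave it empty.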
    async with data_store.db_wrapper.reader() as reader:
        async with reader.execute("SELECT COUNT(*) FROM ids") as cursor:
            row_count = await cursor.fetchone()
            assert row_count is not None
            if success:
                assert row_count[0] > 0
            else:
                assert row_count[0] == 0

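    # Key-value blob files should exist on disk only after a successful import.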
    if success:
        assert sum(1 for _ in keys_value_path.rglob("*") if _.is_file()) > 0
    else:
        assert sum(1 for _ in keys_value_path.rglob("*") if _.is_file()) == 0