@@ -2173,7 +2173,8 @@ async def test_basic_key_value_db_vs_disk_cutoff(
     blob = bytes(seeded_random.getrandbits(8) for _ in range(size))
     blob_hash = bytes32(sha256(blob).digest())
     async with data_store.db_wrapper.writer() as writer:
-        await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
+        with data_store.manage_kv_files(store_id):
+            await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
 
     file_exists = data_store.get_key_value_path(store_id=store_id, blob_hash=blob_hash).exists()
     async with data_store.db_wrapper.writer() as writer:
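The tests in this diff now route blob writes through `DataStore.manage_kv_files`, whose body is not shown here. As a rough sketch of the pattern it appears to implement (all names below are hypothetical), a context manager can snapshot the store's blob directory and delete anything new if the block raises:

```python
import contextlib
from pathlib import Path
from typing import Iterator


@contextlib.contextmanager
def manage_kv_files_sketch(kv_blobs_path: Path, store_id_hex: str) -> Iterator[None]:
    # Snapshot the store's blob files before the caller starts writing.
    store_path = kv_blobs_path / store_id_hex
    before = {p for p in store_path.rglob("*") if p.is_file()} if store_path.exists() else set()
    try:
        yield
    except BaseException:
        # On failure, delete only the files created inside the block, then
        # re-raise so the surrounding DB transaction also rolls back.
        if store_path.exists():
            for path in {p for p in store_path.rglob("*") if p.is_file()} - before:
                path.unlink(missing_ok=True)
        raise
```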
@@ -2207,7 +2208,8 @@ async def test_changing_key_value_db_vs_disk_cutoff(
 
     blob = bytes(seeded_random.getrandbits(8) for _ in range(size))
     async with data_store.db_wrapper.writer() as writer:
-        kv_id = await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
+        with data_store.manage_kv_files(store_id):
+            kv_id = await data_store.add_kvid(blob=blob, store_id=store_id, writer=writer)
 
     data_store.prefer_db_kv_blob_length += limit_change
     retrieved_blob = await data_store.get_blob_from_kvid(kv_id=kv_id, store_id=store_id)
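This hunk pivots on `prefer_db_kv_blob_length`, the cutoff that decides whether a blob is stored inline in the database or as a file on disk; the test then changes the cutoff and confirms retrieval by `kv_id` still works, presumably because lookup follows the stored reference rather than re-deriving the location from the current cutoff. A minimal sketch of the routing decision, with hypothetical names (the real logic lives inside `DataStore.add_kvid`):

```python
from pathlib import Path
from typing import Optional


def store_blob_sketch(blob: bytes, cutoff: int, blob_path: Path) -> Optional[bytes]:
    """Return the blob for inline DB storage, or None after spilling it to disk."""
    if len(blob) <= cutoff:
        return blob  # at or below the cutoff: keep the blob in the database row
    blob_path.parent.mkdir(parents=True, exist_ok=True)
    blob_path.write_bytes(blob)  # above the cutoff: write a file, store a reference
    return None
```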
@@ -2255,3 +2257,109 @@ async def test_get_keys_values_both_disk_and_db(
     retrieved_keys_values = {node.key: node.value for node in terminal_nodes}
 
     assert retrieved_keys_values == inserted_keys_values
+
+
+@pytest.mark.anyio
+@boolean_datacases(name="success", false="invalid file", true="valid file")
+async def test_db_data_insert_from_file(
+    data_store: DataStore,
+    store_id: bytes32,
+    tmp_path: Path,
+    seeded_random: random.Random,
+    success: bool,
+) -> None:
+    num_keys = 1000
+    db_uri = generate_in_memory_db_uri()
+
+    async with DataStore.managed(
+        database=db_uri,
+        uri=True,
+        merkle_blobs_path=tmp_path.joinpath("merkle-blobs-tmp"),
+        key_value_blobs_path=tmp_path.joinpath("key-value-blobs-tmp"),
+    ) as tmp_data_store:
+        await tmp_data_store.create_tree(store_id, status=Status.COMMITTED)
+        changelist: list[dict[str, Any]] = []
+        for _ in range(num_keys):
+            use_file = seeded_random.choice([True, False])
+            assert tmp_data_store.prefer_db_kv_blob_length > 7
+            size = tmp_data_store.prefer_db_kv_blob_length + 1 if use_file else 8
+            key = seeded_random.randbytes(size)
+            value = seeded_random.randbytes(size)
+            changelist.append({"action": "insert", "key": key, "value": value})
+
+        await tmp_data_store.insert_batch(store_id, changelist, status=Status.COMMITTED)
+        root = await tmp_data_store.get_tree_root(store_id)
+        files_path = tmp_path.joinpath("files")
+        await write_files_for_root(tmp_data_store, store_id, root, files_path, 1000)
+        assert root.node_hash is not None
+        filename = get_delta_filename_path(files_path, store_id, root.node_hash, 1)
+        assert filename.exists()
+
+    root_hash = bytes32([0] * 31 + [1]) if not success else root.node_hash
+    sinfo = ServerInfo("http://127.0.0.1/8003", 0, 0)
+
+    if not success:
+        target_filename_path = get_delta_filename_path(files_path, store_id, root_hash, 1)
+        shutil.copyfile(filename, target_filename_path)
+        assert target_filename_path.exists()
+
+    keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())
+    assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0
+
+    is_success = await insert_from_delta_file(
+        data_store=data_store,
+        store_id=store_id,
+        existing_generation=0,
+        target_generation=1,
+        root_hashes=[root_hash],
+        server_info=sinfo,
+        client_foldername=files_path,
+        timeout=aiohttp.ClientTimeout(total=15, sock_connect=5),
+        log=log,
+        proxy_url="",
+        downloader=None,
+    )
+    assert is_success == success
+
+    async with data_store.db_wrapper.reader() as reader:
+        async with reader.execute("SELECT COUNT(*) FROM ids") as cursor:
+            row_count = await cursor.fetchone()
+            assert row_count is not None
+            if success:
+                assert row_count[0] > 0
+            else:
+                assert row_count[0] == 0
+
+    if success:
+        assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) > 0
+    else:
+        assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0
+
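The invalid-file case above expects `insert_from_delta_file` to notice that the downloaded file cannot reproduce the advertised root hash and to back out everything it inserted, leaving the `ids` table and the blob directory empty. A toy, self-contained version of that validate-then-roll-back flow (stand-in hashing and hypothetical names; the real logic is chia's data layer download code):

```python
import hashlib
from pathlib import Path


def apply_delta_sketch(delta_path: Path, expected_root_hash: bytes, db: dict[bytes, bytes]) -> bool:
    """Insert a delta file's contents, rolling back unless the root hash checks out."""
    snapshot = dict(db)
    try:
        blob = delta_path.read_bytes()
        db[hashlib.sha256(blob).digest()] = blob  # stand-in for rebuilding the tree
        # Commit only if the rebuilt state hashes to what the server advertised.
        if hashlib.sha256(blob).digest() != expected_root_hash:
            raise ValueError("delta file does not match advertised root hash")
    except (OSError, ValueError):
        db.clear()
        db.update(snapshot)  # roll back rows (and, in the real store, blob files)
        return False
    return True
```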
+
+@pytest.mark.anyio
+async def test_manage_kv_files(
+    data_store: DataStore,
+    store_id: bytes32,
+    seeded_random: random.Random,
+) -> None:
+    num_keys = 1000
+    num_files = 0
+    keys_value_path = data_store.key_value_blobs_path.joinpath(store_id.hex())
+
+    with pytest.raises(Exception, match="Test exception"):
+        async with data_store.db_wrapper.writer() as writer:
+            with data_store.manage_kv_files(store_id):
+                for _ in range(num_keys):
+                    use_file = seeded_random.choice([True, False])
+                    assert data_store.prefer_db_kv_blob_length > 7
+                    size = data_store.prefer_db_kv_blob_length + 1 if use_file else 8
+                    key = seeded_random.randbytes(size)
+                    value = seeded_random.randbytes(size)
+                    await data_store.add_key_value(key, value, store_id, writer)
+                    num_files += 2 * use_file
+
+                assert num_files > 0
+                assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == num_files
+                raise Exception("Test exception")
+
+    assert sum(1 for path in keys_value_path.rglob("*") if path.is_file()) == 0
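`test_manage_kv_files` pins the invariant down directly: when an exception fires mid-batch inside a writer transaction and a `manage_kv_files` block, every blob file written so far must be removed (the `2 * use_file` accounting reflects that each oversized pair writes two files, one for the key and one for the value). The same invariant in isolation, as a runnable miniature with no chia dependencies:

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    try:
        (root / "kv-blob").write_bytes(b"\x00" * 64)  # blob file written mid-batch
        raise RuntimeError("Test exception")  # failure before the batch commits
    except RuntimeError:
        for p in [p for p in root.rglob("*") if p.is_file()]:  # what manage_kv_files automates
            p.unlink()
    assert sum(1 for p in root.rglob("*") if p.is_file()) == 0  # directory is clean again
```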