Commit 7759619
[CHIA-1427]: Limit full file creation when processing subscription generations (#18612)
* Some logging and some code to limit full file generation based on the max number of full files allowed
* Don't write out full files that aren't needed
* Black fixes
* Adjust the full file during error conditions
* No need for try
1 parent cc554ea commit 7759619

File tree: 3 files changed (+63, -17 lines)

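The heart of the change is a single gating condition in insert_from_delta_file (chia/data_layer/download_data.py, third file below): a full tree file is written only when the generation just applied is within maximum_full_file_count generations of the sync target. A minimal sketch of that predicate, reading existing_generation as the generation just inserted; the helper name is illustrative, not part of the diff:

def should_write_full_file(existing_generation: int, target_generation: int, maximum_full_file_count: int) -> bool:
    # Illustrative helper, not in the diff: keep full tree files only for the
    # newest `maximum_full_file_count` generations; older generations are
    # covered by their delta files alone.
    return target_generation - existing_generation <= maximum_full_file_count - 1

assert should_write_full_file(6, 6, 1)      # the final generation always qualifies
assert not should_write_full_file(5, 6, 1)  # the default of 1 skips everything older
assert should_write_full_file(4, 6, 3)      # a count of 3 keeps generations 4, 5, and 6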

chia/_tests/core/data_layer/test_data_store.py

Lines changed: 12 additions & 2 deletions
@@ -1369,6 +1369,7 @@ async def mock_http_download(
         data_store=data_store,
         store_id=store_id,
         existing_generation=3,
+        target_generation=4,
         root_hashes=[bytes32.random(seeded_random)],
         server_info=sinfo,
         client_foldername=tmp_path,
@@ -1392,6 +1393,7 @@ async def mock_http_download(
         data_store=data_store,
         store_id=store_id,
         existing_generation=3,
+        target_generation=4,
         root_hashes=[bytes32.random(seeded_random)],
         server_info=sinfo,
         client_foldername=tmp_path,
@@ -1830,13 +1832,15 @@ async def test_delete_store_data_protects_pending_roots(raw_data_store: DataStore
 
 @pytest.mark.anyio
 @boolean_datacases(name="group_files_by_store", true="group by singleton", false="don't group by singleton")
+@pytest.mark.parametrize("max_full_files", [1, 2, 5])
 async def test_insert_from_delta_file(
     data_store: DataStore,
     store_id: bytes32,
     monkeypatch: Any,
     tmp_path: Path,
     seeded_random: random.Random,
     group_files_by_store: bool,
+    max_full_files: int,
 ) -> None:
     await data_store.create_tree(store_id=store_id, status=Status.COMMITTED)
     num_files = 5
@@ -1908,6 +1912,7 @@ async def mock_http_download_2(
         data_store=data_store,
         store_id=store_id,
         existing_generation=0,
+        target_generation=num_files + 1,
         root_hashes=root_hashes,
         server_info=sinfo,
         client_foldername=tmp_path_1,
@@ -1916,6 +1921,7 @@ async def mock_http_download_2(
         proxy_url="",
         downloader=None,
         group_files_by_store=group_files_by_store,
+        maximum_full_file_count=max_full_files,
     )
     assert not success
 
@@ -1929,6 +1935,7 @@ async def mock_http_download_2(
         data_store=data_store,
         store_id=store_id,
         existing_generation=0,
+        target_generation=num_files + 1,
         root_hashes=root_hashes,
         server_info=sinfo,
         client_foldername=tmp_path_1,
@@ -1937,14 +1944,15 @@ async def mock_http_download_2(
         proxy_url="",
         downloader=None,
         group_files_by_store=group_files_by_store,
+        maximum_full_file_count=max_full_files,
     )
     assert success
 
     root = await data_store.get_tree_root(store_id=store_id)
     assert root.generation == num_files + 1
     with os.scandir(store_path) as entries:
         filenames = {entry.name for entry in entries}
-        assert len(filenames) == 2 * (num_files + 1)
+        assert len(filenames) == num_files + 1 + max_full_files  # 6 deltas and max_full_files full files
     kv = await data_store.get_keys_values(store_id=store_id)
     assert kv == kv_before
 
@@ -2032,6 +2040,7 @@ async def test_insert_from_delta_file_correct_file_exists(
         data_store=data_store,
         store_id=store_id,
         existing_generation=0,
+        target_generation=num_files + 1,
         root_hashes=root_hashes,
         server_info=sinfo,
         client_foldername=tmp_path,
@@ -2047,7 +2056,7 @@ async def test_insert_from_delta_file_correct_file_exists(
     assert root.generation == num_files + 1
     with os.scandir(store_path) as entries:
         filenames = {entry.name for entry in entries}
-        assert len(filenames) == 2 * (num_files + 1)
+        assert len(filenames) == num_files + 2  # 1 full and 6 deltas
     kv = await data_store.get_keys_values(store_id=store_id)
     assert kv == kv_before
 
@@ -2094,6 +2103,7 @@ async def test_insert_from_delta_file_incorrect_file_exists(
         data_store=data_store,
         store_id=store_id,
         existing_generation=1,
+        target_generation=6,
         root_hashes=[incorrect_root_hash],
         server_info=sinfo,
         client_foldername=tmp_path,
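The reworked assertions pin down the expected file layout: one delta file per generation, plus full files only for the newest max_full_files generations. A quick check of the arithmetic using the test's own values:

num_files = 5                      # the test syncs generations 1 through 6
for max_full_files in (1, 2, 5):   # the parametrized values
    expected = (num_files + 1) + max_full_files
    print(expected)                # 7, 8, 11 files on disk, per the new assertion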

chia/data_layer/data_layer.py

Lines changed: 35 additions & 8 deletions
@@ -568,6 +568,7 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
         servers_info = await self.data_store.get_available_servers_for_store(store_id, timestamp)
         # TODO: maybe append a random object to the whole DataLayer class?
         random.shuffle(servers_info)
+        success = False
         for server_info in servers_info:
             url = server_info.url
 
@@ -600,14 +601,16 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
                     self.data_store,
                     store_id,
                     root.generation,
-                    [record.root for record in reversed(to_download)],
-                    server_info,
-                    self.server_files_location,
-                    self.client_timeout,
-                    self.log,
-                    proxy_url,
-                    await self.get_downloader(store_id, url),
-                    self.group_files_by_store,
+                    target_generation=singleton_record.generation,
+                    root_hashes=[record.root for record in reversed(to_download)],
+                    server_info=server_info,
+                    client_foldername=self.server_files_location,
+                    timeout=self.client_timeout,
+                    log=self.log,
+                    proxy_url=proxy_url,
+                    downloader=await self.get_downloader(store_id, url),
+                    group_files_by_store=self.group_files_by_store,
+                    maximum_full_file_count=self.maximum_full_file_count,
                 )
                 if success:
                     self.log.info(
@@ -621,6 +624,30 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
             except Exception as e:
                 self.log.warning(f"Exception while downloading files for {store_id}: {e} {traceback.format_exc()}.")
 
+        # if there aren't any servers then don't try to write the full tree
+        if not success and len(servers_info) > 0:
+            root = await self.data_store.get_tree_root(store_id=store_id)
+            if root.node_hash is None:
+                return
+            filename_full_tree = get_full_tree_filename_path(
+                foldername=self.server_files_location,
+                store_id=store_id,
+                node_hash=root.node_hash,
+                generation=root.generation,
+                group_by_store=self.group_files_by_store,
+            )
+            # Had trouble with this generation, so generate full file for the generation we currently have
+            if not os.path.exists(filename_full_tree):
+                with open(filename_full_tree, "wb") as writer:
+                    await self.data_store.write_tree_to_file(
+                        root=root,
+                        node_hash=root.node_hash,
+                        store_id=store_id,
+                        deltas_only=False,
+                        writer=writer,
+                    )
+                self.log.info(f"Successfully written full tree filename {filename_full_tree}.")
+
     async def get_downloader(self, store_id: bytes32, url: str) -> Optional[PluginRemote]:
         request_json = {"store_id": store_id.hex(), "url": url}
         for d in self.downloaders:
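Two things change in this file. The insert_from_delta_file call now passes keyword arguments, which makes the two new parameters, target_generation and maximum_full_file_count, explicit at the call site. And a fallback path is added after the server loop: if servers were available but none produced a successful sync, the node regenerates the full tree file for the generation it already has, so a current full file still exists for serving. Initializing success = False before the loop keeps that condition well defined even when the loop body never assigns it.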

chia/data_layer/download_data.py

Lines changed: 16 additions & 7 deletions
@@ -92,7 +92,8 @@ async def insert_into_data_store_from_file(
     store_id: bytes32,
     root_hash: Optional[bytes32],
     filename: Path,
-) -> None:
+) -> int:
+    num_inserted = 0
     with open(filename, "rb") as reader:
         while True:
             chunk = b""
@@ -119,8 +120,10 @@ async def insert_into_data_store_from_file(
 
             node_type = NodeType.TERMINAL if serialized_node.is_terminal else NodeType.INTERNAL
             await data_store.insert_node(node_type, serialized_node.value1, serialized_node.value2)
+            num_inserted += 1
 
     await data_store.insert_root_with_ancestor_table(store_id=store_id, node_hash=root_hash, status=Status.COMMITTED)
+    return num_inserted
 
 
 @dataclass
@@ -233,6 +236,7 @@ async def insert_from_delta_file(
     data_store: DataStore,
     store_id: bytes32,
     existing_generation: int,
+    target_generation: int,
     root_hashes: List[bytes32],
     server_info: ServerInfo,
     client_foldername: Path,
@@ -241,6 +245,7 @@ async def insert_from_delta_file(
     proxy_url: str,
     downloader: Optional[PluginRemote],
     group_files_by_store: bool = False,
+    maximum_full_file_count: int = 1,
 ) -> bool:
     if group_files_by_store:
         client_foldername.joinpath(f"{store_id}").mkdir(parents=True, exist_ok=True)
@@ -283,21 +288,25 @@ async def insert_from_delta_file(
                 existing_generation,
                 group_files_by_store,
             )
-            await insert_into_data_store_from_file(
+            num_inserted = await insert_into_data_store_from_file(
                 data_store,
                 store_id,
                 None if root_hash == bytes32([0] * 32) else root_hash,
                 target_filename_path,
             )
             log.info(
                 f"Successfully inserted hash {root_hash} from delta file. "
-                f"Generation: {existing_generation}. Store id: {store_id}."
+                f"Generation: {existing_generation}. Store id: {store_id}. Nodes inserted: {num_inserted}."
             )
 
-            root = await data_store.get_tree_root(store_id=store_id)
-            with open(filename_full_tree, "wb") as writer:
-                await data_store.write_tree_to_file(root, root_hash, store_id, False, writer)
-            log.info(f"Successfully written full tree filename {filename_full_tree}.")
+            if target_generation - existing_generation <= maximum_full_file_count - 1:
+                root = await data_store.get_tree_root(store_id=store_id)
+                with open(filename_full_tree, "wb") as writer:
+                    await data_store.write_tree_to_file(root, root_hash, store_id, False, writer)
+                log.info(f"Successfully written full tree filename {filename_full_tree}.")
+            else:
+                log.info(f"Skipping full file generation for {existing_generation}")
+
             await data_store.received_correct_file(store_id, server_info)
         except Exception:
            try: