Commit 76374b0

parallel missing hash collection (#19552)
* parallel missing hash collection
* more
* breakdown
* update chia_rs
1 parent 75758c9 commit 76374b0
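
In outline (an editor's summary of the diffs below): missing-hash collection from merkle blobs now runs on worker threads via anyio.to_thread.run_sync; build_merkle_blob_queries_for_missing_hashes is slimmed down to the batched lookup, with the generation-walking fallback moved into a new build_cache_and_collect_missing_hashes; and that fallback scans old generations in windows of up to ten per iteration instead of one at a time. A sketch of just the windowing arithmetic (generation_windows is illustrative, not a function in the commit):

    def generation_windows(current: int, target: int, batch_size: int = 10) -> list[range]:
        # mirrors: range(current_generation, min(current_generation + batch_size, root.generation))
        windows = []
        while current < target:
            window = range(current, min(current + batch_size, target))
            windows.append(window)
            current = window[-1] + 1  # the diff's generations[-1] + 1
        return windows

    # three windows cover generations 0..24; the real loop also stops early
    # once delta_reader.get_missing_hashes() comes back empty
    assert generation_windows(0, 25) == [range(0, 10), range(10, 20), range(20, 25)]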

File tree

chia/data_layer/data_store.py
poetry.lock

2 files changed: +98 -58 lines changed

chia/data_layer/data_store.py

Lines changed: 97 additions & 57 deletions
@@ -7,14 +7,15 @@
 import shutil
 import sqlite3
 from collections import defaultdict
-from collections.abc import AsyncIterator, Awaitable, Collection
+from collections.abc import AsyncIterator, Awaitable, Sequence
 from contextlib import asynccontextmanager
 from dataclasses import dataclass, replace
 from hashlib import sha256
 from pathlib import Path
 from typing import Any, BinaryIO, Callable, Optional, Union
 
 import aiosqlite
+import anyio.to_thread
 import chia_rs.datalayer
 import zstd
 from chia_rs.datalayer import (
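
A note on the import change (my reading, not stated in the commit): Collection gives way to Sequence because the new code annotates generations as Sequence[int] and indexes it with generations[-1]; Sequence guarantees positional access, Collection does not. A minimal illustration:

    from collections.abc import Sequence

    def last_generation(generations: Sequence[int]) -> int:
        # Sequence provides __getitem__, so generations[-1] type-checks;
        # Collection offers only iteration, containment, and len()
        return generations[-1]

    print(last_generation(range(5, 9)))  # 8; a range is a Sequence[int]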
@@ -211,15 +212,14 @@ async def insert_into_data_store_from_file(
 
         if len(missing_hashes) > 0:
             # TODO: consider adding transactions around this code
-            merkle_blob_queries = await self.build_merkle_blob_queries_for_missing_hashes(
-                missing_hashes, root, store_id
-            )
-
-            # TODO: consider parallel collection
-            for old_root_hash, indexes in merkle_blob_queries.items():
-                delta_reader.collect_from_merkle_blob(
-                    self.get_merkle_path(store_id=store_id, root_hash=old_root_hash), indexes=indexes
-                )
+            merkle_blob_queries = await self.build_merkle_blob_queries_for_missing_hashes(missing_hashes, store_id)
+            if len(merkle_blob_queries) > 0:
+                jobs = [
+                    (self.get_merkle_path(store_id=store_id, root_hash=old_root_hash), indexes)
+                    for old_root_hash, indexes in merkle_blob_queries.items()
+                ]
+                await anyio.to_thread.run_sync(delta_reader.collect_from_merkle_blobs, jobs)
+            await self.build_cache_and_collect_missing_hashes(root, store_id, delta_reader)
 
         merkle_blob, _ = delta_reader.create_merkle_blob(root_hash, set())
 
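
A design note on this hunk (editorial commentary): the old loop called collect_from_merkle_blob once per root hash directly on the event loop, blocking it for the duration of every read; the new code resolves all the (path, indexes) pairs up front and crosses into a worker thread exactly once through the plural collect_from_merkle_blobs. A runnable contrast of the two shapes, with read_blob and read_blobs as hypothetical stand-ins for the blocking work:

    import anyio
    import anyio.to_thread

    def read_blob(path: str) -> int:
        return len(path)  # placeholder for one blocking merkle-blob read

    def read_blobs(paths: list[str]) -> list[int]:
        # the worker thread iterates the whole jobs list: one thread hop total
        return [read_blob(path) for path in paths]

    async def main() -> None:
        paths = ["a.dat", "bb.dat", "ccc.dat"]
        blocked = [read_blob(path) for path in paths]  # old shape: runs on the event loop
        offloaded = await anyio.to_thread.run_sync(read_blobs, paths)  # new shape
        assert blocked == offloaded == [5, 6, 7]

    anyio.run(main)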
@@ -228,63 +228,103 @@ async def insert_into_data_store_from_file(
 
     async def build_merkle_blob_queries_for_missing_hashes(
         self,
-        missing_hashes: Collection[bytes32],
-        root: Root,
+        missing_hashes: set[bytes32],
         store_id: bytes32,
     ) -> defaultdict[bytes32, list[TreeIndex]]:
         queries = defaultdict[bytes32, list[TreeIndex]](list)
 
-        if missing_hashes:
-            async with self.db_wrapper.reader() as reader:
-                cursor = await reader.execute(
-                    # TODO: the INDEXED BY seems like it shouldn't be needed, figure out why it is
-                    # https://sqlite.org/lang_indexedby.html: admonished to omit all use of INDEXED BY
-                    # https://sqlite.org/queryplanner-ng.html#howtofix
-                    "SELECT MAX(generation) FROM nodes INDEXED BY nodes_generation_index WHERE store_id = ?",
-                    (store_id,),
-                )
-                row = await cursor.fetchone()
-                if row is None or row[0] is None:
-                    current_generation = 0
-                else:
-                    current_generation = row[0]
-
         batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)
 
+        async with self.db_wrapper.reader() as reader:
+            for batch in to_batches(missing_hashes, batch_size):
+                placeholders = ",".join(["?"] * len(batch.entries))
+                query = f"""
+                    SELECT hash, root_hash, idx
+                    FROM nodes
+                    WHERE store_id = ? AND hash IN ({placeholders})
+                    LIMIT {len(batch.entries)}
+                """
+
+                async with reader.execute(query, (store_id, *batch.entries)) as cursor:
+                    rows = await cursor.fetchall()
+                    for row in rows:
+                        root_hash_blob = bytes32(row["root_hash"])
+                        index = TreeIndex(row["idx"])
+                        queries[root_hash_blob].append(index)
+
+        return queries
+
+    async def build_cache_and_collect_missing_hashes(
+        self,
+        root: Root,
+        store_id: bytes32,
+        delta_reader: DeltaReader,
+    ) -> None:
+        missing_hashes = delta_reader.get_missing_hashes()
+
+        if len(missing_hashes) == 0:
+            return
+
+        async with self.db_wrapper.reader() as reader:
+            cursor = await reader.execute(
+                # TODO: the INDEXED BY seems like it shouldn't be needed, figure out why it is
+                # https://sqlite.org/lang_indexedby.html: admonished to omit all use of INDEXED BY
+                # https://sqlite.org/queryplanner-ng.html#howtofix
+                "SELECT MAX(generation) FROM nodes INDEXED BY nodes_generation_index WHERE store_id = ?",
+                (store_id,),
+            )
+            generation_row = await cursor.fetchone()
+            if generation_row is None or generation_row[0] is None:
+                current_generation = 0
+            else:
+                current_generation = generation_row[0]
+        generations: Sequence[int] = [current_generation]
+
         while missing_hashes:
-            found_hashes: set[bytes32] = set()
-            async with self.db_wrapper.reader() as reader:
-                for batch in to_batches(missing_hashes, batch_size):
-                    placeholders = ",".join(["?"] * len(batch.entries))
-                    query = f"""
-                        SELECT hash, root_hash, idx
-                        FROM nodes
-                        WHERE store_id = ? AND hash IN ({placeholders})
-                        LIMIT {len(batch.entries)}
-                    """
+            if current_generation >= root.generation:
+                raise Exception("Invalid delta file, cannot find all the required hashes")
 
-                    async with reader.execute(query, (store_id, *batch.entries)) as cursor:
-                        rows = await cursor.fetchall()
-                        for row in rows:
-                            node_hash = bytes32(row["hash"])
-                            root_hash_blob = bytes32(row["root_hash"])
-                            index = TreeIndex(row["idx"])
-                            if node_hash in found_hashes:
-                                raise Exception("Internal error: duplicate node_hash found in nodes table")
-                            queries[root_hash_blob].append(index)
-                            found_hashes.add(node_hash)
-
-            missing_hashes = [hash for hash in missing_hashes if hash not in found_hashes]
-            if missing_hashes:
-                if current_generation < root.generation:
-                    current_generation += 1
-                else:
-                    raise Exception("Invalid delta file, cannot find all the required hashes")
+            current_generation = generations[-1] + 1
 
-            await self.add_node_hashes(store_id, current_generation)
-            log.info(f"Missing hashes: added old hashes from generation {current_generation}")
+            # TODO: at least shouldn't be hard coded
+            batch_size = 10
+            generations = range(
+                current_generation,
+                min(current_generation + batch_size, root.generation),
+            )
+            jobs: list[tuple[bytes32, Path]] = []
+            generations_by_root_hash: dict[bytes32, int] = {}
+            for generation in generations:
+                generation_root = await self.get_tree_root(store_id=store_id, generation=generation)
+                if generation_root.node_hash is None:
+                    # no need to process an empty generation
+                    continue
+                path = self.get_merkle_path(store_id=store_id, root_hash=generation_root.node_hash)
+                jobs.append((generation_root.node_hash, path))
+                generations_by_root_hash[generation_root.node_hash] = generation
+
+            found = await anyio.to_thread.run_sync(
+                delta_reader.collect_and_return_from_merkle_blobs,
+                jobs,
+                missing_hashes,
+            )
+            async with self.db_wrapper.writer() as writer:
+                await writer.executemany(
+                    """
+                    INSERT
+                    OR IGNORE INTO nodes(store_id, hash, root_hash, generation, idx)
+                    VALUES (?, ?, ?, ?, ?)
+                    """,
+                    (
+                        (store_id, hash, root_hash, generations_by_root_hash[root_hash], index.raw)
+                        for root_hash, map in found
+                        for hash, index in map.items()
+                    ),
+                )
 
-        return queries
+            missing_hashes = delta_reader.get_missing_hashes()
+
+            log.info(f"Missing hashes: added old hashes from generation {current_generation}")
 
     async def read_from_file(
         self, filename: Path, store_id: bytes32
poetry.lock

Lines changed: 1 addition & 1 deletion
@@ -827,7 +827,7 @@ typing-extensions = "*"
 type = "git"
 url = "https://github.com/chia-network/chia_rs"
 reference = "long_lived/initial_datalayer"
-resolved_reference = "4fe774fe2149d550776f26c7380a6aefb74407f6"
+resolved_reference = "14cd09555b53b210ac45c720d4eb634af6d8c769"
 subdirectory = "wheel/"
 
 [[package]]
