
Commit 743741d

Authored by TheGreatAlgo, Faolain, 0xSwego, and coderabbitai[bot]
feat: Sharding (#68)

* feat: support partials
* fix: re-add accidentally deleted _map_byte_request
* fix: tidying up
* fix: full coverage
* fix: re-order
* fix: update ruff and mypy
* fix: pre-commit
* deps: update kubo to latest in tests
* test: add test for ipfs gateway partials
* fix: sharding
* fix: converter
* fix: more work on sharding
* fix: pinning
* fix: pre-commit
* fix: tidying
* fix: target
* fix: fixing types
* fix: more type cleanups
* fix: remove unused
* fix: change imports
* fix: fix ruff
* fix: remove aiohttp
* fix: add tuple
* fix: ruff and version
* fix: remove aiohttp
* fix: remove -s
* fix: more changes
* fix: test era5
* Update test_sharded_zarr_store.py
* fix: logging
* fix: async pinning, chunker
* fix: dag cbor
* fix: remove print
* fix: sharding print and default
* fix: fix race condition
* fix: revert metadata logic
* fix: debug key
* fix: more debug
* fix: more debug
* fix: array shape
* fix: tests and race condition on caches
* fix: more locks
* fix: more tests
* fix: remove comment
* fix: tests
* fix: update test
* fix: add concat test
* fix: helpful test for local testing
* fix: kubo store httpx coverage
* fix: full coverage
* fix: fix mypy
* ci: update ipfs from 0.35 to 0.36
* ci: revert back to 0.35 from 0.36 to find out why "Error: IPFS API service unreachable"
* fix: change integer math
* fix: print debug
* fix: remove debug
* fix: reformat
* fix: print key
* fix: update tests
* fix: update formatting
* fix: update cids
* fix: more changes
* fix: remove duplicate
* fix: with read only
* Update py_hamt/store_httpx.py (Co-authored-by: coderabbitai[bot])
* fix: fix casing
* fix: linting
* lint(ruff): add stricter rule to __init__
* fix: lru cache
* fix: zarr coverage
* fix: final tests
* fix: small changes
* fix: small updates
* fix: remove duplicate

Co-authored-by: Faolain <[email protected]>
Co-authored-by: 0xSwego <[email protected]>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent de33efe, commit 743741d

26 files changed: +6505 / -54 lines

.pre-commit-config.yaml (1 addition, 1 deletion)

```diff
@@ -16,7 +16,7 @@ repos:
       - id: mixed-line-ending
       - id: trailing-whitespace
   - repo: https://github.com/charliermarsh/ruff-pre-commit
-    rev: v0.11.11
+    rev: v0.12.12
     hooks:
       - id: ruff-check
       - id: ruff-format
```

CLAUDE.md (new file, 87 additions)

# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Common Development Commands

Set up the environment:

```bash
uv sync
source .venv/bin/activate
pre-commit install
```

Run all checks (tests, linting, formatting, type checking):

```bash
bash run-checks.sh
```

Run tests:

```bash
# All tests (requires an IPFS daemon or Docker)
pytest --ipfs --cov=py_hamt tests/

# Quick tests without IPFS integration
pytest --cov=py_hamt tests/

# Single test file
pytest tests/test_hamt.py

# Coverage report
uv run coverage report --fail-under=100 --show-missing
```

Linting and formatting:

```bash
# Run all pre-commit hooks
uv run pre-commit run --all-files --show-diff-on-failure

# Fix auto-fixable ruff issues
uv run ruff check --fix
```

Type checking and other tools:

```bash
# Type checking is handled by pre-commit hooks (mypy)
# Documentation preview
uv run pdoc py_hamt
```

## Architecture Overview

py-hamt implements a Hash Array Mapped Trie (HAMT) for IPFS/IPLD content-addressed storage. The core architecture follows this pattern:

1. **ContentAddressedStore (CAS)** - Abstract storage layer (store.py)
   - `KuboCAS` - IPFS/Kubo implementation for production
   - `InMemoryCAS` - In-memory implementation for testing

2. **HAMT** - Core data structure (hamt.py)
   - Uses blake3 hashing by default
   - Implements a content-addressed trie for efficient key-value storage
   - Supports async operations for large datasets

3. **ZarrHAMTStore** - Zarr integration (zarr_hamt_store.py)
   - Implements the zarr.abc.store.Store interface
   - Enables storing large Zarr arrays on IPFS via HAMT
   - Keys are stored verbatim, values as raw bytes

4. **Encryption Layer** - Optional encryption (encryption_hamt_store.py)
   - `SimpleEncryptedZarrHAMTStore` for fully encrypted storage

## Key Design Patterns

- All storage operations are async to handle IPFS network calls
- Content addressing means identical data gets the same hash/CID
- HAMT provides O(log n) access time for large key sets
- Store abstractions allow swapping storage backends
- Type hints are required throughout (mypy enforced)
- 100% test coverage is required, with hypothesis property-based testing

## IPFS Integration Requirements

Tests run with one of:

- A local IPFS daemon running (`ipfs daemon`)
- Docker available for a containerized IPFS node
- Neither (unit tests only; integration tests are skipped)

The `--ipfs` pytest flag controls IPFS test execution.
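The design notes above lean on content addressing: identical data yields the same hash/CID, which is what makes deduplication and cache-friendly storage work. A minimal stdlib sketch of that property, using sha256 as a stand-in for the blake3 default the notes mention (`content_address` is a hypothetical helper, not py-hamt API):

```python
import hashlib


def content_address(data: bytes) -> str:
    # Content addressing: the identifier is derived from the bytes themselves,
    # so identical payloads always map to the same address.
    # (sha256 is a stand-in here; py-hamt defaults to blake3.)
    return hashlib.sha256(data).hexdigest()


a = content_address(b"chunk-0 payload")
b = content_address(b"chunk-0 payload")
c = content_address(b"chunk-1 payload")

print(a == b)  # True: identical data, identical address (deduplication)
print(a == c)  # False: different data, different address
```

This is why re-writing an unchanged Zarr chunk to a CAS is effectively free: the store sees an address it already holds.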

fsgs.py (1 addition, 1 deletion)

```diff
@@ -16,7 +16,7 @@
 async def main():
-    cid = "bafyr4iecw3faqyvj75psutabk2jxpddpjdokdy5b26jdnjjzpkzbgb5xoq"
+    cid = "bafyr4ibiduv7ml3jeyl3gn6cjcrcizfqss7j64rywpbj3whr7tc6xipt3y"

     # Use KuboCAS as an async context manager
     async with KuboCAS() as kubo_cas:  # connects to a local kubo node
```

public_gateway_example.py (1 addition, 1 deletion)

```diff
@@ -53,7 +53,7 @@ async def fetch_zarr_from_gateway(cid: str, gateway: str = "https://ipfs.io"):
 async def main():
     # Example CID - this points to a weather dataset stored on IPFS
-    cid = "bafyr4iecw3faqyvj75psutabk2jxpddpjdokdy5b26jdnjjzpkzbgb5xoq"
+    cid = "bafyr4ibiduv7ml3jeyl3gn6cjcrcizfqss7j64rywpbj3whr7tc6xipt3y"

     # Try different public gateways
     gateways = [
```

py_hamt/__init__.py (5 additions, 2 deletions)

```diff
@@ -1,5 +1,7 @@
 from .encryption_hamt_store import SimpleEncryptedZarrHAMTStore
 from .hamt import HAMT, blake3_hashfn
+from .hamt_to_sharded_converter import convert_hamt_to_sharded, sharded_converter_cli
+from .sharded_zarr_store import ShardedZarrStore
 from .store_httpx import ContentAddressedStore, InMemoryCAS, KuboCAS
 from .zarr_hamt_store import ZarrHAMTStore

@@ -11,6 +13,7 @@
     "KuboCAS",
     "ZarrHAMTStore",
     "SimpleEncryptedZarrHAMTStore",
+    "ShardedZarrStore",
+    "convert_hamt_to_sharded",
+    "sharded_converter_cli",
 ]
-
-print("Running py-hamt from source!")
```

py_hamt/hamt.py (11 additions, 2 deletions)

```diff
@@ -8,6 +8,7 @@
     Callable,
     Dict,
     Iterator,
+    Optional,
     cast,
 )

@@ -589,10 +590,18 @@ async def delete(self, key: str) -> None:
         # If we didn't make a change, then this key must not exist within the HAMT
         raise KeyError

-    async def get(self, key: str) -> IPLDKind:
+    async def get(
+        self,
+        key: str,
+        offset: Optional[int] = None,
+        length: Optional[int] = None,
+        suffix: Optional[int] = None,
+    ) -> IPLDKind:
         """Get a value."""
         pointer: IPLDKind = await self.get_pointer(key)
-        data: bytes = await self.cas.load(pointer)
+        data: bytes = await self.cas.load(
+            pointer, offset=offset, length=length, suffix=suffix
+        )
         if self.values_are_bytes:
             return data
         else:
```
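The new `offset`/`length`/`suffix` parameters on `HAMT.get` are forwarded straight to the CAS `load` call, enabling the partial (byte-range) reads this commit adds. The exact semantics live in the CAS layer, but a plausible pure-Python model, assuming HTTP Range-style behavior (`byte_range` is a hypothetical illustration, not py-hamt code), is:

```python
from typing import Optional


def byte_range(
    blob: bytes,
    offset: Optional[int] = None,
    length: Optional[int] = None,
    suffix: Optional[int] = None,
) -> bytes:
    # Model of HTTP Range-style slicing (assumption, not the library's code):
    #   suffix=N            -> last N bytes (like "Range: bytes=-N")
    #   offset=O            -> everything from byte O onward
    #   offset=O, length=L  -> L bytes starting at byte O
    if suffix is not None:
        return blob[-suffix:]
    if offset is not None:
        end = None if length is None else offset + length
        return blob[offset:end]
    return blob  # no range requested: full value


data = bytes(range(10))
print(byte_range(data, offset=2, length=3))  # b'\x02\x03\x04'
print(byte_range(data, suffix=2))            # b'\x08\t'
```

Fetching only the needed byte range of a large chunk, rather than the whole block, is what makes gateway-backed partial reads of sharded Zarr data economical.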
py_hamt/hamt_to_sharded_converter.py (new file, 130 additions)

```python
import argparse
import asyncio
import time

import xarray as xr
from multiformats import CID

from .hamt import HAMT
from .sharded_zarr_store import ShardedZarrStore
from .store_httpx import KuboCAS
from .zarr_hamt_store import ZarrHAMTStore


async def convert_hamt_to_sharded(
    cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int
) -> str:
    """
    Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.

    Args:
        cas: An initialized ContentAddressedStore instance (KuboCAS).
        hamt_root_cid: The root CID of the source ZarrHAMTStore.
        chunks_per_shard: The number of chunks to group into a single shard
            in the new store.

    Returns:
        The root CID of the newly created ShardedZarrStore.
    """
    print(f"--- Starting Conversion from HAMT Root {hamt_root_cid} ---")
    start_time = time.perf_counter()

    # 1. Open the source HAMT store for reading
    print("Opening source HAMT store...")
    hamt_ro = await HAMT.build(
        cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True
    )
    source_store = ZarrHAMTStore(hamt_ro, read_only=True)
    source_dataset = xr.open_zarr(store=source_store, consolidated=True)

    # 2. Introspect the source store's metadata to get the array shape
    #    and chunk shape
    print("Reading metadata from source store...")
    data_var_name = next(iter(source_dataset.data_vars))
    ordered_dims = list(source_dataset[data_var_name].dims)
    array_shape = tuple(source_dataset.sizes[dim] for dim in ordered_dims)
    chunk_shape = tuple(source_dataset.chunks[dim][0] for dim in ordered_dims)

    # 3. Create the destination ShardedZarrStore for writing
    print(
        f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..."
    )
    dest_store = await ShardedZarrStore.open(
        cas=cas,
        read_only=False,
        array_shape=array_shape,
        chunk_shape=chunk_shape,
        chunks_per_shard=chunks_per_shard,
    )
    print("Destination store initialized.")

    # 4. Iterate and copy all data from source to destination
    print("Starting data migration...")
    count = 0
    async for key in hamt_ro.keys():
        count += 1
        # Read the pointer (CID) for this metadata or chunk key from the source
        cid: CID = await hamt_ro.get_pointer(key)
        cid_base32_str = str(cid.encode("base32"))

        # Write the exact same key-pointer pair to the destination
        await dest_store.set_pointer(key, cid_base32_str)
        if count % 200 == 0:  # pragma: no cover
            print(f"Migrated {count} keys...")  # pragma: no cover

    print(f"Migration of {count} total keys complete.")

    # 5. Finalize the new store by flushing it to the CAS
    print("Flushing new store to get final root CID...")
    new_root_cid = await dest_store.flush()
    end_time = time.perf_counter()

    print("\n--- Conversion Complete! ---")
    print(f"Total time: {end_time - start_time:.2f} seconds")
    print(f"New ShardedZarrStore Root CID: {new_root_cid}")
    return new_root_cid


async def sharded_converter_cli():
    parser = argparse.ArgumentParser(
        description="Convert a Zarr HAMT store to a Sharded Zarr store."
    )
    parser.add_argument(
        "hamt_cid", type=str, help="The root CID of the source Zarr HAMT store."
    )
    parser.add_argument(
        "--chunks-per-shard",
        type=int,
        default=6250,
        help="Number of chunk CIDs to store per shard in the new store.",
    )
    parser.add_argument(
        "--rpc-url",
        type=str,
        default="http://127.0.0.1:5001",
        help="The URL of the IPFS Kubo RPC API.",
    )
    parser.add_argument(
        "--gateway-url",
        type=str,
        default="http://127.0.0.1:8080",
        help="The URL of the IPFS Gateway.",
    )
    args = parser.parse_args()

    # Initialize the KuboCAS client with the provided RPC and Gateway URLs
    async with KuboCAS(
        rpc_base_url=args.rpc_url, gateway_base_url=args.gateway_url
    ) as cas_client:
        try:
            await convert_hamt_to_sharded(
                cas=cas_client,
                hamt_root_cid=args.hamt_cid,
                chunks_per_shard=args.chunks_per_shard,
            )
        except Exception as e:
            print(f"\nAn error occurred: {e}")


if __name__ == "__main__":
    asyncio.run(sharded_converter_cli())  # pragma: no cover
```
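The converter groups chunk CIDs into fixed-size shards (6250 per shard by default, per the CLI flag). Under the simple assumption that chunks are numbered linearly and packed into shards in order, locating a chunk is plain integer arithmetic; the sketch below illustrates that bookkeeping with hypothetical helpers (`chunk_count`, `locate`), not the actual ShardedZarrStore internals:

```python
import math


def chunk_count(array_shape, chunk_shape):
    # Total number of chunks: ceil-divide each dimension, then multiply.
    return math.prod(math.ceil(a / c) for a, c in zip(array_shape, chunk_shape))


def locate(chunk_index: int, chunks_per_shard: int):
    # Assumed layout: (shard number, position within that shard).
    return divmod(chunk_index, chunks_per_shard)


# A (1000, 10, 10) array in (10, 10, 10) chunks has 100 * 1 * 1 chunks.
n = chunk_count((1000, 10, 10), (10, 10, 10))
print(n)                   # 100
print(locate(0, 6250))     # (0, 0): the first chunk lives in shard 0
print(locate(6250, 6250))  # (1, 0): the first chunk of shard 1
```

Grouping pointers this way trades one HAMT lookup per chunk for one shard fetch covering thousands of chunk CIDs, which is the point of the conversion.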
