Skip to content

Commit 45f276a

Browse files
authored
Merge pull request #76 from dClimate/ci/switch-setup-ipfs-gha
ci(gha): switch to custom gha which allows 0.36 of kubo installation
2 parents 964496b + f400438 commit 45f276a

File tree

12 files changed

+624
-150
lines changed

12 files changed

+624
-150
lines changed

.github/workflows/run-checks.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ jobs:
3737
run: uv sync
3838

3939
- name: Install IPFS
40-
uses: oduwsdl/setup-ipfs@e92fedca9f61ab9184cb74940254859f4d7af4d9 # v0.6.3
40+
uses: Faolain/setup-ipfs@v0.7.0
4141
with:
42-
ipfs_version: "0.35.0"
42+
ipfs_version: "0.36.0"
4343
run_daemon: true
4444

4545
- name: Run pytest with coverage

py_hamt/encryption_hamt_store.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,19 @@ def __init__(
110110
self.header = header
111111
self.metadata_read_cache: dict[str, bytes] = {}
112112

113+
def with_read_only(self, read_only: bool = False) -> "SimpleEncryptedZarrHAMTStore":
114+
if read_only == self.read_only:
115+
return self
116+
117+
clone = type(self).__new__(type(self))
118+
clone.hamt = self.hamt
119+
clone.encryption_key = self.encryption_key
120+
clone.header = self.header
121+
clone.metadata_read_cache = self.metadata_read_cache
122+
clone._forced_read_only = read_only # safe; attribute is declared
123+
zarr.abc.store.Store.__init__(clone, read_only=read_only)
124+
return clone
125+
113126
def _encrypt(self, val: bytes) -> bytes:
114127
"""Encrypts data using ChaCha20-Poly1305."""
115128
nonce = get_random_bytes(24) # XChaCha20 uses a 24-byte nonce

py_hamt/store_httpx.py

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import re
33
from abc import ABC, abstractmethod
4-
from typing import Any, Dict, Literal, Tuple, cast
4+
from typing import Any, Literal, Tuple, cast
55

66
import httpx
77
from dag_cbor.ipld import IPLDKind
@@ -210,27 +210,43 @@ def __init__(
210210
self.gateway_base_url: str = gateway_base_url
211211
"""@private"""
212212

213-
self._client_per_loop: Dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
214-
215213
if client is not None:
216-
# user supplied → bind it to *their* current loop
217-
self._client_per_loop[asyncio.get_running_loop()] = client
218-
self._owns_client: bool = False
214+
# A client was supplied by the user. We don't own it.
215+
self._owns_client = False
216+
self._client_per_loop = {asyncio.get_running_loop(): client}
219217
else:
220-
self._owns_client = True # we'll create clients lazily
218+
# No client supplied. We will own any clients we create.
219+
self._owns_client = True
220+
self._client_per_loop = {}
221+
222+
# The instance is never closed on initialization.
223+
self._closed = False
221224

222225
# store for later use by _loop_client()
223226
self._default_headers = headers
224227
self._default_auth = auth
225228

226229
self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
227-
self._closed: bool = False
228230

229231
# --------------------------------------------------------------------- #
230232
# helper: get or create the client bound to the current running loop #
231233
# --------------------------------------------------------------------- #
232234
def _loop_client(self) -> httpx.AsyncClient:
233-
"""Get or create a client for the current event loop."""
235+
"""Get or create a client for the current event loop.
236+
237+
If the instance was previously closed but owns its clients, a fresh
238+
client mapping is lazily created on demand. Users that supplied their
239+
own ``httpx.AsyncClient`` still receive an error when the instance has
240+
been closed, as we cannot safely recreate their client.
241+
"""
242+
if self._closed:
243+
if not self._owns_client:
244+
raise RuntimeError("KuboCAS is closed; create a new instance")
245+
# We previously closed all internally-owned clients. Reset the
246+
# state so that new clients can be created lazily.
247+
self._closed = False
248+
self._client_per_loop = {}
249+
234250
loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
235251
try:
236252
return self._client_per_loop[loop]
@@ -241,7 +257,7 @@ def _loop_client(self) -> httpx.AsyncClient:
241257
headers=self._default_headers,
242258
auth=self._default_auth,
243259
limits=httpx.Limits(max_connections=64, max_keepalive_connections=32),
244-
# Uncomment when they finally support Robost HTTP/2 GOAWAY responses
260+
# Uncomment when they finally support Robust HTTP/2 GOAWAY responses
245261
# http2=True,
246262
)
247263
self._client_per_loop[loop] = client
@@ -251,18 +267,20 @@ def _loop_client(self) -> httpx.AsyncClient:
251267
# graceful shutdown: close **all** clients we own #
252268
# --------------------------------------------------------------------- #
253269
async def aclose(self) -> None:
254-
"""Close all internally-created clients."""
255-
if not self._owns_client:
256-
# User supplied the client; they are responsible for closing it.
270+
"""
271+
Closes all internally-created clients. Must be called from an async context.
272+
"""
273+
if self._owns_client is False: # external client → caller closes
257274
return
258275

276+
# This method is async, so we can reliably await the async close method.
277+
# The complex sync/async logic is handled by __del__.
259278
for client in list(self._client_per_loop.values()):
260279
if not client.is_closed:
261280
try:
262281
await client.aclose()
263282
except Exception:
264-
# Best-effort cleanup; ignore errors during shutdown
265-
pass
283+
pass # best-effort cleanup
266284

267285
self._client_per_loop.clear()
268286
self._closed = True
@@ -277,23 +295,44 @@ async def __aexit__(self, *exc: Any) -> None:
277295

278296
def __del__(self) -> None:
279297
"""Best-effort close for internally-created clients."""
298+
if not hasattr(self, "_owns_client") or not hasattr(self, "_closed"):
299+
return
300+
280301
if not self._owns_client or self._closed:
281302
return
282303

283304
# Attempt proper cleanup if possible
284305
try:
285306
loop = asyncio.get_running_loop()
286307
except RuntimeError:
287-
loop = None
308+
# No running loop - can't do async cleanup
309+
# Just clear the client references synchronously
310+
if hasattr(self, "_client_per_loop"):
311+
# We can't await client.aclose() without a loop,
312+
# so just clear the references
313+
self._client_per_loop.clear()
314+
self._closed = True
315+
return
288316

317+
# If we get here, we have a running loop
289318
try:
290-
if loop is None or not loop.is_running():
291-
asyncio.run(self.aclose())
292-
else:
319+
if loop.is_running():
320+
# Schedule cleanup in the existing loop
293321
loop.create_task(self.aclose())
322+
else:
323+
# Loop exists but not running - try asyncio.run
324+
coro = self.aclose() # Create the coroutine
325+
try:
326+
asyncio.run(coro)
327+
except Exception:
328+
# If asyncio.run fails, we need to close the coroutine properly
329+
coro.close() # This prevents the RuntimeWarning
330+
raise # Re-raise to hit the outer except block
294331
except Exception:
295-
# Suppress all errors during interpreter shutdown or loop teardown
296-
pass
332+
# If all else fails, just clear references
333+
if hasattr(self, "_client_per_loop"):
334+
self._client_per_loop.clear()
335+
self._closed = True
297336

298337
# --------------------------------------------------------------------- #
299338
# save() – now uses the per-loop client #

py_hamt/zarr_hamt_store.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class ZarrHAMTStore(zarr.abc.store.Store):
5151
```
5252
"""
5353

54+
_forced_read_only: bool | None = None # sentinel for wrapper clones
55+
5456
def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
5557
"""
5658
### `hamt` and `read_only`
@@ -79,10 +81,36 @@ def __init__(self, hamt: HAMT, read_only: bool = False) -> None:
7981
"""@private"""
8082

8183
@property
82-
def read_only(self) -> bool:
83-
"""@private"""
84+
def read_only(self) -> bool: # type: ignore[override]
85+
if self._forced_read_only is not None: # instance attr overrides
86+
return self._forced_read_only
8487
return self.hamt.read_only
8588

89+
def with_read_only(self, read_only: bool = False) -> "ZarrHAMTStore":
90+
"""
91+
Return this store (if the flag already matches) or a *shallow*
92+
clone that presents the requested read‑only status.
93+
94+
The clone **shares** the same :class:`~py_hamt.hamt.HAMT`
95+
instance; no flushing, network traffic or async work is done.
96+
"""
97+
# Fast path
98+
if read_only == self.read_only:
99+
return self # Same mode, return same instance
100+
101+
# Create new instance with different read_only flag
102+
# Creates a *bare* instance without running its __init__
103+
clone = type(self).__new__(type(self))
104+
105+
# Copy attributes that matter
106+
clone.hamt = self.hamt # Share the HAMT
107+
clone._forced_read_only = read_only
108+
clone.metadata_read_cache = self.metadata_read_cache.copy()
109+
110+
# Re‑initialise the zarr base class so that Zarr sees the flag
111+
zarr.abc.store.Store.__init__(clone, read_only=read_only)
112+
return clone
113+
86114
def __eq__(self, other: object) -> bool:
87115
"""@private"""
88116
if not isinstance(other, ZarrHAMTStore):
@@ -145,6 +173,9 @@ def supports_partial_writes(self) -> bool:
145173

146174
async def set(self, key: str, value: zarr.core.buffer.Buffer) -> None:
147175
"""@private"""
176+
if self.read_only:
177+
raise Exception("Cannot write to a read-only store.")
178+
148179
if key in self.metadata_read_cache:
149180
self.metadata_read_cache[key] = value.to_bytes()
150181
await self.hamt.set(key, value.to_bytes())
@@ -167,6 +198,8 @@ def supports_deletes(self) -> bool:
167198

168199
async def delete(self, key: str) -> None:
169200
"""@private"""
201+
if self.read_only:
202+
raise Exception("Cannot write to a read-only store.")
170203
try:
171204
await self.hamt.delete(key)
172205
# In practice these lines never seem to be needed, creating and appending data are the only operations most zarrs actually undergo

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ dependencies = [
99
"dag-cbor>=0.3.3",
1010
"msgspec>=0.18.6",
1111
"multiformats[full]>=0.3.1.post4",
12-
"zarr>=3.0.8",
12+
"zarr==3.0.9",
1313
"pycryptodome>=3.21.0",
1414
]
1515

tests/test_async.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,26 +104,30 @@ async def test_kubocas_no_running_loop_in_aclose():
104104
# Create a client in the current loop
105105
_ = cas._loop_client()
106106

107-
# Simulate calling aclose when there's no event loop
108-
# We'll mock this by calling the method directly
109-
import unittest.mock
110-
111-
# Test the __del__ method with no running loop scenario
107+
# Test __del__ behavior when there's no running loop
112108
with unittest.mock.patch(
113109
"asyncio.get_running_loop", side_effect=RuntimeError("No running loop")
114110
):
115-
# This will trigger the exception path in __del__
116-
# where it gets a RuntimeError and sets loop = None
111+
# This should handle the no-loop case gracefully
117112
cas.__del__()
118113

119-
# Now test the normal aclose path with no running loop
114+
# Also test aclose directly with no loop
115+
# First close it normally
116+
await cas.aclose()
117+
118+
# Create a new instance
119+
cas2 = KuboCAS()
120+
_ = cas2._loop_client()
121+
122+
# Now mock no running loop for aclose
120123
with unittest.mock.patch(
121124
"asyncio.get_running_loop", side_effect=RuntimeError("No running loop")
122125
):
123-
await cas.aclose()
126+
# The aclose method should handle this gracefully
127+
await cas2.aclose()
124128

125-
# The client references should be cleared
126-
assert len(cas._client_per_loop) == 0
129+
# Verify cleanup happened
130+
assert len(cas2._client_per_loop) == 0
127131

128132

129133
@pytest.mark.asyncio

0 commit comments

Comments
 (0)