Skip to content

Commit 1b25676

Browse files
OpportunisticStreamingCarBlockStore
1 parent ccb902c commit 1b25676

File tree

2 files changed

+91
-41
lines changed

2 files changed

+91
-41
lines changed

src/atmst/blockstore/car_file.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,77 @@ def get_block(self, key: bytes) -> bytes:
105105
def del_block(self, key: bytes) -> None:
106106
raise NotImplementedError("ReadOnlyCARBlockStore does not support delete()")
107107

108+
class OptimisticRetryError(KeyError):
109+
"""Raised when an optimistic streaming read fails and a retry with
110+
optimistic=False may succeed."""
111+
pass
112+
113+
class OpportunisticStreamingCarBlockStore(BlockStore):
114+
"""
115+
A BlockStore backed by a CarStreamReader. Optimistically reads blocks
116+
sequentially from the stream, assuming preorder traversal order. If a
117+
block is received out of order, falls back to slurping the rest of
118+
the stream into memory.
119+
120+
This should work in 99.999% of cases. The fallback case could fail if the CAR
121+
starts off in canonical order, but has duplicate record CIDs where the second
122+
record is *not* duplicated into the CAR.
123+
124+
In such a case, an OptimisticRetryError is raised, and the caller should
125+
retry from scratch with optimistic=False.
126+
"""
127+
128+
car_root: CID
129+
130+
def __init__(self, carstream: "CarStreamReader", optimistic: bool=True) -> None:
131+
self.car_root = carstream.car_root
132+
self._car_iter = iter(carstream)
133+
self._blocks: Dict[bytes, bytes] = {}
134+
self._optimistic = optimistic
135+
self._was_optimistic = optimistic
136+
if not optimistic:
137+
self._slurp(self._car_iter)
138+
139+
def _slurp(self, car_iter) -> None:
140+
for k, v in car_iter:
141+
self._blocks[bytes(k)] = v
142+
143+
def get_block(self, key: bytes) -> bytes:
144+
if self._optimistic:
145+
try:
146+
k, v = next(self._car_iter)
147+
except StopIteration:
148+
raise KeyError(f"block not found: {key!r}")
149+
if bytes(k) == key:
150+
return v
151+
# CAR is not canonically ordered, slurp the rest into memory
152+
self._optimistic = False
153+
self._blocks[bytes(k)] = v
154+
self._slurp(self._car_iter)
155+
try:
156+
return self._blocks[key]
157+
except KeyError:
158+
if self._was_optimistic:
159+
raise OptimisticRetryError(key)
160+
raise
161+
162+
def is_canonical(self) -> bool:
163+
if not self._optimistic:
164+
return False
165+
# check that the stream has no remaining blocks
166+
try:
167+
next(self._car_iter)
168+
return False
169+
except StopIteration:
170+
return True
171+
172+
def put_block(self, key: bytes, value: bytes) -> None:
173+
raise NotImplementedError
174+
175+
def del_block(self, key: bytes) -> None:
176+
raise NotImplementedError
177+
178+
108179
class CarStreamReader:
109180
"""
110181
Rather than pre-indexing the block offsets, this lets you iterate over the k/v pairs

src/atmst/cartool.py

Lines changed: 20 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@
66

77
from cbrrr import encode_dag_cbor, decode_dag_cbor, CID
88

9-
from .blockstore.car_file import ReadOnlyCARBlockStore, encode_varint, CarStreamReader
9+
from .blockstore.car_file import ReadOnlyCARBlockStore, encode_varint, CarStreamReader, OpportunisticStreamingCarBlockStore, OptimisticRetryError
1010
from .blockstore import OverlayBlockStore
1111
from .mst.node_store import NodeStore
1212
from .mst.node_walker import NodeWalker
1313
from .mst.diff import mst_diff, record_diff
14-
from .mst.node import MSTNode
1514

1615

1716
def prettify_record(record) -> str:
@@ -110,48 +109,28 @@ def print_record_diff(car_a: str, car_b: str):
110109
for delta in record_diff(ns, mst_created, mst_deleted):
111110
print(delta)
112111

113-
def verify_car_streaming(carstream: CarStreamReader):
114-
blocks = {} # for a preorder-traversal-ordered CAR, this never grows beyond 0
115-
optimistic = [True]
116-
car_iter = iter(carstream)
117-
def lazy_get(key: CID) -> bytes:
118-
print("len", len(blocks))
119-
if optimistic[0]:
120-
try:
121-
k, v = next(car_iter)
122-
except StopIteration:
123-
raise ValueError(f"lookup failed for {key}")
124-
if k == key:
125-
return v
126-
# if we reached here the CAR is not canonically ordered
127-
optimistic[0] = False
128-
blocks[k] = v
129-
for k, v in car_iter: # slurp the entire rest of CAR into RAM
130-
blocks[k] = v
131-
# fall thru
132-
return blocks[key] # TODO: reopen input and re-slurp if this fails
133-
commit = decode_dag_cbor(lazy_get(carstream.car_root))
112+
def verify_car_streaming(carstream: CarStreamReader, optimistic: bool=True):
113+
bs = OpportunisticStreamingCarBlockStore(carstream, optimistic=optimistic)
114+
commit = decode_dag_cbor(bs.get_block(bytes(bs.car_root)))
134115
assert isinstance(commit, dict)
135-
root_cid = commit["data"]
136-
assert isinstance(root_cid, CID)
137-
def verify_mst(node_cid: CID):
138-
node = MSTNode.deserialise(lazy_get(node_cid))
139-
if node.subtrees[0] is not None:
140-
verify_mst(node.subtrees[0])
141-
for k, v, subtree in zip(node.keys, node.vals, node.subtrees[1:]):
142-
print(k)
143-
rv = lazy_get(v)
144-
print(k, len(rv))
145-
if subtree is not None:
146-
verify_mst(subtree)
147-
148-
verify_mst(root_cid)
149-
print(carstream.file.tell()) # should be at EOF now
116+
root = commit["data"]
117+
assert isinstance(root, CID)
118+
ns = NodeStore(bs)
119+
count = 0
120+
for k, v in NodeWalker(ns, root).iter_kv():
121+
bs.get_block(bytes(v)) # force-read every record block
122+
count += 1
123+
print(f"Verified {count} records (assuming commit signature is valid)")
124+
print("canonical:", bs.is_canonical())
150125

151126
def verify_car(car_path: str):
152-
with open(car_path, "rb") as carfile:
153-
carstream = CarStreamReader(carfile)
154-
verify_car_streaming(carstream)
127+
try:
128+
with open(car_path, "rb") as carfile:
129+
verify_car_streaming(CarStreamReader(carfile))
130+
except OptimisticRetryError:
131+
print("Optimistic streaming failed, retrying with full buffering...")
132+
with open(car_path, "rb") as carfile:
133+
verify_car_streaming(CarStreamReader(carfile), optimistic=False)
155134

156135
COMMANDS = {
157136
"info": (print_info, "print CAR header and repo info"),

0 commit comments

Comments
 (0)