Commit 3c30d8c

Feat: Adaptive request splitting in EagerStoreReader (#26)
1 parent 5cb959a commit 3c30d8c

File tree

2 files changed: 148 additions & 75 deletions

src/obspec_utils/obspec.py

Lines changed: 63 additions & 50 deletions
@@ -361,10 +361,14 @@ class EagerStoreReader:
     subsequent reads from the in-memory cache. Useful for files that will be
     read multiple times or when seeking is frequent.
 
-    When `chunk_size` is provided, the file is fetched using parallel chunked
-    requests via `get_ranges()`, which can significantly reduce load time for
-    large files by maximizing parallelism. If the store supports the `Head`
-    protocol, the file size will be determined automatically via a HEAD request.
+    By default, the file is fetched using parallel range requests via
+    `get_ranges()`, which can significantly improve load time for large files.
+    The defaults (12 MB request size, max 18 concurrent requests) are tuned for
+    cloud storage. If the store supports the `Head` protocol, the file size
+    will be determined automatically via a HEAD request.
+
+    The parallel fetching strategy is based on Icechunk's approach:
+    https://github.com/earth-mover/icechunk/blob/main/icechunk/src/storage/mod.rs
 
     Works with any ReadableStore protocol implementation.
 
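For orientation, a minimal usage sketch of the new defaults; `my_store` is a hypothetical stand-in for any ReadableStore implementation, and `read()`/`seek()` are assumed to delegate to the reader's BytesIO-backed, file-like interface:

    from obspec_utils.obspec import EagerStoreReader

    # my_store: any object implementing the ReadableStore protocol (placeholder name)
    reader = EagerStoreReader(my_store, "data/large_file.bin")

    # The whole file is now in memory; reads and seeks cost no network round-trips
    header = reader.read(16)
    reader.seek(0)
    everything = reader.read()
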
@@ -377,8 +381,8 @@ class EagerStoreReader:
     - **Repeated random access**: After the initial load, any byte is accessible
       with no network latency.
     - **Small to medium files**: Files that fit comfortably in memory.
-    - **Parallel initial fetch**: With `chunk_size` set, the initial load uses
-      parallel requests for faster download.
+    - **Parallel initial fetch**: The default settings use parallel requests
+      for faster download on cloud storage.
 
     Consider alternatives when:
@@ -397,8 +401,9 @@ def __init__(
         self,
         store: ReadableStore,
         path: str,
-        chunk_size: int | None = None,
+        request_size: int = 12 * 1024 * 1024,
         file_size: int | None = None,
+        max_concurrent_requests: int = 18,
     ) -> None:
         """
         Create an eager reader that loads the entire file into memory.
@@ -411,54 +416,62 @@ def __init__(
             Any object implementing the [ReadableStore][obspec_utils.obspec.ReadableStore] protocol.
         path
             The path to the file within the store.
-        chunk_size
-            If provided, fetch the file using parallel requests of this size.
-            The file will be divided into chunks and fetched using `get_ranges()`.
-            If the store supports the `Head` protocol, the file size will be
-            determined automatically. Otherwise, `file_size` must be provided
-            for chunked fetching to work. If None (default), fetch with a single
-            `get()` request.
+        request_size
+            Target size for each parallel range request in bytes. Default is 12 MB,
+            tuned for cloud storage throughput. The file will be divided into
+            parts of this size and fetched using `get_ranges()`.
         file_size
-            File size in bytes. If not provided and `chunk_size` is set, the
-            reader will attempt to get the size via `store.head()` if the store
-            supports the `Head` protocol.
+            File size in bytes. If not provided, the reader will attempt to get
+            the size via `store.head()` if the store supports the `Head` protocol.
+            If the size cannot be determined, falls back to a single `get()` request.
+        max_concurrent_requests
+            Maximum number of parallel range requests. Default is 18. If the file
+            would require more requests than this, request sizes are increased to
+            fit within this limit.
         """
-        if chunk_size is None:
-            # Single request - fetch entire file
+        # Determine file size if not provided
+        if file_size is None:
+            if hasattr(store, "head") and callable(store.head):
+                file_size = store.head(path)["size"]
+            else:
+                # Fall back to single request if we can't determine size
+                result = store.get(path)
+                data = bytes(result.buffer())
+                self._buffer = io.BytesIO(data)
+                return
+
+        # Handle empty files
+        if file_size == 0:
+            self._buffer = io.BytesIO(b"")
+            return
+
+        # Calculate number of requests needed
+        num_requests = (file_size + request_size - 1) // request_size
+
+        # Cap at max_concurrent_requests by increasing request size
+        if num_requests > max_concurrent_requests:
+            num_requests = max_concurrent_requests
+            request_size = (file_size + num_requests - 1) // num_requests
+
+        # Skip concurrency overhead for single request
+        if num_requests == 1:
             result = store.get(path)
             data = bytes(result.buffer())
         else:
-            # Determine file size if not provided
-            if file_size is None:
-                if hasattr(store, "head") and callable(store.head):
-                    file_size = store.head(path)["size"]
-                else:
-                    # Fall back to single request if we can't determine size
-                    result = store.get(path)
-                    data = bytes(result.buffer())
-                    self._buffer = io.BytesIO(data)
-                    return
-
-            # Parallel chunked requests
-            if file_size == 0:
-                data = b""
-            else:
-                # Calculate chunk boundaries
-                num_chunks = (file_size + chunk_size - 1) // chunk_size
-
-                starts = []
-                lengths = []
-                for i in range(num_chunks):
-                    start = i * chunk_size
-                    length = min(chunk_size, file_size - start)
-                    starts.append(start)
-                    lengths.append(length)
-
-                # Fetch all chunks in parallel
-                results = store.get_ranges(path, starts=starts, lengths=lengths)
-
-                # Concatenate chunks into single buffer
-                data = b"".join(bytes(chunk) for chunk in results)
+            # Parallel range requests
+            starts = []
+            lengths = []
+            for i in range(num_requests):
+                start = i * request_size
+                length = min(request_size, file_size - start)
+                starts.append(start)
+                lengths.append(length)
+
+            # Fetch all parts in parallel
+            results = store.get_ranges(path, starts=starts, lengths=lengths)
+
+            # Concatenate into single buffer
+            data = b"".join(bytes(part) for part in results)
 
         self._buffer = io.BytesIO(data)
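To make the splitting arithmetic concrete, a standalone sketch of the request plan; `plan_requests` is an illustrative helper mirroring the logic above, not part of the library:

    def plan_requests(
        file_size: int,
        request_size: int = 12 * 1024 * 1024,
        max_concurrent_requests: int = 18,
    ) -> list[int]:
        """Per-request byte lengths the reader would issue (sketch)."""
        if file_size == 0:
            return []
        # Ceiling division: how many request_size parts cover the file
        num_requests = (file_size + request_size - 1) // request_size
        # Over the cap: grow the parts rather than exceed the concurrency limit
        if num_requests > max_concurrent_requests:
            num_requests = max_concurrent_requests
            request_size = (file_size + num_requests - 1) // num_requests
        return [
            min(request_size, file_size - i * request_size)
            for i in range(num_requests)
        ]

    # A 100 MiB file splits into ceil(100 / 12) = 9 parallel requests
    assert len(plan_requests(100 * 1024 * 1024)) == 9
    # A 1 GiB file would need 86 requests, so it is capped at 18 larger ones (~57 MiB each)
    assert len(plan_requests(1024 * 1024 * 1024)) == 18

Growing the request size instead of queueing extra requests keeps every byte covered by exactly one in-flight request, so the cap bounds both concurrency and bookkeeping.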

tests/test_registry.py

Lines changed: 85 additions & 25 deletions
@@ -476,8 +476,8 @@ async def __aiter__(self):
         yield self._data
 
 
-def test_eager_reader_with_chunk_size_and_file_size():
-    """Test EagerStoreReader uses get_ranges when chunk_size and file_size provided."""
+def test_eager_reader_with_request_size_and_file_size():
+    """Test EagerStoreReader uses get_ranges when request_size and file_size provided."""
     from obspec_utils.tracing import TracingReadableStore, RequestTrace
 
     # Create test data (16 bytes)
@@ -488,22 +488,22 @@ def test_eager_reader_with_chunk_size_and_file_size():
     trace = RequestTrace()
     traced_store = TracingReadableStore(mock_store, trace)
 
-    # Create reader with chunk_size and file_size
+    # Create reader with request_size and file_size
     reader = EagerStoreReader(
-        traced_store, "test.txt", chunk_size=4, file_size=len(data)
+        traced_store, "test.txt", request_size=4, file_size=len(data)
     )
 
     # Verify the data is correct
     assert reader.read() == data
 
     # Verify get_ranges was used (not get)
     summary = trace.summary()
-    assert summary["total_requests"] == 4  # 16 bytes / 4 byte chunks = 4 requests
+    assert summary["total_requests"] == 4  # 16 bytes / 4 byte requests = 4 requests
     assert all(r.method == "get_ranges" for r in trace.requests)
     assert summary["total_bytes"] == len(data)
 
 
-def test_eager_reader_with_chunk_size_uses_head():
+def test_eager_reader_uses_head():
     """Test EagerStoreReader uses head() to get file size when available."""
     from obspec_utils.tracing import TracingReadableStore, RequestTrace
 
@@ -515,16 +515,16 @@ def test_eager_reader_with_chunk_size_uses_head():
     trace = RequestTrace()
     traced_store = TracingReadableStore(mock_store, trace)
 
-    # Create reader with chunk_size but no file_size
+    # Create reader with request_size but no file_size
     # Store has head() method so it should be used
-    reader = EagerStoreReader(traced_store, "test.txt", chunk_size=4)
+    reader = EagerStoreReader(traced_store, "test.txt", request_size=4)
 
     # Verify the data is correct
     assert reader.read() == data
 
     # Verify get_ranges was used (head() call isn't traced, only data requests)
     summary = trace.summary()
-    assert summary["total_requests"] == 4  # 16 bytes / 4 byte chunks
+    assert summary["total_requests"] == 4  # 16 bytes / 4 byte requests
     assert all(r.method == "get_ranges" for r in trace.requests)
     assert summary["total_bytes"] == len(data)
 
@@ -541,9 +541,9 @@ def test_eager_reader_falls_back_to_single_get():
     trace = RequestTrace()
     traced_store = TracingReadableStore(mock_store, trace)
 
-    # Create reader with chunk_size but no file_size and no head()
+    # Create reader without file_size and no head()
     # Should fall back to single get() request
-    reader = EagerStoreReader(traced_store, "test.txt", chunk_size=4)
+    reader = EagerStoreReader(traced_store, "test.txt", request_size=4)
 
     # Verify the data is correct
     assert reader.read() == data
@@ -555,25 +555,25 @@ def test_eager_reader_falls_back_to_single_get():
     assert summary["total_bytes"] == len(data)
 
 
-def test_eager_reader_no_chunk_size():
-    """Test EagerStoreReader uses single get() when no chunk_size specified."""
+def test_eager_reader_small_file_uses_single_get():
+    """Test EagerStoreReader uses single get() when file fits in one request."""
     from obspec_utils.tracing import TracingReadableStore, RequestTrace
 
-    # Create test data
+    # Create test data smaller than default request_size (12 MB)
     data = b"0123456789ABCDEF"
     mock_store = MockReadableStoreWithHead(data)
 
     # Wrap with tracing
     trace = RequestTrace()
     traced_store = TracingReadableStore(mock_store, trace)
 
-    # Create reader without chunk_size
+    # Create reader with default settings - file is smaller than request_size
     reader = EagerStoreReader(traced_store, "test.txt")
 
     # Verify the data is correct
     assert reader.read() == data
 
-    # Verify single get() was used
+    # Verify single get() was used (skips concurrency overhead)
     summary = trace.summary()
     assert summary["total_requests"] == 1
     assert trace.requests[0].method == "get"
@@ -591,8 +591,8 @@ def test_eager_reader_empty_file():
     trace = RequestTrace()
     traced_store = TracingReadableStore(mock_store, trace)
 
-    # Create reader with chunk_size and file_size=0
-    reader = EagerStoreReader(traced_store, "test.txt", chunk_size=4, file_size=0)
+    # Create reader with file_size=0
+    reader = EagerStoreReader(traced_store, "test.txt", request_size=4, file_size=0)
 
     # Verify the data is empty
     assert reader.read() == b""
@@ -601,36 +601,96 @@ def test_eager_reader_empty_file():
     assert trace.total_requests == 0
 
 
-def test_eager_reader_chunk_boundaries():
-    """Test EagerStoreReader handles non-aligned chunk boundaries."""
+def test_eager_reader_request_boundaries():
+    """Test EagerStoreReader handles non-aligned request boundaries."""
     from obspec_utils.tracing import TracingReadableStore, RequestTrace
 
-    # Create test data (10 bytes, not evenly divisible by chunk_size=4)
+    # Create test data (10 bytes, not evenly divisible by request_size=4)
     data = b"0123456789"
     mock_store = MockReadableStoreWithHead(data)
 
     # Wrap with tracing
     trace = RequestTrace()
     traced_store = TracingReadableStore(mock_store, trace)
 
-    # Create reader with chunk_size=4, file_size=10
+    # Create reader with request_size=4, file_size=10
     reader = EagerStoreReader(
-        traced_store, "test.txt", chunk_size=4, file_size=len(data)
+        traced_store, "test.txt", request_size=4, file_size=len(data)
     )
 
     # Verify the data is correct
     assert reader.read() == data
 
-    # Should be 3 chunks: 0-3 (4 bytes), 4-7 (4 bytes), 8-9 (2 bytes)
+    # Should be 3 requests: 0-3 (4 bytes), 4-7 (4 bytes), 8-9 (2 bytes)
     summary = trace.summary()
     assert summary["total_requests"] == 3
     assert summary["total_bytes"] == len(data)
 
-    # Verify chunk sizes
+    # Verify request sizes
     lengths = [r.length for r in trace.requests]
     assert lengths == [4, 4, 2]
 
 
+def test_eager_reader_max_concurrent_requests():
+    """Test EagerStoreReader caps requests at max_concurrent_requests."""
+    from obspec_utils.tracing import TracingReadableStore, RequestTrace
+
+    # Create test data (100 bytes)
+    data = b"x" * 100
+    mock_store = MockReadableStoreWithHead(data)
+
+    # Wrap with tracing
+    trace = RequestTrace()
+    traced_store = TracingReadableStore(mock_store, trace)
+
+    # With request_size=10, would need 10 requests
+    # But max_concurrent_requests=4, so should redistribute to 4 requests
+    reader = EagerStoreReader(
+        traced_store,
+        "test.txt",
+        request_size=10,
+        file_size=len(data),
+        max_concurrent_requests=4,
+    )
+
+    # Verify the data is correct
+    assert reader.read() == data
+
+    # Should be capped at 4 requests
+    summary = trace.summary()
+    assert summary["total_requests"] == 4
+    assert summary["total_bytes"] == len(data)
+
+
+def test_eager_reader_redistribution_even_split():
+    """Test EagerStoreReader redistributes evenly when capping requests."""
+    from obspec_utils.tracing import TracingReadableStore, RequestTrace
+
+    # Create test data (100 bytes)
+    data = b"x" * 100
+    mock_store = MockReadableStoreWithHead(data)
+
+    # Wrap with tracing
+    trace = RequestTrace()
+    traced_store = TracingReadableStore(mock_store, trace)
+
+    # With request_size=10, would need 10 requests
+    # With max_concurrent_requests=4, should get 4 requests of 25 bytes each
+    reader = EagerStoreReader(
+        traced_store,
+        "test.txt",
+        request_size=10,
+        file_size=len(data),
+        max_concurrent_requests=4,
+    )
+
+    assert reader.read() == data
+
+    # Verify redistributed request sizes (25, 25, 25, 25)
+    lengths = [r.length for r in trace.requests]
+    assert lengths == [25, 25, 25, 25]
+
+
 @pytest.mark.parametrize("ReaderClass", ALL_READERS)
 def test_reader_context_manager(ReaderClass):
     """Test that readers work as context managers and release resources."""
