Commit 28b654a

feat: implement range-based S3 Reader (#339)
- Add RangedS3Reader for efficient byte range requests with adaptive buffering
- Update Rust components to support range parameters in mountpoint client
- Introduce S3ReaderConstructor to configure reader types and parameters, maintaining sequential reader as default for backwards compatibility
- Expose reader_constructor in S3Client, datasets, and DCP interfaces
- Update user agent to include dataset and reader types
- Extend test coverage with parametrized tests for both reader implementations
- Add configurable S3Reader to s3torchbenchmarking module
- Update documentation with usage examples and performance considerations
1 parent 13fb31b commit 28b654a

36 files changed (+2355, -238 lines)
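The core of the change is the new `S3ReaderConstructor` configuration API documented in the README and example diffs below. A minimal sketch of the two configurations it exposes (names and parameters taken from those diffs; the 8 MiB value mirrors the default noted in the benchmarking configs, and the URI is a hypothetical placeholder):

```py
from s3torchconnector import S3MapDataset, S3ReaderConstructor

# Default behaviour: the sequential reader buffers the whole object in memory.
sequential = S3ReaderConstructor.sequential()

# Range-based reader issues byte-range requests; small reads go through an
# internal buffer, reads of at least buffer_size bypass it.
range_based = S3ReaderConstructor.range_based(buffer_size=8 * 1024 * 1024)

# Either constructor can be passed wherever `reader_constructor` is exposed,
# e.g. the datasets shown in the README diff below.
dataset = S3MapDataset.from_prefix(
    "s3://my-bucket/prefix/",  # hypothetical URI for illustration
    region="us-east-1",
    reader_constructor=range_based,
)
```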

.gitignore

Lines changed: 3 additions & 0 deletions
```diff
@@ -79,3 +79,6 @@ s3torchconnectorclient/**/*.rs.bk
 
 # JetBrains
 .idea/
+
+# Sphinx documentation
+s3torchconnector/docs/
```

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -1,6 +1,7 @@
 ## TBD
 
 ### New features
+* Implement range-based S3 Reader for byte range requests, enabling efficient random read patterns (#339)
 
 ### Bug fixes
 * Enable multiple CRT clients per process with different configs (#340)
```

README.md

Lines changed: 122 additions & 7 deletions
````diff
@@ -115,13 +115,16 @@ For example, assuming the following directory bucket name `my-test-bucket--usw2-
 usw2-az1, then the URI used will look like: `s3://my-test-bucket--usw2-az1--x-s3/<PREFIX>` (**please note that the
 prefix for Amazon S3 Express One Zone should end with '/'**), paired with region us-west-2.
 
+
 ## Distributed checkpoints
 
 ### Overview
 
 Amazon S3 Connector for PyTorch provides robust support for PyTorch distributed checkpoints. This feature includes:
 
-- `S3StorageWriter` and `S3StorageReader`: Implementations of PyTorch's StorageWriter and StorageReader interfaces.
+- `S3StorageWriter`: Implementation of PyTorch's StorageWriter interface.
+
+- `S3StorageReader`: Implementation of PyTorch's StorageReader interface. Supports configurable reading strategies via the `reader_constructor` parameter (see [Reader Configurations](#reader-configurations)).
 - `S3FileSystem`: An implementation of PyTorch's FileSystemBase.
 
 These tools enable seamless integration of Amazon S3 with
@@ -187,7 +190,7 @@ the load across multiple S3 partitions.
 #### 1. RoundRobinPrefixStrategy
 Distributes checkpoints across specified prefixes in a round-robin fashion, ideal for balancing data across multiple storage locations.
 
-```python
+```py
 from s3torchconnector.dcp import RoundRobinPrefixStrategy, S3StorageWriter
 
 model = torchvision.models.resnet18()
@@ -234,7 +237,7 @@ CHECKPOINT_URI
 
 Generates binary (base-2) prefixes for optimal partitioning in distributed environments.
 
-```python
+```py
 from s3torchconnector.dcp import BinaryPrefixStrategy
 
 strategy = BinaryPrefixStrategy(
@@ -261,7 +264,7 @@ s3://my-bucket/checkpoints/
 #### 3. HexPrefixStrategy
 
 Uses hexadecimal (base-16) prefixes for a balance of efficiency and readability.
-```
+```py
 from s3torchconnector.dcp import HexPrefixStrategy
 
 strategy = HexPrefixStrategy(
@@ -288,7 +291,7 @@ s3://my-bucket/checkpoints/
 ### Creating Custom Strategies
 
 You can implement custom prefix strategies by extending the S3PrefixStrategyBase class:
-```
+```py
 from s3torchconnector.dcp import S3PrefixStrategyBase
 
 class CustomPrefixStrategy(S3PrefixStrategyBase):
@@ -312,7 +315,7 @@ The S3IterableDataset can be directly passed to PyTorch's DataLoader for paralle
 By default, all worker processes will share the same list of training objects. However,
 if you need each worker to have access to a unique portion of the dataset for better parallelization,
 you can enable dataset sharding using the `enable_sharding` parameter.
-```
+```py
 dataset = S3IterableDataset.from_prefix(DATASET_URI, region=REGION, enable_sharding=True)
 dataloader = DataLoader(dataset, num_workers=4)
 ```
@@ -324,7 +327,7 @@ Each worker, regardless of its host, will load and process a distinct subset of
 For the S3MapDataset, you need to pass it to DataLoader along with a [DistributedSampler](https://pytorch.org/docs/stable/data.html#torch.utils.data.distributed.DistributedSampler) wrapped around it.
 The DistributedSampler ensures that each worker or node receives a unique subset of the dataset,
 enabling efficient parallel and distributed training.
-```
+```py
 dataset = S3MapDataset.from_prefix(DATASET_URI, region=REGION)
 sampler = DistributedSampler(dataset)
 dataloader = DataLoader(dataset, sampler=sampler, num_workers=4)
@@ -371,6 +374,118 @@ To enable versioning on an S3 bucket, see [Enabling versioning on buckets](https
 
 S3 Versioning and S3 Lifecycle are not supported by S3 Express One Zone.
 
+
+## Direct S3Client Usage
+
+For advanced use cases, you can use the S3Client directly for custom streaming patterns and integration with existing pipelines.
+
+```py
+from s3torchconnector._s3client import S3Client
+
+REGION = "us-east-1"
+BUCKET_NAME = "my-bucket"
+OBJECT_KEY = "large_object.bin"
+
+s3_client = S3Client(region=REGION)
+
+# Writing data to S3
+data = b"content" * 1048576
+s3writer = s3_client.put_object(bucket=BUCKET_NAME, key=OBJECT_KEY)
+s3writer.write(data)
+s3writer.close()
+
+# Reading data from S3
+s3reader = s3_client.get_object(bucket=BUCKET_NAME, key=OBJECT_KEY)
+data = s3reader.read()
+```
+
+## Reader Configurations
+
+Amazon S3 Connector for PyTorch supports two types of readers, configurable through `S3ReaderConstructor`.
+
+### Reader Types
+
+#### 1. Sequential Reader (Default)
+
+- Downloads and buffers the entire S3 object in memory.
+- Prioritizes performance over memory usage by buffering entire objects.
+
+#### 2. Range-based Reader
+
+- Performs byte-range requests to read specific portions of S3 objects without downloading the entire file.
+- Prioritizes memory efficiency, with performance gains only for sparse partial reads.
+- Features adaptive buffering with forward overlap handling:
+  - **Small reads** (< `buffer_size`): Use internal buffer to reduce S3 API calls.
+  - **Large reads** (≥ `buffer_size`): Bypass buffer for direct transfer.
+
+### When to Use Each Reader
+
+- **Sequential Reader**: For processing entire files, and when repeated access to the data is required. Best for most general use cases.
+- **Range-based Reader**: For larger objects (100MB+) that require sparse partial reads, and in memory-constrained environments.
+
+**Note**: S3Reader instances are not thread-safe and should not be shared across threads. For multiprocessing with DataLoader, each worker process creates its own S3Reader instance automatically.
+
+### Examples
+
+Direct method - `S3Client` usage with range-based reader without buffer:
+```py
+# Direct S3Client usage for zero-copy partial reads into pre-allocated buffers, for memory efficiency and fast data transfer
+from s3torchconnector._s3client import S3Client
+from s3torchconnector import S3ReaderConstructor
+
+s3_client = S3Client(region=REGION)
+reader_constructor = S3ReaderConstructor.range_based(
+    buffer_size=0  # No buffer, for direct transfer
+)
+s3reader = s3_client.get_object(
+    bucket=BUCKET_NAME,
+    key=OBJECT_NAME,
+    reader_constructor=reader_constructor
+)
+
+buffer = bytearray(10 * 1024 * 1024)    # 10MB buffer
+s3reader.seek(100 * 1024 * 1024)        # Skip to 100MB offset
+bytes_read = s3reader.readinto(buffer)  # Direct read into buffer
+```
+
+DCP interface - `S3StorageReader` usage with range-based reader with buffer:
+```py
+# Load distributed checkpoint with range-based reader to optimize memory usage for large checkpoint files
+from s3torchconnector.dcp import S3StorageReader
+from s3torchconnector import S3ReaderConstructor
+
+reader_constructor = S3ReaderConstructor.range_based(
+    buffer_size=16*1024*1024  # 16MB buffer
+)
+s3_storage_reader = S3StorageReader(
+    region=REGION,
+    path=CHECKPOINT_URI,
+    reader_constructor=reader_constructor
+)
+DCP.load(
+    state_dict=model_state_dict,
+    storage_reader=s3_storage_reader,
+)
+```
+
+Dataset interface - `S3MapDataset` usage with sequential reader:
+```py
+# Use sequential reader for optimal performance when reading entire objects
+from s3torchconnector import S3MapDataset, S3ReaderConstructor
+
+dataset = S3MapDataset.from_prefix(
+    DATASET_URI,
+    region=REGION,
+    reader_constructor=S3ReaderConstructor.sequential()
+)
+
+for item in dataset:
+    content = item.read()
+    ...
+```
+
+For `S3ReaderConstructor` usage details, please refer to the [`S3ReaderConstructor` documentation](https://awslabs.github.io/s3-connector-for-pytorch/autoapi/s3torchconnector/s3reader/constructor/index.html).
+
 ## Contributing
 
 We welcome contributions to Amazon S3 Connector for PyTorch. Please see [CONTRIBUTING](CONTRIBUTING.md) for more
````
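The adaptive buffering behaviour described under "Range-based Reader" above (small reads served via an internal buffer, reads of at least `buffer_size` bypassing it) is not demonstrated directly in the README examples. Below is a minimal sketch, assuming the reader supports the standard file-like `read(size)` call alongside the `seek`/`readinto` calls shown above, and assuming the placeholder object is large enough (several hundred MB) for the offsets used:

```py
from s3torchconnector import S3ReaderConstructor
from s3torchconnector._s3client import S3Client

REGION = "us-east-1"
BUCKET_NAME = "my-bucket"
OBJECT_KEY = "large_object.bin"  # assumed to be a large object

s3_client = S3Client(region=REGION)
s3reader = s3_client.get_object(
    bucket=BUCKET_NAME,
    key=OBJECT_KEY,
    reader_constructor=S3ReaderConstructor.range_based(
        buffer_size=8 * 1024 * 1024  # 8MB buffer
    ),
)

# Small read (< buffer_size): served through the internal buffer, so nearby
# follow-up reads can avoid extra S3 range requests.
s3reader.seek(1024 * 1024)
header = s3reader.read(4096)

# Large read (>= buffer_size): bypasses the buffer and transfers directly
# into the destination bytearray.
payload = bytearray(32 * 1024 * 1024)
s3reader.seek(256 * 1024 * 1024)
n = s3reader.readinto(payload)
```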

examples/dcp/stateful_example.py

Lines changed: 8 additions & 2 deletions
```diff
@@ -17,7 +17,7 @@
 from torch.distributed.device_mesh import init_device_mesh
 from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
 
-from s3torchconnector import S3ClientConfig
+from s3torchconnector import S3ClientConfig, S3ReaderConstructor
 from s3torchconnector.dcp import S3StorageWriter, S3StorageReader
 from s3torchconnector.dcp.s3_prefix_strategy import RoundRobinPrefixStrategy
 
@@ -170,9 +170,15 @@ def run(rank, world_size, region, s3_uri, device="cuda"):
         device, model, rank, world_size
     )
     print(f"Load previously saved checkpoint on rank:{rank}")
+    # Configure S3 reader constructor (sequential or range_based)
+    reader_constructor = S3ReaderConstructor.sequential()
+    # reader_constructor = S3ReaderConstructor.range_based(buffer_size=16 * 1024 * 1024)
     # initialize S3StorageReader with region and bucket name, before passing to dcp.load as reader
     storage_reader = S3StorageReader(
-        region=region, path=s3_uri, s3client_config=s3config
+        region=region,
+        path=s3_uri,
+        s3client_config=s3config,
+        reader_constructor=reader_constructor,
     )
     dcp.load(
         state_dict={"model": modified_model, "optimizer": modified_optim},
```
Lines changed: 6 additions & 1 deletion
```diff
@@ -1,3 +1,8 @@
 kind: s3iterabledataset
 batch_size: 128
-num_workers: 8
+num_workers: 8
+s3reader:
+  # s3reader type: sequential or range_based
+  type: sequential # default
+  # buffer_size (bytes): only used with range_based s3reader type
+  buffer_size: 8*1024*1024 # default
```
Lines changed: 6 additions & 1 deletion
```diff
@@ -1,3 +1,8 @@
 kind: s3mapdataset
 batch_size: 128
-num_workers: 8
+num_workers: 8
+s3reader:
+  # s3reader type: sequential or range_based
+  type: sequential # default
+  # buffer_size (bytes): only used with range_based s3reader type
+  buffer_size: 8*1024*1024 # default
```
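The benchmark configs above select the reader through an `s3reader` block rather than Python code. A hypothetical sketch (not the s3torchbenchmarking module's actual implementation) of how such a block could be mapped onto `S3ReaderConstructor`, including evaluating the `8*1024*1024` string that YAML leaves unparsed:

```py
import math
from s3torchconnector import S3ReaderConstructor

def reader_constructor_from_conf(s3reader_conf: dict):
    """Map an `s3reader` config block to an S3ReaderConstructor (hypothetical helper)."""
    reader_type = s3reader_conf.get("type", "sequential")
    if reader_type == "range_based":
        # YAML keeps "8*1024*1024" as a string; evaluate the product explicitly.
        raw = str(s3reader_conf.get("buffer_size", "8*1024*1024"))
        buffer_size = math.prod(int(part) for part in raw.split("*"))
        return S3ReaderConstructor.range_based(buffer_size=buffer_size)
    return S3ReaderConstructor.sequential()

# Example: the range_based variant of the config above.
constructor = reader_constructor_from_conf(
    {"type": "range_based", "buffer_size": "16*1024*1024"}
)
```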

s3torchbenchmarking/conf/dataset.yaml

Lines changed: 6 additions & 0 deletions
```diff
@@ -31,6 +31,12 @@ hydra:
 # Name of a model (valid options: "entitlement", "vit").
 +model: entitlement
 # Kind of the dataloader (valid options: "fsspec", "s3iterabledataset", "mountpoint", "mountpointcache").
+# For dataloader kind specific options, see specific conf/dataloader/{dataloader-kind}.yaml
 +dataloader: fsspec, s3iterabledataset, mountpoint, mountpointcache
 # Dataset name (corresponds to the name of a folder in S3); will be used to build an S3 URI
 +dataset: 100k_496x387_images
+# S3 reader sweeps (only applies to s3iterabledataset/s3mapdataset)
+# s3reader type: sequential or range_based
+dataloader.s3reader.type: sequential
+# buffer_size (bytes): only used with range_based s3reader
+dataloader.s3reader.buffer_size: 8*1024*1024
```
