
Commit a96a10b

Merge pull request #203 from Aiven-Open/s3-handle-max-parts-limit-for-mulitpart-upload
s3: respect max num of parts per upload
2 parents 1627355 + 735a4be commit a96a10b

File tree

3 files changed (+81, -13 lines)

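Background for the change: S3 allows at most 10,000 parts per multipart upload, and rohmu derives its default part size from host memory, clamped to the range 5 MiB to 524 MiB. With a fixed default part size, sufficiently large files overflow the 10,000-part limit, so the part size now grows with the file. The arithmetic below sketches those bounds using the constants introduced in this commit (illustrative only, not library code):

MIN_PART_BYTES = 5 * 1024 * 1024     # S3_MIN_PART_SIZE_BYTES
MAX_PART_BYTES = 524 * 1024 * 1024   # S3_MAX_PART_SIZE_BYTES
MAX_PARTS = 10_000                   # S3_MAX_NUM_PARTS_PER_UPLOAD

# Largest object that fits in 10,000 parts at the minimum and maximum part sizes.
print(MIN_PART_BYTES * MAX_PARTS / 1024**3)  # ~48.8 GiB with 5 MiB parts
print(MAX_PART_BYTES * MAX_PARTS / 1024**4)  # ~5.0 TiB with 524 MiB parts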

rohmu/object_storage/config.py

Lines changed: 10 additions & 3 deletions
@@ -62,17 +62,24 @@ def calculate_azure_max_block_size() -> int:
 LOCAL_CHUNK_SIZE: Final[int] = 1024 * 1024


+S3_MAX_NUM_PARTS_PER_UPLOAD: Final[int] = 10000
+S3_MIN_PART_SIZE_MB: Final[int] = 5
+S3_MAX_PART_SIZE_MB: Final[int] = 524
+S3_MIN_PART_SIZE_BYTES: Final[int] = S3_MIN_PART_SIZE_MB * 1024 * 1024
+S3_MAX_PART_SIZE_BYTES: Final[int] = S3_MAX_PART_SIZE_MB * 1024 * 1024
+
+
 def calculate_s3_chunk_size() -> int:
     total_mem_mib = get_total_memory() or 0
     # At least 5 MiB, at most 524 MiB. Max block size used for hosts with ~210+ GB of memory
-    return max(min(int(total_mem_mib / 400), 524), 5) * 1024 * 1024
+    return max(min(int(total_mem_mib / 400), S3_MAX_PART_SIZE_MB), S3_MIN_PART_SIZE_MB) * 1024 * 1024


 # Set chunk size based on host memory. S3 supports up to 10k chunks and up to 5 TiB individual
 # files. Minimum chunk size is 5 MiB, which means max ~50 GB files can be uploaded. In order to get
 # to that 5 TiB increase the block size based on host memory; we don't want to use the max for all
 # hosts to avoid allocating too large portion of all available memory.
-S3_MULTIPART_CHUNK_SIZE: Final[int] = calculate_s3_chunk_size()
+S3_DEFAULT_MULTIPART_CHUNK_SIZE: Final[int] = calculate_s3_chunk_size()
 S3_READ_BLOCK_SIZE: Final[int] = 1024 * 1024 * 1


@@ -145,7 +152,7 @@ class S3ObjectStorageConfig(StorageModel):
     is_secure: bool = False
     is_verify_tls: bool = False
     cert_path: Optional[Path] = None
-    segment_size: int = S3_MULTIPART_CHUNK_SIZE
+    segment_size: int = S3_DEFAULT_MULTIPART_CHUNK_SIZE
     encrypted: bool = False
     proxy_info: Optional[ProxyInfo] = None
     connect_timeout: Optional[str] = None
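For reference, the memory-based default above works out to int(total_mem_mib / 400) MiB, clamped to the 5-524 MiB range. A standalone restatement with example memory sizes substituted for get_total_memory() (a sketch, not library code):

def default_chunk_size(total_mem_mib: int) -> int:
    # Mirrors calculate_s3_chunk_size(): at least 5 MiB, at most 524 MiB.
    return max(min(int(total_mem_mib / 400), 524), 5) * 1024 * 1024

print(default_chunk_size(3_800) // (1024 * 1024))    # host with ~4 GB RAM -> 9 MiB parts
print(default_chunk_size(256_000) // (1024 * 1024))  # ~256 GB RAM -> clamped to 524 MiB parts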

rohmu/object_storage/s3.py

Lines changed: 33 additions & 8 deletions
@@ -33,7 +33,9 @@
 )
 from rohmu.object_storage.config import (  # noqa: F401
     calculate_s3_chunk_size as calculate_chunk_size,
-    S3_MULTIPART_CHUNK_SIZE as MULTIPART_CHUNK_SIZE,
+    S3_DEFAULT_MULTIPART_CHUNK_SIZE as MULTIPART_CHUNK_SIZE,
+    S3_MAX_NUM_PARTS_PER_UPLOAD,
+    S3_MAX_PART_SIZE_BYTES,
     S3_READ_BLOCK_SIZE as READ_BLOCK_SIZE,
     S3AddressingStyle,
     S3ObjectStorageConfig as Config,
@@ -156,7 +158,7 @@ def __init__(
         self.read_timeout = read_timeout
         self.aws_session_token = aws_session_token
         self.use_dualstack_endpoint = use_dualstack_endpoint
-        self.multipart_chunk_size = segment_size
+        self.default_multipart_chunk_size = segment_size
         self.encrypted = encrypted
         self.s3_client: Optional[S3Client] = None
         self.location = ""
@@ -486,6 +488,29 @@ def get_file_size(self, key: str) -> int:
         else:
             raise StorageError(f"File size lookup failed for {path}") from ex

+    def calculate_chunks_and_chunk_size(self, size: Optional[int]) -> tuple[int, int]:
+        """Calculate the number of chunks and the chunk size for a multipart upload.
+
+        If a size is provided, self.default_multipart_chunk_size is used as the first attempt;
+        if that would require more than S3_MAX_NUM_PARTS_PER_UPLOAD chunks, the chunk size is
+        increased so that the upload fits within S3_MAX_NUM_PARTS_PER_UPLOAD parts.
+        """
+        if size is None:
+            return 1, self.default_multipart_chunk_size
+        chunks = math.ceil(size / self.default_multipart_chunk_size)
+        chunk_size = self.default_multipart_chunk_size
+
+        if chunks > S3_MAX_NUM_PARTS_PER_UPLOAD:
+            chunk_size = math.ceil(size / S3_MAX_NUM_PARTS_PER_UPLOAD)
+            if chunk_size > S3_MAX_PART_SIZE_BYTES:
+                raise StorageError(
+                    f"Cannot upload a file of size {size}. "
+                    f"Chunk size {chunk_size} is too big for each part of multipart upload."
+                )
+            chunks = math.ceil(size / chunk_size)
+
+        return chunks, chunk_size
+
     def multipart_upload_file_object(
         self,
         *,
@@ -500,11 +525,11 @@ def multipart_upload_file_object(
         start_of_multipart_upload = time.monotonic()
         bytes_sent = 0

-        chunks: int = 1
-        if size is not None:
-            chunks = math.ceil(size / self.multipart_chunk_size)
+        chunks, chunk_size = self.calculate_chunks_and_chunk_size(size)
         args, sanitized_metadata, path = self._init_args_for_multipart(key, metadata, mimetype, cache_control)
-        self.log.debug("Starting to upload multipart file: %r, size: %s, chunks: %s", path, size, chunks)
+        self.log.debug(
+            "Starting to upload multipart file: %r, size: %s, chunks: %d (chunk size: %d)", path, size, chunks, chunk_size
+        )

         parts: list[CompletedPartTypeDef] = []
         part_number = 1
@@ -518,7 +543,7 @@ def multipart_upload_file_object(
         mp_id = cmu_response["UploadId"]

         while True:
-            data = self._read_bytes(fp, self.multipart_chunk_size)
+            data = self._read_bytes(fp, chunk_size)
             if not data:
                 break

@@ -635,7 +660,7 @@ def store_file_object(
         upload_progress_fn: IncrementalProgressCallbackType = None,
     ) -> None:
         if not self._should_multipart(
-            fd=fd, chunk_size=self.multipart_chunk_size, default=True, metadata=metadata, multipart=multipart
+            fd=fd, chunk_size=self.default_multipart_chunk_size, default=True, metadata=metadata, multipart=multipart
         ):
             data = fd.read()
             self.store_file_from_memory(key, data, metadata, cache_control=cache_control, mimetype=mimetype)
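To make the new sizing logic concrete, here is the same calculation restated outside the class and applied to a 150 GB upload with a 9 MiB default part size (an illustrative sketch mirroring calculate_chunks_and_chunk_size above, not the library API):

import math

MAX_PARTS = 10_000                   # S3_MAX_NUM_PARTS_PER_UPLOAD
MAX_PART_BYTES = 524 * 1024 * 1024   # S3_MAX_PART_SIZE_BYTES

def plan_upload(size: int, default_chunk_size: int) -> tuple[int, int]:
    # First attempt: use the default part size.
    chunks = math.ceil(size / default_chunk_size)
    chunk_size = default_chunk_size
    if chunks > MAX_PARTS:
        # Too many parts: grow the part size so the file fits in 10,000 parts.
        chunk_size = math.ceil(size / MAX_PARTS)
        if chunk_size > MAX_PART_BYTES:
            raise ValueError(f"a {size}-byte file needs parts larger than {MAX_PART_BYTES} bytes")
        chunks = math.ceil(size / chunk_size)
    return chunks, chunk_size

# 150 GB with 9 MiB parts would need 16,667 parts, so the part size is raised
# to 15 MiB and the upload fits in exactly 10,000 parts.
print(plan_upload(150_000 * 1024 * 1024, 9 * 1024 * 1024))  # (10000, 15728640)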

test/object_storage/test_s3.py

Lines changed: 38 additions & 2 deletions
@@ -8,7 +8,7 @@
 from pathlib import Path
 from pydantic.v1 import ValidationError
 from rohmu.common.models import StorageOperation
-from rohmu.errors import InvalidByteRangeError
+from rohmu.errors import InvalidByteRangeError, StorageError
 from rohmu.object_storage.base import TransferWithConcurrentUploadSupport
 from rohmu.object_storage.config import S3ObjectStorageConfig
 from rohmu.object_storage.s3 import S3Transfer
@@ -85,7 +85,7 @@ def test_store_file_object_large(infra: S3Infra) -> None:
     chunk_size = len(test_data) // 2
     file_object = BytesIO(test_data)

-    infra.transfer.multipart_chunk_size = chunk_size  # simulate smaller chunk size to force multiple chunks
+    infra.transfer.default_multipart_chunk_size = chunk_size  # simulate smaller chunk size to force multiple chunks

     metadata = {"Content-Length": len(test_data), "some-date": datetime(2022, 11, 15, 18, 30, 58, 486644)}
     infra.transfer.store_file_object(key="test_key2", fd=file_object, metadata=metadata, multipart=True)
@@ -373,3 +373,39 @@ def test_cert_path(is_verify_tls: bool, cert_path: Optional[Path], expected: Uni
     )
     mock.assert_called_once()
     assert mock.call_args[1]["verify"] == expected
+
+
+def mb_to_bytes(size: int) -> int:
+    return size * 1024 * 1024
+
+
+@pytest.mark.parametrize(
+    ("file_size", "default_multipart_chunk_size", "expected_chunks", "expected_chunk_size"),
+    [
+        # for a 100 MB file, on a system with ~4 GB of RAM, default chunk size is 9 MB, so we expect 12 chunks of 9 MB each
+        (100, 9, 12, 9),
+        (10_000, 9, 1112, 9),
+        # for a 150 GB file, on a system with ~4 GB of RAM, default chunk size is 9 MB, so we expect 10000 chunks of 15 MB
+        # each (we increase chunk size to fit the file size in 10000 chunks)
+        (150_000, 9, 10_000, 15),
+    ],
+)
+def test_calculate_chunks_and_chunk_size(
+    infra: S3Infra, file_size: int, default_multipart_chunk_size: int, expected_chunks: int, expected_chunk_size: int
+) -> None:
+    t = infra.transfer
+    t.default_multipart_chunk_size = mb_to_bytes(default_multipart_chunk_size)
+    chunks, chunk_size = t.calculate_chunks_and_chunk_size(mb_to_bytes(file_size))
+    assert chunks == expected_chunks
+    assert chunk_size == mb_to_bytes(expected_chunk_size)
+
+
+def test_calculate_chunks_and_chunk_size_error(infra: S3Infra) -> None:
+    t = infra.transfer
+    t.default_multipart_chunk_size = mb_to_bytes(9)
+    with pytest.raises(StorageError) as e:
+        t.calculate_chunks_and_chunk_size(mb_to_bytes(50000000))
+    assert (
+        str(e.value) == "Cannot upload a file of size 52428800000000. "
+        "Chunk size 5242880000 is too big for each part of multipart upload."
+    )
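The literals in the error-path test follow directly from the constants: a 50,000,000 MiB file cannot be split into 10,000 parts without exceeding the 524 MiB part-size cap, so StorageError is expected. A quick arithmetic check of the values asserted above (illustrative only):

import math

size = 50_000_000 * 1024 * 1024   # 52428800000000 bytes
chunk = math.ceil(size / 10_000)  # 5242880000 bytes per part
print(chunk > 524 * 1024 * 1024)  # True -> StorageError is raised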
