Commit 487b35a

Merge pull request #212 from Aiven-Open/daniel.blasina-calculate-max-file-size
Transfer return max file size
2 parents: 345608d + 127a558

File tree: 8 files changed (+50 −5 lines)

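The change adds a calculate_max_unknown_file_size() method to the transfer backends: it returns the largest object a backend can accept when the final size is not known up front (for example, a streamed upload), computed as the backend's maximum part count times its part/chunk size, with a sys.maxsize default in the base class. A minimal usage sketch, not part of this commit; the local-storage config below is illustrative and get_transfer is assumed to be rohmu's usual factory entry point:

    from rohmu import get_transfer

    # Illustrative config only; any rohmu storage config would work the same way.
    transfer = get_transfer({"storage_type": "local", "directory": "/tmp/rohmu-demo"})

    # Upper bound on an upload whose size is unknown in advance; backends with a
    # multipart limit return max parts * part size, others return sys.maxsize.
    max_size = transfer.calculate_max_unknown_file_size()
    print(max_size)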

rohmu/object_storage/azure.py

Lines changed: 6 additions & 2 deletions
@@ -30,6 +30,7 @@
 from rohmu.object_storage.config import (  # noqa: F401
     AZURE_ENDPOINT_SUFFIXES as ENDPOINT_SUFFIXES,
     AZURE_MAX_BLOCK_SIZE as MAX_BLOCK_SIZE,
+    AZURE_MAX_NUM_PARTS_PER_UPLOAD as MAX_NUM_PARTS_PER_UPLOAD,
     AzureObjectStorageConfig as Config,
     calculate_azure_max_block_size as calculate_max_block_size,
 )
@@ -139,6 +140,9 @@ def create_object_store_if_needed(self) -> None:
         else:
             raise TransferObjectStoreInitializationError() from ex

+    def calculate_max_unknown_file_size(self) -> int:
+        return MAX_NUM_PARTS_PER_UPLOAD * MAX_BLOCK_SIZE
+
     def get_blob_service_client(self) -> BlobServiceClient:
         if self._blob_service_client is None:
             self._blob_service_client = BlobServiceClient.from_connection_string(
@@ -178,7 +182,7 @@ def conn_string(
     def copy_file(
         self, *, source_key: str, destination_key: str, metadata: Optional[Metadata] = None, **kwargs: Any
     ) -> None:
-        timeout = kwargs.get("timeout") or 15.0
+        timeout = kwargs.get("timeout") or 15
         self._copy_file_from_bucket(
             source_bucket=self, source_key=source_key, destination_key=destination_key, metadata=metadata, timeout=timeout
         )
@@ -190,7 +194,7 @@ def _copy_file_from_bucket(
         source_key: str,
         destination_key: str,
         metadata: Optional[Metadata] = None,
-        timeout: float = 15.0,
+        timeout: int = 15,
     ) -> None:
         source_path = source_bucket.format_key_for_backend(source_key, remove_slash_prefix=True, trailing_slash=False)
         source_client = source_bucket.get_blob_service_client().get_blob_client(source_bucket.container_name, source_path)
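For Azure the new bound is MAX_NUM_PARTS_PER_UPLOAD * MAX_BLOCK_SIZE. The block size is host-memory dependent (see config.py below), so the figure here assumes the 100 MiB upper end mentioned in the config comments; under that assumption the cap works out to roughly 0.95 TiB:

    # Back-of-the-envelope check; 100 MiB is an assumed upper bound for the block size.
    max_block_size = 100 * 1024 * 1024           # assumed largest Azure block size
    max_parts = 10_000                           # AZURE_MAX_NUM_PARTS_PER_UPLOAD (added below)
    print(max_parts * max_block_size / 1024**4)  # ~0.95 TiB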

rohmu/object_storage/base.py

Lines changed: 5 additions & 1 deletion
@@ -40,6 +40,7 @@

 import logging
 import os
+import sys

 KEY_TYPE_OBJECT = "object"
 KEY_TYPE_PREFIX = "prefix"
@@ -230,7 +231,7 @@ def _copy_file_from_bucket(
         source_key: str,
         destination_key: str,
         metadata: Optional[Metadata] = None,
-        timeout: float = 15.0,
+        timeout: int = 15,
     ) -> None:
         raise NotImplementedError

@@ -271,6 +272,9 @@ def delete_tree(self, key: str, preserve_trailing_slash: bool = False) -> None:
         names = [item["name"] for item in self.list_path(key, with_metadata=False, deep=True)]
         self.delete_keys(names, preserve_trailing_slash=preserve_trailing_slash)

+    def calculate_max_unknown_file_size(self) -> int:
+        return sys.maxsize
+
     def get_contents_to_file(
         self, key: str, filepath_to_store_to: AnyPath, *, progress_callback: ProgressProportionCallbackType = None
     ) -> Metadata:
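The base-class default of sys.maxsize means a backend with no part-count limit reports an effectively unbounded size; only chunked/multipart backends need to override it. A hypothetical sketch of such an override (the class name and both constants are made up for illustration, and the BaseTransfer import path is assumed from this file):

    from rohmu.object_storage.base import BaseTransfer

    EXAMPLE_MAX_PARTS = 10_000            # hypothetical part limit
    EXAMPLE_CHUNK_SIZE = 8 * 1024 * 1024  # hypothetical 8 MiB chunk size

    class ExampleChunkedTransfer(BaseTransfer):
        # Mirrors the pattern used by the azure/google/swift backends in this commit.
        def calculate_max_unknown_file_size(self) -> int:
            return EXAMPLE_MAX_PARTS * EXAMPLE_CHUNK_SIZE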

rohmu/object_storage/config.py

Lines changed: 3 additions & 2 deletions
@@ -50,14 +50,14 @@ def calculate_azure_max_block_size() -> int:
 # to that 5 TiB increase the block size based on host memory; we don't want to use the max 100 for all
 # hosts because the uploader will allocate (with default settings) 3 x block size of memory.
 AZURE_MAX_BLOCK_SIZE: Final[int] = calculate_azure_max_block_size()
-
+AZURE_MAX_NUM_PARTS_PER_UPLOAD: Final[int] = 10000

 # googleapiclient download performs some 3-4 times better with 50 MB chunk size than 5 MB chunk size;
 # but decrypting/decompressing big chunks needs a lot of memory so use smaller chunks on systems with less
 # than 2 GB RAM
 GOOGLE_DOWNLOAD_CHUNK_SIZE: Final[int] = 1024 * 1024 * 5 if (get_total_memory() or 0) < 2048 else 1024 * 1024 * 50
 GOOGLE_UPLOAD_CHUNK_SIZE: Final[int] = 1024 * 1024 * 5
-
+GOOGLE_MAX_NUM_PARTS_PER_UPLOAD: Final[int] = 10000

 LOCAL_CHUNK_SIZE: Final[int] = 1024 * 1024

@@ -85,6 +85,7 @@ def calculate_s3_chunk_size() -> int:

 SWIFT_CHUNK_SIZE: Final[int] = 1024 * 1024 * 5  # 5 Mi
 SWIFT_SEGMENT_SIZE: Final[int] = 1024 * 1024 * 1024 * 3  # 3 Gi
+SWIFT_MAX_NUM_PARTS_PER_UPLOAD = 10000


 class AzureObjectStorageConfig(StorageModel):
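With GOOGLE_UPLOAD_CHUNK_SIZE fixed at 5 MiB, the new Google limit works out to about 48.8 GiB; the Swift limit is the same, since SWIFT_CHUNK_SIZE is also 5 MiB. A quick arithmetic check using the constants defined in this file:

    GOOGLE_UPLOAD_CHUNK_SIZE = 1024 * 1024 * 5  # 5 MiB
    GOOGLE_MAX_NUM_PARTS_PER_UPLOAD = 10000
    print(GOOGLE_MAX_NUM_PARTS_PER_UPLOAD * GOOGLE_UPLOAD_CHUNK_SIZE / 1024**3)  # ~48.8 GiB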

rohmu/object_storage/google.py

Lines changed: 4 additions & 0 deletions
@@ -43,6 +43,7 @@
 )
 from rohmu.object_storage.config import (
     GOOGLE_DOWNLOAD_CHUNK_SIZE as DOWNLOAD_CHUNK_SIZE,
+    GOOGLE_MAX_NUM_PARTS_PER_UPLOAD as MAX_NUM_PARTS_PER_UPLOAD,
     GOOGLE_UPLOAD_CHUNK_SIZE as UPLOAD_CHUNK_SIZE,
     GoogleObjectStorageConfig as Config,
 )
@@ -401,6 +402,9 @@ def _copy_file_from_bucket(
         self.notifier.object_copied(key=destination_key, size=size, metadata=metadata)
         reporter.report(self.stats)

+    def calculate_max_unknown_file_size(self) -> int:
+        return MAX_NUM_PARTS_PER_UPLOAD * UPLOAD_CHUNK_SIZE
+
     def get_metadata_for_key(self, key: str) -> Metadata:
         path = self.format_key_for_backend(key)
         with self._object_client(not_found=path) as clob:

rohmu/object_storage/s3.py

Lines changed: 3 additions & 0 deletions
@@ -178,6 +178,9 @@ def __init__(
     def _verify_object_storage_unwrapped(self) -> None:
         self.check_or_create_bucket(create_if_needed=False)

+    def calculate_max_unknown_file_size(self) -> int:
+        return self.default_multipart_chunk_size * S3_MAX_NUM_PARTS_PER_UPLOAD
+
     def verify_object_storage(self) -> None:
         try:
             self._verify_object_storage_unwrapped()
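Unlike the other backends, the S3 bound is configuration-dependent: default_multipart_chunk_size comes from the storage config (via segment_size, as the test further down exercises), and S3_MAX_NUM_PARTS_PER_UPLOAD is an existing constant, presumably matching the S3 multipart API's 10,000-part limit. A sketch of the arithmetic under those assumptions:

    # Assumed values: 10,000 parts is the S3 multipart limit; 5 MiB is a commonly
    # used chunk size, not necessarily rohmu's default.
    s3_max_num_parts_per_upload = 10_000
    default_multipart_chunk_size = 5 * 1024 * 1024
    print(default_multipart_chunk_size * s3_max_num_parts_per_upload / 1024**3)  # ~48.8 GiB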

rohmu/object_storage/swift.py

Lines changed: 4 additions & 0 deletions
@@ -24,6 +24,7 @@
 )
 from rohmu.object_storage.config import (
     SWIFT_CHUNK_SIZE as CHUNK_SIZE,
+    SWIFT_MAX_NUM_PARTS_PER_UPLOAD as MAX_NUM_PARTS_PER_UPLOAD,
     SWIFT_SEGMENT_SIZE as SEGMENT_SIZE,
     SwiftObjectStorageConfig as Config,
 )
@@ -153,6 +154,9 @@ def get_metadata_for_key(self, key: str) -> Metadata:
         path = self.format_key_for_backend(key)
         return self._metadata_for_key(path)

+    def calculate_max_unknown_file_size(self) -> int:
+        return MAX_NUM_PARTS_PER_UPLOAD * CHUNK_SIZE
+
     def _metadata_for_key(self, key: str, *, resolve_manifest: bool = False) -> Metadata:
         try:
             headers = self.conn.head_object(self.container_name, key)

test/object_storage/test_local.py

Lines changed: 10 additions & 0 deletions
@@ -15,6 +15,7 @@
 import json
 import os
 import pytest
+import sys


 def test_store_file_from_disk() -> None:
@@ -37,6 +38,15 @@ def test_store_file_from_disk() -> None:
     )


+def test_calculate_max_unknown_file_size() -> None:
+    with TemporaryDirectory() as destdir:
+        transfer = LocalTransfer(
+            directory=destdir,
+        )
+
+        assert transfer.calculate_max_unknown_file_size() == sys.maxsize
+
+
 def test_store_file_object() -> None:
     with TemporaryDirectory() as destdir:
         notifier = MagicMock()

test/object_storage/test_s3.py

Lines changed: 15 additions & 0 deletions
@@ -49,6 +49,21 @@ def _get_session(cls: S3Transfer) -> Iterator[MagicMock]:
     return transfer


+def test_calculate_max_unknown_file_size(mocker: Any) -> None:
+    segment_size = 100
+    transfer = make_mock_transfer(
+        mocker,
+        {
+            "region": "test-region",
+            "bucket_name": "test-bucket",
+            "prefix": "test-prefix",
+            "segment_size": segment_size,
+        },
+    )
+
+    assert transfer.calculate_max_unknown_file_size() == segment_size * S3_MAX_NUM_PARTS_PER_UPLOAD
+
+
 @pytest.fixture(name="infra")
 def fixture_infra(mocker: Any) -> Iterator[S3Infra]:
     operation = mocker.patch("rohmu.common.statsd.StatsClient.operation")
