Commit 10fe04c

fix: handle multipart uploads with >1000 parts for AWS/RadosGW
* For very large files (hundreds of GB), multipart uploads in the current UI uploader can exceed 1000 parts.
* Unlike MinIO, AWS and RadosGW part listings return at most 1000 parts per response. The code now paginates the listing to retrieve all parts.
* Added a manual test to verify the fix (disabled by default). MinIO cannot be used to test the fix because it returns the full listing unconditionally. See test_multipart.py for how to run the manual test.
1 parent aaa4e82 commit 10fe04c
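For context, this is the S3 behaviour the fix works around: ListParts returns at most 1000 parts per call and signals a further page via IsTruncated and NextPartNumberMarker. Below is a minimal sketch of that pagination contract using plain boto3 rather than the s3fs/aiobotocore path the package actually uses; the client setup and function name are illustrative assumptions, not project code.

```python
# Illustrative sketch of the ListParts pagination contract, not the project's code.
import boto3

s3 = boto3.client("s3")  # assumes endpoint/credentials come from the environment


def list_all_parts(bucket, key, upload_id, max_parts=1000):
    """Collect every part of a multipart upload, following pagination."""
    parts = []
    kwargs = {"Bucket": bucket, "Key": key, "UploadId": upload_id, "MaxParts": max_parts}
    while True:
        resp = s3.list_parts(**kwargs)
        parts.extend(resp.get("Parts", []))
        if not resp.get("IsTruncated", False):
            break  # last page reached
        # continue listing after the last part number returned so far
        kwargs["PartNumberMarker"] = resp["NextPartNumberMarker"]
    return parts
```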

5 files changed: 99 additions and 15 deletions.

invenio_s3/multipart_client.py

Lines changed: 25 additions & 10 deletions
@@ -57,16 +57,31 @@ def get_parts(self, max_parts):
         :param max_parts: The maximum number of parts to list.
         :returns: The list of parts, including checksums and etags.
         """
-        ret = sync(
-            self.fs.loop,
-            self.s3_client.list_parts,
-            Bucket=self.bucket,
-            Key=self.key,
-            UploadId=self.upload_id,
-            MaxParts=max_parts,
-            PartNumberMarker=0,
-        )
-        return ret.get("Parts", [])
+        parts = []
+        list_parts_kwargs = {
+            "Bucket": self.bucket,
+            "Key": self.key,
+            "UploadId": self.upload_id,
+            "MaxParts": max_parts,
+        }
+
+        # S3 API on CEPH/Amazon returns at most 1000 parts per request,
+        # so we need to loop until we get all parts. Minio returns all parts
+        # in one request.
+        while True:
+            ret = sync(
+                self.fs.loop,
+                self.s3_client.list_parts,
+                **list_parts_kwargs,
+            )
+            if not ret.get("Parts"):
+                break
+            parts.extend(ret.get("Parts", []))
+            if not ret.get("IsTruncated", False):
+                # if the response is not truncated, finish listing
+                break
+            list_parts_kwargs["PartNumberMarker"] = ret["NextPartNumberMarker"]
+        return parts
 
     def upload_part(self, part_number, data):
         """Upload a part of the multipart upload. Will be used only in tests.

run-tests.sh

Lines changed: 1 addition & 1 deletion
@@ -47,6 +47,6 @@ fi
 python -m check_manifest
 python -m sphinx.cmd.build -qnNW docs docs/_build/html
 eval "$(docker-services-cli up --s3 ${S3:-minio} --env)"
-python -m pytest ${pytest_args[@]+"${pytest_args[@]}"}
+python -m pytest -k "not manual" ${pytest_args[@]+"${pytest_args[@]}"}
 tests_exit_code=$?
 exit "$tests_exit_code"

setup.cfg

Lines changed: 2 additions & 0 deletions
@@ -67,3 +67,5 @@ ignore =
 [tool:pytest]
 addopts = --black --isort --pydocstyle --doctest-glob="*.rst" --doctest-modules --cov=invenio_s3 --cov-report=term-missing
 testpaths = tests invenio_s3
+markers =
+    manual: Mark test as manual, not to be run by default.
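With the marker registered, the manual test can be run explicitly with `pytest -m manual` or excluded with `pytest -m "not manual"`; the `-k "not manual"` filter added to run-tests.sh has the same effect, since pytest's keyword matching also covers marker names.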

tests/conftest.py

Lines changed: 5 additions & 4 deletions
@@ -25,6 +25,7 @@ def app_config(app_config):
     app_config["S3_ENDPOINT_URL"] = os.environ["S3_ENDPOINT_URL"]
     app_config["S3_ACCESS_KEY_ID"] = os.environ["S3_ACCESS_KEY_ID"]
     app_config["S3_SECRET_ACCESS_KEY"] = os.environ["S3_SECRET_ACCESS_KEY"]
+    app_config["S3_BUCKET"] = os.environ.get("S3_BUCKET", "default")
     return app_config
 
 
@@ -45,15 +46,15 @@ def s3fs(app_config):
 
 
 @pytest.fixture()
-def s3_bucket():
+def s3_bucket(app_config):
     """S3 test path."""
-    return "s3://default"
+    return f"s3://{app_config['S3_BUCKET']}"
 
 
 @pytest.fixture()
-def s3_path():
+def s3_path(s3_bucket):
     """S3 test path."""
-    return "s3://default/file.txt"
+    return f"{s3_bucket}/file.txt"
 
 
 @pytest.fixture(scope="function")
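The net effect is that the test bucket stays `s3://default` against the local MinIO service when `S3_BUCKET` is unset, while pointing `S3_BUCKET` at an existing bucket on a real endpoint lets the fixtures (and the manual test below) run against AWS S3 or RadosGW.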

tests/test_multipart.py

Lines changed: 66 additions & 0 deletions
@@ -1,3 +1,5 @@
+from concurrent.futures import ThreadPoolExecutor
+
 import pytest
 
 MB = 2**20
@@ -71,3 +73,67 @@ def test_set_content_not_supported(base_app, s3_storage):
         s3_storage.multipart_set_content(
             1, b"0" * part_size, part_size, **upload_metadata
         )
+
+
+# Test for large number of parts
+#
+# This test is marked as manual because it must be run against an S3 instance
+# such as AWS S3 or CEPH RADOS Gateway that has a ListParts limit of 1000.
+# It is not suitable for local testing with MinIO, as MinIO does not have this limit
+# and will return all parts in a single request.
+#
+# To run this test, set the following environment variables:
+#
+# ```bash
+# export S3_ENDPOINT_URL=https://<your-s3-endpoint>
+# export S3_ACCESS_KEY_ID=<your-access-key>
+# export S3_SECRET_ACCESS_KEY=<your-secret-key>
+# export S3_BUCKET=<your-bucket-name>
+# export AWS_REQUEST_CHECKSUM_CALCULATION=when_required
+# export AWS_RESPONSE_CHECKSUM_VALIDATION=when_required
+# ```
+# Note: the bucket must exist before running the test.
+#
+# and run the test with:
+#
+# ```bash
+# pytest tests/test_multipart.py::test_multipart_flow_large_number_of_parts
+# ```
+#
+# The test will create a multipart upload with 1560 parts, each of size 5 MB,
+# totalling an upload size of 7.8 GB. It might take a long time to run on slow networks.
+#
+@pytest.mark.manual()
+def test_multipart_flow_large_number_of_parts(base_app, s3_storage):
+    part_size = 5 * MB
+    parts = 1560
+
+    # initialize the upload
+    upload_metadata = dict(parts=parts, part_size=part_size, size=parts * part_size)
+    upload_metadata |= s3_storage.multipart_initialize_upload(**upload_metadata) or {}
+
+    # check that links are generated
+    links = s3_storage.multipart_links(**upload_metadata)["parts"]
+    assert len(links) == parts
+    assert links[0]["part"] == 1
+    assert "url" in links[0]
+    assert links[-1]["part"] == parts
+    assert "url" in links[-1]
+
+    # upload the parts manually
+    part_data = b"0" * part_size
+
+    def upload_part(part_number):
+        # running in a different thread, so we need to initialize the app context
+        with base_app.app_context():
+            # upload the part
+            multipart_file = s3_storage.multipart_file(upload_metadata["uploadId"])
+            multipart_file.upload_part(part_number + 1, part_data)
+
+    executor = ThreadPoolExecutor(10)
+    uploaded_count = 0
+    for _ in executor.map(upload_part, range(parts)):
+        uploaded_count += 1
+    executor.shutdown(wait=True)
+    assert uploaded_count == parts
+    s3_storage.multipart_commit_upload(**upload_metadata)
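With 1560 parts of 5 MB each, listing the upload's parts on AWS or RadosGW takes two pages (1000 on the first, 560 on the second), which exercises the paginated get_parts path this commit fixes; the final assertions check that every part was uploaded before multipart_commit_upload completes the upload.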
