Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions invenio_s3/multipart_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,31 @@ def get_parts(self, max_parts):
:param max_parts: The maximum number of parts to list.
:returns: The list of parts, including checksums and etags.
"""
ret = sync(
self.fs.loop,
self.s3_client.list_parts,
Bucket=self.bucket,
Key=self.key,
UploadId=self.upload_id,
MaxParts=max_parts,
PartNumberMarker=0,
)
return ret.get("Parts", [])
parts = []
list_parts_kwargs = {
"Bucket": self.bucket,
"Key": self.key,
"UploadId": self.upload_id,
"MaxParts": max_parts,
}

# S3 API on CEPH/Amazon returns at most 1000 parts per request,
# so we need to loop until we get all parts. Minio returns all parts
# in one request.
while True:
ret = sync(
self.fs.loop,
self.s3_client.list_parts,
**list_parts_kwargs,
)
if not ret.get("Parts"):
break
parts.extend(ret.get("Parts", []))
if not ret.get("IsTruncated", False):
# if the response is not truncated, finish listing
break
list_parts_kwargs["PartNumberMarker"] = ret["NextPartNumberMarker"]
return parts

def upload_part(self, part_number, data):
"""Upload a part of the multipart upload. Will be used only in tests.
Expand Down
2 changes: 1 addition & 1 deletion run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ fi
python -m check_manifest
python -m sphinx.cmd.build -qnNW docs docs/_build/html
eval "$(docker-services-cli up --s3 ${S3:-minio} --env)"
python -m pytest ${pytest_args[@]+"${pytest_args[@]}"}
python -m pytest -k "not manual" ${pytest_args[@]+"${pytest_args[@]}"}
tests_exit_code=$?
exit "$tests_exit_code"
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,5 @@ ignore =
[tool:pytest]
addopts = --black --isort --pydocstyle --doctest-glob="*.rst" --doctest-modules --cov=invenio_s3 --cov-report=term-missing
testpaths = tests invenio_s3
markers =
manual: Mark test as manual, not to be run by default.
9 changes: 5 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def app_config(app_config):
app_config["S3_ENDPOINT_URL"] = os.environ["S3_ENDPOINT_URL"]
app_config["S3_ACCESS_KEY_ID"] = os.environ["S3_ACCESS_KEY_ID"]
app_config["S3_SECRET_ACCESS_KEY"] = os.environ["S3_SECRET_ACCESS_KEY"]
app_config["S3_BUCKET"] = os.environ.get("S3_BUCKET", "default")
return app_config


Expand All @@ -45,15 +46,15 @@ def s3fs(app_config):


@pytest.fixture()
def s3_bucket():
def s3_bucket(app_config):
"""S3 test path."""
return "s3://default"
return f"s3://{app_config['S3_BUCKET']}"


@pytest.fixture()
def s3_path():
def s3_path(s3_bucket):
"""S3 test path."""
return "s3://default/file.txt"
return f"{s3_bucket}/file.txt"


@pytest.fixture(scope="function")
Expand Down
66 changes: 66 additions & 0 deletions tests/test_multipart.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from concurrent.futures import ThreadPoolExecutor

import pytest

MB = 2**20
Expand Down Expand Up @@ -71,3 +73,67 @@ def test_set_content_not_supported(base_app, s3_storage):
s3_storage.multipart_set_content(
1, b"0" * part_size, part_size, **upload_metadata
)


# Test for large number of parts
#
# This test is marked as manual because it must be run against an S3 instance
# such as AWS S3 or CEPH RADOS Gateway that has a ListParts limit of 1000.
# It is not suitable for local testing with MinIO as it does not have this limit
# and will return all parts in a single request.
#
# To run this test, set the following environment variables:
#
# ```bash
# export S3_ENDPOINT_URL=https://<your-s3-endpoint>
# export S3_ACCESS_KEY_ID=<your-access-key>
# export S3_SECRET_ACCESS_KEY=<your-secret-key>
# export S3_BUCKET=<your-bucket-name>
# export AWS_REQUEST_CHECKSUM_CALCULATION=when_required
# export AWS_RESPONSE_CHECKSUM_VALIDATION=when_required
# ```
# Note: the bucket must exist before running the test.
#
# and run the test with:
#
# ```bash
# pytest tests/test_multipart.py::test_multipart_flow_large_number_of_parts
# ```
#
# The test will create a multipart upload with 1560 parts, each of size 5 MB,
# totalling an upload size of 7.8 GB. Might take a long time to run on slow networks.
#
@pytest.mark.manual()
def test_multipart_flow_large_number_of_parts(base_app, s3_storage):
    """Upload a multipart file with 1560 parts of 5 MB each and commit it.

    Exercises the ListParts pagination path: S3 backends such as AWS S3 or
    CEPH RGW return at most 1000 parts per ListParts call, so completing an
    upload with more than 1000 parts requires the paginated listing.
    """
    part_size = 5 * MB
    parts = 1560

    # initialize the upload; merge any backend-provided metadata
    # (e.g. the uploadId) into our local metadata dict
    upload_metadata = dict(parts=parts, part_size=part_size, size=parts * part_size)
    upload_metadata |= s3_storage.multipart_initialize_upload(**upload_metadata) or {}

    # check that a pre-signed link is generated for every part
    links = s3_storage.multipart_links(**upload_metadata)["parts"]
    assert len(links) == parts
    assert links[0]["part"] == 1
    assert "url" in links[0]
    assert links[-1]["part"] == parts
    assert "url" in links[-1]

    # upload the parts manually
    part_data = b"0" * part_size

    def upload_part(part_number):
        # running in a different thread, so we need to initialize the app context
        with base_app.app_context():
            # upload the part (part numbers are 1-based in S3)
            multipart_file = s3_storage.multipart_file(upload_metadata["uploadId"])
            multipart_file.upload_part(part_number + 1, part_data)

    # context manager guarantees the pool is shut down even if a part
    # upload raises (a bare shutdown() call would be skipped on error)
    uploaded_count = 0
    with ThreadPoolExecutor(max_workers=10) as executor:
        # iterating map() re-raises any worker exception and blocks
        # until all parts are uploaded
        for _ in executor.map(upload_part, range(parts)):
            uploaded_count += 1
    assert uploaded_count == parts
    s3_storage.multipart_commit_upload(**upload_metadata)
Loading