Skip to content

Commit 6f1b77a

Browse files
authored
feat(dcp): add support for S3 CopyObject API (#242)
Add support for S3 `CopyObject` API, binding Python and Rust clients together. Bump versions for mountpoint-s3-client and mountpoint-s3-crt (required to use the `CopyObject` API).
1 parent 712c2a3 commit 6f1b77a

File tree

12 files changed

+267
-57
lines changed

12 files changed

+267
-57
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,8 @@ venv/
3030
*.egg
3131
multirun/
3232

33+
# Unit test / coverage reports
34+
.hypothesis/
35+
3336
# Prevent publishing file with third party licenses
3437
THIRD-PARTY-LICENSES

s3torchconnector/src/s3torchconnector/_s3client/_s3client.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,11 @@ def head_object(self, bucket: str, key: str) -> ObjectInfo:
130130
def delete_object(self, bucket: str, key: str) -> None:
131131
log.debug(f"DeleteObject s3://{bucket}/{key}")
132132
self._client.delete_object(bucket, key)
133+
134+
def copy_object(
135+
self, src_bucket: str, src_key: str, dst_bucket: str, dst_key: str
136+
) -> None:
137+
log.debug(
138+
f"CopyObject s3://{src_bucket}/{src_key} to s3://{dst_bucket}/{dst_key}"
139+
)
140+
return self._client.copy_object(src_bucket, src_key, dst_bucket, dst_key)

s3torchconnector/tst/unit/test_s3_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,20 @@ def test_list_objects_log(s3_client: S3Client, caplog):
6060
assert f"ListObjects {S3_URI}" in caplog.messages
6161

6262

63+
def test_delete_object_log(s3_client: S3Client, caplog):
64+
with caplog.at_level(logging.DEBUG):
65+
s3_client.delete_object(TEST_BUCKET, TEST_KEY)
66+
assert f"DeleteObject {S3_URI}" in caplog.messages
67+
68+
69+
def test_copy_object_log(s3_client: S3Client, caplog):
70+
dst_bucket, dst_key = "dst_bucket", "dst_key"
71+
72+
with caplog.at_level(logging.DEBUG):
73+
s3_client.copy_object(TEST_BUCKET, TEST_KEY, dst_bucket, dst_key)
74+
assert f"CopyObject {S3_URI} to s3://{dst_bucket}/{dst_key}" in caplog.messages
75+
76+
6377
def test_s3_client_default_user_agent():
6478
s3_client = S3Client(region=TEST_REGION)
6579
expected_user_agent = f"s3torchconnector/{__version__}"

s3torchconnectorclient/Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

s3torchconnectorclient/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ built = "0.7"
1919
pyo3 = { version = "0.19.2" }
2020
pyo3-log = "0.8.3"
2121
futures = "0.3.28"
22-
mountpoint-s3-client = { version = "0.10.0", features = ["mock"] }
23-
mountpoint-s3-crt = "0.9.0"
22+
mountpoint-s3-client = { version = "0.11.0", features = ["mock"] }
23+
mountpoint-s3-crt = "0.10.0"
2424
log = "0.4.20"
2525
tracing = { version = "0.1.40", default-features = false, features = ["std", "log"] }
2626
tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"]}

s3torchconnectorclient/pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ dependencies = []
2525

2626
[project.optional-dependencies]
2727
test = [
28+
"boto3",
2829
"pytest",
2930
"pytest-timeout",
3031
"hypothesis",
3132
"flake8",
3233
"black",
33-
"mypy"
34+
"mypy",
35+
"Pillow"
3436
]
3537

3638
[tool.setuptools.packages]

s3torchconnectorclient/python/src/s3torchconnectorclient/_mountpoint_s3_client.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ class MountpointS3Client:
3535
) -> ListObjectStream: ...
3636
def head_object(self, bucket: str, key: str) -> ObjectInfo: ...
3737
def delete_object(self, bucket: str, key: str) -> None: ...
38+
def copy_object(
39+
self, src_bucket: str, src_key: str, dst_bucket: str, dst_key: str
40+
) -> None: ...
3841

3942
class MockMountpointS3Client:
4043
throughput_target_gbps: float

s3torchconnectorclient/python/tst/integration/conftest.py

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import io
55
import os
66
import random
7+
from dataclasses import dataclass, field
8+
from typing import Optional
79

810
import boto3
911
import numpy as np
@@ -18,33 +20,28 @@ def getenv(var: str, optional: bool = False) -> str:
1820
return v
1921

2022

21-
class BucketPrefixFixture(object):
23+
@dataclass
24+
class BucketPrefixFixture:
2225
"""An S3 bucket/prefix and its contents for use in a single unit test. The prefix will be unique
2326
to this instance, so other concurrent tests won't affect its state."""
2427

25-
region: str
26-
bucket: str
27-
prefix: str
28-
storage_class: str = None
29-
endpoint_url: str = None
30-
31-
def __init__(
32-
self,
33-
region: str,
34-
bucket: str,
35-
prefix: str,
36-
storage_class: str = None,
37-
endpoint_url: str = None,
38-
):
39-
self.bucket = bucket
40-
self.prefix = prefix
41-
self.region = region
42-
self.storage_class = storage_class
43-
self.endpoint_url = endpoint_url
44-
self.contents = {}
45-
session = boto3.Session(region_name=region)
28+
name: str
29+
30+
region: str = getenv("CI_REGION")
31+
bucket: str = getenv("CI_BUCKET")
32+
prefix: str = getenv("CI_PREFIX")
33+
storage_class: Optional[str] = getenv("CI_STORAGE_CLASS", optional=True)
34+
endpoint_url: Optional[str] = getenv("CI_CUSTOM_ENDPOINT_URL", optional=True)
35+
contents: dict = field(default_factory=dict)
36+
37+
def __post_init__(self):
38+
assert self.prefix == "" or self.prefix.endswith("/")
39+
session = boto3.Session(region_name=self.region)
4640
self.s3 = session.client("s3")
4741

42+
nonce = random.randrange(2**64)
43+
self.prefix = f"{self.prefix}{self.name}/{nonce}/"
44+
4845
@property
4946
def s3_uri(self):
5047
return f"s3://{self.bucket}/{self.prefix}"
@@ -55,34 +52,47 @@ def add(self, key: str, contents: bytes, **kwargs):
5552
self.s3.put_object(Bucket=self.bucket, Key=full_key, Body=contents, **kwargs)
5653
self.contents[full_key] = contents
5754

55+
def remove(self, key: str):
56+
full_key = f"{self.prefix}{key}"
57+
self.s3.delete_object(Bucket=self.bucket, Key=full_key)
58+
5859
def __getitem__(self, index):
5960
return self.contents[index]
6061

6162
def __iter__(self):
6263
return iter(self.contents)
6364

6465

65-
def get_test_bucket_prefix(name: str) -> BucketPrefixFixture:
66-
"""Create a new bucket/prefix fixture for the given test name."""
67-
bucket = getenv("CI_BUCKET")
68-
prefix = getenv("CI_PREFIX")
69-
region = getenv("CI_REGION")
70-
storage_class = getenv("CI_STORAGE_CLASS", optional=True)
71-
endpoint_url = getenv("CI_CUSTOM_ENDPOINT_URL", optional=True)
72-
assert prefix == "" or prefix.endswith("/")
66+
@dataclass
67+
class CopyBucketFixture(BucketPrefixFixture):
68+
src_key: str = "src.txt"
69+
dst_key: str = "dst.txt"
70+
71+
@property
72+
def full_src_key(self):
73+
return self.prefix + self.src_key
74+
75+
@property
76+
def full_dst_key(self):
77+
return self.prefix + self.dst_key
78+
79+
80+
def get_test_copy_bucket_fixture(name: str) -> CopyBucketFixture:
81+
copy_bucket_fixture = CopyBucketFixture(name=name)
7382

74-
nonce = random.randrange(2**64)
75-
prefix = f"{prefix}{name}/{nonce}/"
83+
# set up / teardown
84+
copy_bucket_fixture.add(copy_bucket_fixture.src_key, b"Hello, World!\n")
85+
copy_bucket_fixture.remove(copy_bucket_fixture.dst_key)
7686

77-
return BucketPrefixFixture(region, bucket, prefix, storage_class, endpoint_url)
87+
return copy_bucket_fixture
7888

7989

8090
@pytest.fixture
8191
def image_directory(request) -> BucketPrefixFixture:
8292
"""Create a bucket/prefix fixture that contains a directory of random JPG image files."""
8393
NUM_IMAGES = 10
8494
IMAGE_SIZE = 100
85-
fixture = get_test_bucket_prefix(f"{request.node.name}/image_directory")
95+
fixture = BucketPrefixFixture(f"{request.node.name}/image_directory")
8696
for i in range(NUM_IMAGES):
8797
data = np.random.randint(0, 256, IMAGE_SIZE * IMAGE_SIZE * 3, np.uint8)
8898
data = data.reshape(IMAGE_SIZE, IMAGE_SIZE, 3)
@@ -100,23 +110,28 @@ def image_directory(request) -> BucketPrefixFixture:
100110

101111
@pytest.fixture
102112
def sample_directory(request) -> BucketPrefixFixture:
103-
fixture = get_test_bucket_prefix(f"{request.node.name}/sample_files")
113+
fixture = BucketPrefixFixture(f"{request.node.name}/sample_files")
104114
fixture.add("hello_world.txt", b"Hello, World!\n")
105115
return fixture
106116

107117

108118
@pytest.fixture
109119
def put_object_tests_directory(request) -> BucketPrefixFixture:
110-
fixture = get_test_bucket_prefix(f"{request.node.name}/put_integration_tests")
120+
fixture = BucketPrefixFixture(f"{request.node.name}/put_integration_tests")
111121
fixture.add("to_overwrite.txt", b"before")
112122
return fixture
113123

114124

115125
@pytest.fixture
116126
def checkpoint_directory(request) -> BucketPrefixFixture:
117-
return get_test_bucket_prefix(f"{request.node.name}/checkpoint_directory")
127+
return BucketPrefixFixture(f"{request.node.name}/checkpoint_directory")
118128

119129

120130
@pytest.fixture
121131
def empty_directory(request) -> BucketPrefixFixture:
122-
return get_test_bucket_prefix(f"{request.node.name}/empty_directory")
132+
return BucketPrefixFixture(f"{request.node.name}/empty_directory")
133+
134+
135+
@pytest.fixture
136+
def copy_directory(request) -> CopyBucketFixture:
137+
return get_test_copy_bucket_fixture(f"{request.node.name}/copy_directory")

s3torchconnectorclient/python/tst/integration/test_mountpoint_s3_integration.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
ListObjectStream,
2222
)
2323

24-
from conftest import BucketPrefixFixture
24+
from conftest import BucketPrefixFixture, CopyBucketFixture
2525

2626
logging.basicConfig(
2727
format="%(levelname)s %(name)s %(asctime)-15s %(filename)s:%(lineno)d %(message)s"
@@ -404,6 +404,91 @@ def test_delete_object_invalid_bucket(
404404
)
405405

406406

407+
def test_copy_object(copy_directory: CopyBucketFixture):
408+
full_src_key, full_dst_key = (
409+
copy_directory.full_src_key,
410+
copy_directory.full_dst_key,
411+
)
412+
bucket = copy_directory.bucket
413+
414+
client = MountpointS3Client(copy_directory.region, TEST_USER_AGENT_PREFIX)
415+
416+
client.copy_object(
417+
src_bucket=bucket, src_key=full_src_key, dst_bucket=bucket, dst_key=full_dst_key
418+
)
419+
420+
src_object = client.get_object(bucket, full_src_key)
421+
dst_object = client.get_object(bucket, full_dst_key)
422+
423+
assert dst_object.key == full_dst_key
424+
assert b"".join(dst_object) == b"".join(src_object)
425+
426+
427+
def test_copy_object_raises_when_source_bucket_does_not_exist(
428+
copy_directory: CopyBucketFixture,
429+
):
430+
full_src_key, full_dst_key = (
431+
copy_directory.full_src_key,
432+
copy_directory.full_dst_key,
433+
)
434+
435+
client = MountpointS3Client(copy_directory.region, TEST_USER_AGENT_PREFIX)
436+
# TODO: error message looks unexpected for Express One Zone, compared to the other tests for non-existing bucket or
437+
# key (see below)
438+
error_message = (
439+
"Client error: Forbidden: <no message>"
440+
if copy_directory.storage_class == "EXPRESS_ONEZONE"
441+
else "Service error: The object was not found"
442+
)
443+
444+
with pytest.raises(S3Exception, match=error_message):
445+
client.copy_object(
446+
src_bucket=str(uuid.uuid4()),
447+
src_key=full_src_key,
448+
dst_bucket=copy_directory.bucket,
449+
dst_key=full_dst_key,
450+
)
451+
452+
453+
def test_copy_object_raises_when_destination_bucket_does_not_exist(
454+
copy_directory: CopyBucketFixture,
455+
):
456+
full_src_key, full_dst_key = (
457+
copy_directory.full_src_key,
458+
copy_directory.full_dst_key,
459+
)
460+
461+
client = MountpointS3Client(copy_directory.region, TEST_USER_AGENT_PREFIX)
462+
463+
# NOTE: `copy_object` and its underlying implementation does not
464+
# differentiate between `NoSuchBucket` and `NoSuchKey` errors.
465+
with pytest.raises(S3Exception, match="Service error: The object was not found"):
466+
client.copy_object(
467+
src_bucket=copy_directory.bucket,
468+
src_key=full_src_key,
469+
dst_bucket=str(uuid.uuid4()),
470+
dst_key=full_dst_key,
471+
)
472+
473+
474+
def test_copy_object_raises_when_source_key_does_not_exist(
475+
copy_directory: CopyBucketFixture,
476+
):
477+
full_dst_key = copy_directory.full_dst_key
478+
479+
bucket = copy_directory.bucket
480+
481+
client = MountpointS3Client(copy_directory.region, TEST_USER_AGENT_PREFIX)
482+
483+
with pytest.raises(S3Exception, match="Service error: The object was not found"):
484+
client.copy_object(
485+
src_bucket=bucket,
486+
src_key=str(uuid.uuid4()),
487+
dst_bucket=bucket,
488+
dst_key=full_dst_key,
489+
)
490+
491+
407492
def _parse_list_result(stream: ListObjectStream, max_keys: int):
408493
object_infos = []
409494
i = 0

0 commit comments

Comments
 (0)