Skip to content

Commit f79c149

Browse files
author
Rakshil Modi
committed
Updated S3 transfer Manager for copy operation
Updated linting changes. Handling zero-byte edge case. Fixed exception type. Updated doc string. Updated doc string. Fixed doc string. Update transfer manager for copy operation.
1 parent 6345224 commit f79c149

File tree

3 files changed

+261
-15
lines changed

3 files changed

+261
-15
lines changed

awscli/s3transfer/copies.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import copy
1414
import math
1515

16+
from botocore.exceptions import ClientError
1617
from s3transfer.tasks import (
1718
CompleteMultipartUploadTask,
1819
CreateMultipartUploadTask,
@@ -67,6 +68,7 @@ class CopySubmissionTask(SubmissionTask):
6768
'CopySourceSSECustomerKeyMD5',
6869
'MetadataDirective',
6970
'TaggingDirective',
71+
'IfNoneMatch',
7072
]
7173

7274
COMPLETE_MULTIPART_ARGS = [
@@ -75,6 +77,11 @@ class CopySubmissionTask(SubmissionTask):
7577
'SSECustomerKeyMD5',
7678
'RequestPayer',
7779
'ExpectedBucketOwner',
80+
'IfNoneMatch',
81+
]
82+
83+
COPY_OBJECT_ARGS_BLOCKLIST = [
84+
'IfNoneMatch',
7885
]
7986

8087
def _submit(
@@ -98,14 +105,14 @@ def _submit(
98105
:param transfer_future: The transfer future associated with the
99106
transfer request that tasks are being submitted for
100107
"""
108+
call_args = transfer_future.meta.call_args
101109
# Determine the size if it was not provided
102110
if transfer_future.meta.size is None:
103111
# If a size was not provided figure out the size for the
104112
# user. Note that we will only use the client provided to
105113
# the TransferManager. If the object is outside of the region
106114
# of the client, they may have to provide the file size themselves
107115
# with a completely new client.
108-
call_args = transfer_future.meta.call_args
109116
head_object_request = (
110117
self._get_head_object_request_from_copy_source(
111118
call_args.copy_source
@@ -127,10 +134,24 @@ def _submit(
127134
transfer_future.meta.provide_transfer_size(
128135
response['ContentLength']
129136
)
130-
131-
# If it is greater than threshold do a multipart copy, otherwise
132-
# do a regular copy object.
133-
if transfer_future.meta.size < config.multipart_threshold:
137+
# Check whether IfNoneMatch is set and the object has content.
138+
# Special handling for 0-byte files: Since multipart copy works with object size
139+
# and divides the object into smaller chunks, there's an edge case when the object
140+
# size is zero. This would result in 0 parts being calculated, and the
141+
# CompleteMultipartUpload operation throws a MalformedXML error when transferring
142+
# 0 parts because the XML does not validate against the published schema.
143+
# Therefore, 0-byte files are always handled via single copy request regardless
144+
# of the multipart threshold setting.
145+
should_overwrite = (
146+
call_args.extra_args.get("IfNoneMatch")
147+
and transfer_future.meta.size != 0
148+
)
149+
# If the size is below the multipart threshold and IfNoneMatch does not
150+
# require a multipart copy, do a regular copy; otherwise do a multipart copy.
151+
if (
152+
transfer_future.meta.size < config.multipart_threshold
153+
and not should_overwrite
154+
):
134155
self._submit_copy_request(
135156
client, config, osutil, request_executor, transfer_future
136157
)
@@ -147,19 +168,25 @@ def _submit_copy_request(
147168
# Get the needed progress callbacks for the task
148169
progress_callbacks = get_callbacks(transfer_future, 'progress')
149170

150-
# Submit the request of a single copy.
171+
# Submit the request of a single copy and make sure it
172+
# does not include any blocked arguments.
173+
copy_object_extra_args = {
174+
param: val
175+
for param, val in call_args.extra_args.items()
176+
if param not in self.COPY_OBJECT_ARGS_BLOCKLIST
177+
}
151178
self._transfer_coordinator.submit(
152179
request_executor,
153180
CopyObjectTask(
154181
transfer_coordinator=self._transfer_coordinator,
155182
main_kwargs={
156-
'client': client,
157-
'copy_source': call_args.copy_source,
158-
'bucket': call_args.bucket,
159-
'key': call_args.key,
160-
'extra_args': call_args.extra_args,
161-
'callbacks': progress_callbacks,
162-
'size': transfer_future.meta.size,
183+
"client": client,
184+
"copy_source": call_args.copy_source,
185+
"bucket": call_args.bucket,
186+
"key": call_args.key,
187+
"extra_args": copy_object_extra_args,
188+
"callbacks": progress_callbacks,
189+
"size": transfer_future.meta.size,
163190
},
164191
is_final=True,
165192
),

awscli/s3transfer/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,14 +188,14 @@ class TransferManager:
188188
'SSEKMSEncryptionContext',
189189
'Tagging',
190190
'WebsiteRedirectLocation',
191+
'IfNoneMatch',
191192
]
192193

193194
ALLOWED_UPLOAD_ARGS = (
194195
_ALLOWED_SHARED_ARGS
195196
+ [
196197
'ChecksumType',
197198
'MpuObjectSize',
198-
'IfNoneMatch',
199199
]
200200
+ FULL_OBJECT_CHECKSUM_ARGS
201201
)

tests/functional/s3transfer/test_copy.py

Lines changed: 220 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# language governing permissions and limitations under the License.
1313
from botocore.exceptions import ClientError
1414
from botocore.stub import Stubber
15+
from s3transfer.copies import CopySubmissionTask
1516
from s3transfer.manager import TransferConfig, TransferManager
1617
from s3transfer.utils import MIN_UPLOAD_CHUNKSIZE
1718

@@ -275,7 +276,12 @@ def test_copy_maps_extra_args_to_head_object(self):
275276

276277
def test_allowed_copy_params_are_valid(self):
277278
op_model = self.client.meta.service_model.operation_model('CopyObject')
278-
for allowed_upload_arg in self._manager.ALLOWED_COPY_ARGS:
279+
allowed_copy_arg = [
280+
arg
281+
for arg in self._manager.ALLOWED_COPY_ARGS
282+
if arg not in CopySubmissionTask.COPY_OBJECT_ARGS_BLOCKLIST
283+
]
284+
for allowed_upload_arg in allowed_copy_arg:
279285
self.assertIn(allowed_upload_arg, op_model.input_shape.members)
280286

281287
def test_copy_with_tagging(self):
@@ -700,3 +706,216 @@ def test_mp_copy_with_tagging_directive(self):
700706
)
701707
future.result()
702708
self.stubber.assert_no_pending_responses()
709+
710+
def test_copy_with_no_overwrite_flag_when_small_object_exists_at_target(
711+
self,
712+
):
713+
# Set up IfNoneMatch in extra_args
714+
self.extra_args['IfNoneMatch'] = '*'
715+
# Setting up the size of object
716+
small_content_size = 5
717+
self.content = b'0' * small_content_size
718+
# Add head object response with small content size
719+
head_response = self.create_stubbed_responses()[0]
720+
head_response['service_response'] = {
721+
'ContentLength': small_content_size
722+
}
723+
self.stubber.add_response(**head_response)
724+
# Should use multipart copy
725+
# Add create_multipart_upload response
726+
self.stubber.add_response(
727+
'create_multipart_upload',
728+
service_response={'UploadId': self.multipart_id},
729+
expected_params={
730+
'Bucket': self.bucket,
731+
'Key': self.key,
732+
},
733+
)
734+
# Add upload_part_copy response
735+
self.stubber.add_response(
736+
'upload_part_copy',
737+
{'CopyPartResult': {'ETag': 'etag-1'}},
738+
{
739+
'Bucket': self.bucket,
740+
'Key': self.key,
741+
'CopySource': self.copy_source,
742+
'UploadId': self.multipart_id,
743+
'PartNumber': 1,
744+
'CopySourceRange': f'bytes=0-{small_content_size-1}',
745+
},
746+
)
747+
# Mock a PreconditionFailed error for complete_multipart_upload
748+
self.stubber.add_client_error(
749+
method='complete_multipart_upload',
750+
service_error_code='PreconditionFailed',
751+
service_message='The condition specified in the conditional header(s) was not met',
752+
http_status_code=412,
753+
expected_params={
754+
'Bucket': self.bucket,
755+
'Key': self.key,
756+
'UploadId': self.multipart_id,
757+
'MultipartUpload': {
758+
'Parts': [{'ETag': 'etag-1', 'PartNumber': 1}]
759+
},
760+
'IfNoneMatch': '*',
761+
},
762+
)
763+
# Add abort_multipart_upload response
764+
self.stubber.add_response(
765+
'abort_multipart_upload',
766+
service_response={},
767+
expected_params={
768+
'Bucket': self.bucket,
769+
'Key': self.key,
770+
'UploadId': self.multipart_id,
771+
},
772+
)
773+
call_kwargs = self.create_call_kwargs()
774+
call_kwargs['extra_args'] = self.extra_args
775+
future = self.manager.copy(**call_kwargs)
776+
with self.assertRaises(ClientError) as context:
777+
future.result()
778+
self.assertEqual(
779+
context.exception.response['Error']['Code'], 'PreconditionFailed'
780+
)
781+
self.stubber.assert_no_pending_responses()
782+
783+
def test_copy_with_no_overwrite_flag_when_small_object_not_exists_at_target(
784+
self,
785+
):
786+
# Set up IfNoneMatch in extra_args
787+
self.extra_args['IfNoneMatch'] = '*'
788+
# Setting up the size of object
789+
small_content_size = 5
790+
self.content = b'0' * small_content_size
791+
# Add head object response with small content size
792+
head_response = self.create_stubbed_responses()[0]
793+
head_response['service_response'] = {
794+
'ContentLength': small_content_size
795+
}
796+
self.stubber.add_response(**head_response)
797+
# Should use multipart copy
798+
# Add create_multipart_upload response
799+
self.stubber.add_response(
800+
'create_multipart_upload',
801+
service_response={'UploadId': self.multipart_id},
802+
expected_params={
803+
'Bucket': self.bucket,
804+
'Key': self.key,
805+
},
806+
)
807+
# Add upload_part_copy response
808+
self.stubber.add_response(
809+
'upload_part_copy',
810+
{'CopyPartResult': {'ETag': 'etag-1'}},
811+
{
812+
'Bucket': self.bucket,
813+
'Key': self.key,
814+
'CopySource': self.copy_source,
815+
'UploadId': self.multipart_id,
816+
'PartNumber': 1,
817+
'CopySourceRange': f'bytes=0-{small_content_size-1}',
818+
},
819+
)
820+
self.stubber.add_response(
821+
'complete_multipart_upload',
822+
service_response={},
823+
expected_params={
824+
'Bucket': self.bucket,
825+
'Key': self.key,
826+
'UploadId': self.multipart_id,
827+
'MultipartUpload': {
828+
'Parts': [{'ETag': 'etag-1', 'PartNumber': 1}]
829+
},
830+
'IfNoneMatch': '*',
831+
},
832+
)
833+
call_kwargs = self.create_call_kwargs()
834+
call_kwargs['extra_args'] = self.extra_args
835+
future = self.manager.copy(**call_kwargs)
836+
future.result()
837+
self.stubber.assert_no_pending_responses()
838+
839+
def test_copy_with_no_overwrite_flag_when_large_object_exists_at_target(
840+
self,
841+
):
842+
# Set up IfNoneMatch in extra_args
843+
self.extra_args['IfNoneMatch'] = '*'
844+
# Add head object response
845+
self.add_get_head_response_with_default_expected_params()
846+
# Should use multipart copy
847+
self.add_create_multipart_response_with_default_expected_params()
848+
self.add_upload_part_copy_responses_with_default_expected_params()
849+
# Mock a PreconditionFailed error for complete_multipart_upload
850+
self.stubber.add_client_error(
851+
method='complete_multipart_upload',
852+
service_error_code='PreconditionFailed',
853+
service_message='The condition specified in the conditional header(s) was not met',
854+
http_status_code=412,
855+
expected_params={
856+
'Bucket': self.bucket,
857+
'Key': self.key,
858+
'UploadId': self.multipart_id,
859+
'MultipartUpload': {
860+
'Parts': [
861+
{'ETag': 'etag-1', 'PartNumber': 1},
862+
{'ETag': 'etag-2', 'PartNumber': 2},
863+
{'ETag': 'etag-3', 'PartNumber': 3},
864+
]
865+
},
866+
'IfNoneMatch': '*',
867+
},
868+
)
869+
# Add abort_multipart_upload response
870+
self.stubber.add_response(
871+
'abort_multipart_upload',
872+
service_response={},
873+
expected_params={
874+
'Bucket': self.bucket,
875+
'Key': self.key,
876+
'UploadId': self.multipart_id,
877+
},
878+
)
879+
call_kwargs = self.create_call_kwargs()
880+
call_kwargs['extra_args'] = self.extra_args
881+
future = self.manager.copy(**call_kwargs)
882+
with self.assertRaises(ClientError) as context:
883+
future.result()
884+
self.assertEqual(
885+
context.exception.response['Error']['Code'], 'PreconditionFailed'
886+
)
887+
self.stubber.assert_no_pending_responses()
888+
889+
def test_copy_with_no_overwrite_flag_when_large_object_not_exists_at_target(
890+
self,
891+
):
892+
# Set up IfNoneMatch in extra_args
893+
self.extra_args['IfNoneMatch'] = '*'
894+
# Add head object response
895+
self.add_get_head_response_with_default_expected_params()
896+
# Should use multipart upload
897+
self.add_create_multipart_response_with_default_expected_params()
898+
self.add_upload_part_copy_responses_with_default_expected_params()
899+
# Add successful complete_multipart_upload response
900+
self.stubber.add_response(
901+
'complete_multipart_upload',
902+
service_response={},
903+
expected_params={
904+
'Bucket': self.bucket,
905+
'Key': self.key,
906+
'UploadId': self.multipart_id,
907+
'MultipartUpload': {
908+
'Parts': [
909+
{'ETag': 'etag-1', 'PartNumber': 1},
910+
{'ETag': 'etag-2', 'PartNumber': 2},
911+
{'ETag': 'etag-3', 'PartNumber': 3},
912+
]
913+
},
914+
'IfNoneMatch': '*',
915+
},
916+
)
917+
call_kwargs = self.create_call_kwargs()
918+
call_kwargs['extra_args'] = self.extra_args
919+
future = self.manager.copy(**call_kwargs)
920+
future.result()
921+
self.stubber.assert_no_pending_responses()

0 commit comments

Comments
 (0)