Skip to content

Commit 2493f8b

Browse files
authored
Adding No overwrite for copy operation (#9592)
1 parent 54e84b6 commit 2493f8b

File tree

10 files changed

+632
-18
lines changed

10 files changed

+632
-18
lines changed

awscli/customizations/s3/results.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from awscli.customizations.s3.subscribers import OnDoneFilteredSubscriber
2424
from awscli.customizations.s3.utils import (
2525
WarningResult,
26-
create_warning,
2726
human_readable_size,
2827
)
2928
from awscli.customizations.utils import uni_print

awscli/customizations/s3/s3handler.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import logging
1414
import os
1515

16+
from botocore.exceptions import ClientError
1617
from s3transfer.manager import TransferManager
1718

1819
from awscli.compat import get_binary_stdin
@@ -481,7 +482,46 @@ def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
481482
)
482483

483484
def _get_warning_handlers(self):
484-
return [self._warn_glacier]
485+
return [
486+
self._warn_glacier,
487+
self._warn_if_zero_byte_file_exists_with_no_overwrite,
488+
]
489+
490+
def _warn_if_zero_byte_file_exists_with_no_overwrite(self, fileinfo):
491+
"""
492+
Warning handler to skip zero-byte files when no_overwrite is set and file exists.
493+
494+
This method handles the transfer of zero-byte objects when the no-overwrite parameter is specified.
495+
To prevent overwrite, it uses head_object to verify if the object exists at the destination:
496+
If the object is present at destination: skip the file (return True)
497+
If the object is not present at destination: allow transfer (return False)
498+
499+
:type fileinfo: FileInfo
500+
:param fileinfo: The FileInfo object containing transfer details
501+
502+
:rtype: bool
503+
:return: True if file should be skipped, False if transfer should proceed
504+
"""
505+
if not self._cli_params.get('no_overwrite') or (
506+
getattr(fileinfo, 'size') and fileinfo.size > 0
507+
):
508+
return False
509+
510+
bucket, key = find_bucket_key(fileinfo.dest)
511+
client = fileinfo.source_client
512+
try:
513+
client.head_object(Bucket=bucket, Key=key)
514+
LOGGER.debug(
515+
"Warning: Skipping file %s as it already exists on %s",
516+
fileinfo.src,
517+
fileinfo.dest,
518+
)
519+
return True
520+
except ClientError as e:
521+
if e.response['Error']['Code'] == '404':
522+
return False
523+
else:
524+
raise
485525

486526
def _format_src_dest(self, fileinfo):
487527
src = self._format_s3_path(fileinfo.src)

awscli/customizations/s3/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ def map_copy_object_params(cls, request_params, cli_params):
521521
)
522522
cls._set_request_payer_param(request_params, cli_params)
523523
cls._set_checksum_algorithm_param(request_params, cli_params)
524+
cls._set_no_overwrite_param(request_params, cli_params)
524525

525526
@classmethod
526527
def map_head_object_params(cls, request_params, cli_params):

awscli/s3transfer/copies.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import copy
1414
import math
1515

16+
from botocore.exceptions import ClientError
1617
from s3transfer.tasks import (
1718
CompleteMultipartUploadTask,
1819
CreateMultipartUploadTask,
@@ -67,6 +68,7 @@ class CopySubmissionTask(SubmissionTask):
6768
'CopySourceSSECustomerKeyMD5',
6869
'MetadataDirective',
6970
'TaggingDirective',
71+
'IfNoneMatch',
7072
]
7173

7274
COMPLETE_MULTIPART_ARGS = [
@@ -75,6 +77,11 @@ class CopySubmissionTask(SubmissionTask):
7577
'SSECustomerKeyMD5',
7678
'RequestPayer',
7779
'ExpectedBucketOwner',
80+
'IfNoneMatch',
81+
]
82+
83+
COPY_OBJECT_ARGS_BLOCKLIST = [
84+
'IfNoneMatch',
7885
]
7986

8087
def _submit(
@@ -98,14 +105,14 @@ def _submit(
98105
:param transfer_future: The transfer future associated with the
99106
transfer request that tasks are being submitted for
100107
"""
108+
call_args = transfer_future.meta.call_args
101109
# Determine the size if it was not provided
102110
if transfer_future.meta.size is None:
103111
# If a size was not provided figure out the size for the
104112
# user. Note that we will only use the client provided to
105113
# the TransferManager. If the object is outside of the region
106114
# of the client, they may have to provide the file size themselves
107115
# with a completely new client.
108-
call_args = transfer_future.meta.call_args
109116
head_object_request = (
110117
self._get_head_object_request_from_copy_source(
111118
call_args.copy_source
@@ -127,10 +134,24 @@ def _submit(
127134
transfer_future.meta.provide_transfer_size(
128135
response['ContentLength']
129136
)
130-
131-
# If it is greater than threshold do a multipart copy, otherwise
132-
# do a regular copy object.
133-
if transfer_future.meta.size < config.multipart_threshold:
137+
# Check for ifNoneMatch is enabled and file has content
138+
# Special handling for 0-byte files: Since multipart copy works with object size
139+
# and divides the object into smaller chunks, there's an edge case when the object
140+
# size is zero. This would result in 0 parts being calculated, and the
141+
# CompleteMultipartUpload operation throws a MalformedXML error when transferring
142+
# 0 parts because the XML does not validate against the published schema.
143+
# Therefore, 0-byte files are always handled via single copy request regardless
144+
# of the multipart threshold setting.
145+
should_overwrite = (
146+
call_args.extra_args.get("IfNoneMatch")
147+
and transfer_future.meta.size != 0
148+
)
149+
# If it is less than threshold and ifNoneMatch is not in parameters
150+
# do a regular copy else do multipart copy.
151+
if (
152+
transfer_future.meta.size < config.multipart_threshold
153+
and not should_overwrite
154+
):
134155
self._submit_copy_request(
135156
client, config, osutil, request_executor, transfer_future
136157
)
@@ -147,19 +168,25 @@ def _submit_copy_request(
147168
# Get the needed progress callbacks for the task
148169
progress_callbacks = get_callbacks(transfer_future, 'progress')
149170

150-
# Submit the request of a single copy.
171+
# Submit the request of a single copy and make sure it
172+
# does not include any blocked arguments.
173+
copy_object_extra_args = {
174+
param: val
175+
for param, val in call_args.extra_args.items()
176+
if param not in self.COPY_OBJECT_ARGS_BLOCKLIST
177+
}
151178
self._transfer_coordinator.submit(
152179
request_executor,
153180
CopyObjectTask(
154181
transfer_coordinator=self._transfer_coordinator,
155182
main_kwargs={
156-
'client': client,
157-
'copy_source': call_args.copy_source,
158-
'bucket': call_args.bucket,
159-
'key': call_args.key,
160-
'extra_args': call_args.extra_args,
161-
'callbacks': progress_callbacks,
162-
'size': transfer_future.meta.size,
183+
"client": client,
184+
"copy_source": call_args.copy_source,
185+
"bucket": call_args.bucket,
186+
"key": call_args.key,
187+
"extra_args": copy_object_extra_args,
188+
"callbacks": progress_callbacks,
189+
"size": transfer_future.meta.size,
163190
},
164191
is_final=True,
165192
),

awscli/s3transfer/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,14 +188,14 @@ class TransferManager:
188188
'SSEKMSEncryptionContext',
189189
'Tagging',
190190
'WebsiteRedirectLocation',
191+
'IfNoneMatch',
191192
]
192193

193194
ALLOWED_UPLOAD_ARGS = (
194195
_ALLOWED_SHARED_ARGS
195196
+ [
196197
'ChecksumType',
197198
'MpuObjectSize',
198-
'IfNoneMatch',
199199
]
200200
+ FULL_OBJECT_CHECKSUM_ARGS
201201
)

tests/functional/s3/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,16 @@ def mp_copy_responses(self):
257257
self.complete_mpu_response(),
258258
]
259259

260+
def precondition_failed_error_response(self, condition='If-None-Match'):
261+
return {
262+
'Error': {
263+
'Code': 'PreconditionFailed',
264+
'Message': 'At least one of the pre-conditions you '
265+
'specified did not hold',
266+
'Condition': condition,
267+
}
268+
}
269+
260270

261271
class BaseS3CLIRunnerTest(unittest.TestCase):
262272
def setUp(self):

tests/functional/s3/test_cp_command.py

Lines changed: 154 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,6 @@ def test_no_overwrite_flag_multipart_upload_when_object_not_exists_on_target(
335335
# Create a large file that will trigger multipart upload
336336
full_path = self.files.create_file('foo.txt', 'a' * 10 * (1024**2))
337337
cmdline = f'{self.prefix} {full_path} s3://bucket --no-overwrite'
338-
339338
# Set up responses for multipart upload
340339
self.parsed_responses = [
341340
{'UploadId': 'foo'}, # CreateMultipartUpload response
@@ -397,6 +396,160 @@ def test_no_overwrite_flag_multipart_upload_when_object_exists_on_target(
397396
# Verify the IfNoneMatch condition was set in the CompleteMultipartUpload request
398397
self.assertEqual(self.operations_called[3][1]['IfNoneMatch'], '*')
399398

399+
def test_no_overwrite_flag_on_copy_when_small_object_does_not_exist_on_target(
400+
self,
401+
):
402+
cmdline = f'{self.prefix} s3://bucket1/key.txt s3://bucket/key1.txt --no-overwrite'
403+
# Set up responses for multipart copy (since no-overwrite always uses multipart)
404+
self.parsed_responses = [
405+
self.head_object_response(), # HeadObject to get source metadata
406+
self.create_mpu_response('foo'), # CreateMultipartUpload response
407+
self.upload_part_copy_response(), # UploadPartCopy response
408+
{}, # CompleteMultipartUpload response
409+
]
410+
self.run_cmd(cmdline, expected_rc=0)
411+
# Verify all multipart operations were called
412+
self.assertEqual(len(self.operations_called), 4)
413+
self.assertEqual(self.operations_called[0][0].name, 'HeadObject')
414+
self.assertEqual(
415+
self.operations_called[1][0].name, 'CreateMultipartUpload'
416+
)
417+
self.assertEqual(self.operations_called[2][0].name, 'UploadPartCopy')
418+
self.assertEqual(
419+
self.operations_called[3][0].name, 'CompleteMultipartUpload'
420+
)
421+
# Verify the IfNoneMatch condition was set in the CompleteMultipartUpload request
422+
self.assertEqual(self.operations_called[3][1]['IfNoneMatch'], '*')
423+
424+
def test_no_overwrite_flag_on_copy_when_small_object_exists_on_target(
425+
self,
426+
):
427+
cmdline = f'{self.prefix} s3://bucket1/key.txt s3://bucket/key.txt --no-overwrite'
428+
# Set up responses for multipart copy (since no-overwrite always uses multipart)
429+
self.parsed_responses = [
430+
self.head_object_response(), # HeadObject to get source metadata
431+
self.create_mpu_response('foo'), # CreateMultipartUpload response
432+
self.upload_part_copy_response(), # UploadPartCopy response
433+
self.precondition_failed_error_response(), # CompleteMultipartUpload
434+
{}, # AbortMultipartUpload response
435+
]
436+
self.run_cmd(cmdline, expected_rc=0)
437+
# Verify all multipart operations were called
438+
self.assertEqual(len(self.operations_called), 5)
439+
self.assertEqual(self.operations_called[0][0].name, 'HeadObject')
440+
self.assertEqual(
441+
self.operations_called[1][0].name, 'CreateMultipartUpload'
442+
)
443+
self.assertEqual(self.operations_called[2][0].name, 'UploadPartCopy')
444+
self.assertEqual(
445+
self.operations_called[3][0].name, 'CompleteMultipartUpload'
446+
)
447+
self.assertEqual(
448+
self.operations_called[4][0].name, 'AbortMultipartUpload'
449+
)
450+
# Verify the IfNoneMatch condition was set in the CompleteMultipartUpload request
451+
self.assertEqual(self.operations_called[3][1]['IfNoneMatch'], '*')
452+
453+
def test_no_overwrite_flag_on_copy_when_zero_size_object_exists_at_destination(
454+
self,
455+
):
456+
cmdline = f'{self.prefix} s3://bucket1/file.txt s3://bucket2/file.txt --no-overwrite'
457+
self.parsed_responses = [
458+
self.head_object_response(
459+
ContentLength=0
460+
), # Source object (zero size)
461+
self.head_object_response(), # Checking the object at destination
462+
]
463+
self.run_cmd(cmdline, expected_rc=0)
464+
self.assertEqual(len(self.operations_called), 2)
465+
self.assertEqual(self.operations_called[0][0].name, 'HeadObject')
466+
self.assertEqual(self.operations_called[1][0].name, 'HeadObject')
467+
468+
def test_no_overwrite_flag_on_copy_when_zero_size_object_not_exists_at_destination(
469+
self,
470+
):
471+
cmdline = f'{self.prefix} s3://bucket1/file.txt s3://bucket2/file1.txt --no-overwrite'
472+
self.parsed_responses = [
473+
self.head_object_response(
474+
ContentLength=0
475+
), # Source object (zero size)
476+
{
477+
'Error': {'Code': '404', 'Message': 'Not Found'}
478+
}, # At destination object does not exists
479+
self.copy_object_response(), # Copy Request when object does not exists
480+
]
481+
self.run_cmd(cmdline, expected_rc=0)
482+
self.assertEqual(len(self.operations_called), 3)
483+
self.assertEqual(self.operations_called[0][0].name, 'HeadObject')
484+
self.assertEqual(self.operations_called[1][0].name, 'HeadObject')
485+
self.assertEqual(self.operations_called[2][0].name, 'CopyObject')
486+
487+
def test_no_overwrite_flag_on_copy_when_large_object_exists_on_target(
488+
self,
489+
):
490+
cmdline = f'{self.prefix} s3://bucket1/key.txt s3://bucket/key.txt --no-overwrite'
491+
# Set up responses for multipart copy with large object
492+
self.parsed_responses = [
493+
self.head_object_response(
494+
ContentLength=10 * (1024**2)
495+
), # HeadObject with large content
496+
self.get_object_tagging_response({}), # GetObjectTagging response
497+
self.create_mpu_response('foo'), # CreateMultipartUpload response
498+
self.upload_part_copy_response(), # UploadPartCopy response part 1
499+
self.upload_part_copy_response(), # UploadPartCopy response part 2
500+
self.precondition_failed_error_response(), # CompleteMultipartUpload fails with PreconditionFailed
501+
{}, # AbortMultipartUpload response
502+
]
503+
self.run_cmd(cmdline, expected_rc=0)
504+
# Verify all multipart operations were called
505+
self.assertEqual(len(self.operations_called), 7)
506+
self.assertEqual(self.operations_called[0][0].name, 'HeadObject')
507+
self.assertEqual(self.operations_called[1][0].name, 'GetObjectTagging')
508+
self.assertEqual(
509+
self.operations_called[2][0].name, 'CreateMultipartUpload'
510+
)
511+
self.assertEqual(self.operations_called[3][0].name, 'UploadPartCopy')
512+
self.assertEqual(self.operations_called[4][0].name, 'UploadPartCopy')
513+
self.assertEqual(
514+
self.operations_called[5][0].name, 'CompleteMultipartUpload'
515+
)
516+
self.assertEqual(
517+
self.operations_called[6][0].name, 'AbortMultipartUpload'
518+
)
519+
# Verify the IfNoneMatch condition was set in the CompleteMultipartUpload request
520+
self.assertEqual(self.operations_called[5][1]['IfNoneMatch'], '*')
521+
522+
def test_no_overwrite_flag_on_copy_when_large_object_does_not_exist_on_target(
523+
self,
524+
):
525+
cmdline = f'{self.prefix} s3://bucket1/key.txt s3://bucket/key1.txt --no-overwrite'
526+
# Set up responses for multipart copy with large object
527+
self.parsed_responses = [
528+
self.head_object_response(
529+
ContentLength=10 * (1024**2)
530+
), # HeadObject with large content
531+
self.get_object_tagging_response({}), # GetObjectTagging response
532+
self.create_mpu_response('foo'), # CreateMultipartUpload response
533+
self.upload_part_copy_response(), # UploadPartCopy response part 1
534+
self.upload_part_copy_response(), # UploadPartCopy response part 2
535+
{}, # CompleteMultipartUpload response
536+
]
537+
self.run_cmd(cmdline, expected_rc=0)
538+
# Verify all multipart operations were called
539+
self.assertEqual(len(self.operations_called), 6)
540+
self.assertEqual(self.operations_called[0][0].name, 'HeadObject')
541+
self.assertEqual(self.operations_called[1][0].name, 'GetObjectTagging')
542+
self.assertEqual(
543+
self.operations_called[2][0].name, 'CreateMultipartUpload'
544+
)
545+
self.assertEqual(self.operations_called[3][0].name, 'UploadPartCopy')
546+
self.assertEqual(self.operations_called[4][0].name, 'UploadPartCopy')
547+
self.assertEqual(
548+
self.operations_called[5][0].name, 'CompleteMultipartUpload'
549+
)
550+
# Verify the IfNoneMatch condition was set in the CompleteMultipartUpload request
551+
self.assertEqual(self.operations_called[5][1]['IfNoneMatch'], '*')
552+
400553
def test_dryrun_download(self):
401554
self.parsed_responses = [self.head_object_response()]
402555
target = self.files.full_path('file.txt')

0 commit comments

Comments
 (0)