diff --git a/.changes/next-release/enhancement-s3-34855.json b/.changes/next-release/enhancement-s3-34855.json
new file mode 100644
index 000000000000..7f75bcea0abc
--- /dev/null
+++ b/.changes/next-release/enhancement-s3-34855.json
@@ -0,0 +1,5 @@
+{
+  "type": "enhancement",
+  "category": "`s3`",
+  "description": "Add ``--all-versions`` flag to the AWS CLI S3 rm command to delete all versions of objects in a versioning-enabled bucket. fixes `#4070 `__"
+}
diff --git a/awscli/customizations/s3/filegenerator.py b/awscli/customizations/s3/filegenerator.py
index d99cdcb31bf3..bdb3fb6021e4 100644
--- a/awscli/customizations/s3/filegenerator.py
+++ b/awscli/customizations/s3/filegenerator.py
@@ -19,6 +19,7 @@
 from dateutil.tz import tzlocal
 
 from awscli.compat import queue
+from awscli.customizations.s3.fileinfo import VersionedFileInfo
 from awscli.customizations.s3.utils import (
     EPOCH_TIME,
     BucketLister,
@@ -26,6 +27,7 @@
     find_bucket_key,
     find_dest_path_comp_key,
     get_file_stat,
+    split_s3_bucket_key,
 )
 
 _open = open
@@ -406,3 +408,109 @@ def _list_single_object(self, s3_path):
         response['LastModified'] = last_update.astimezone(tzlocal())
         response['ETag'] = response.pop('ETag', None)
         return s3_path, response
+
+
+class VersionedFileGenerator:
+    """
+    This class generates VersionedFileInfo objects for all versions of objects in a bucket.
+    It uses the BucketLister class to list all versions and creates appropriate
+    VersionedFileInfo objects for each version.
+    """
+
+    def __init__(
+        self,
+        client,
+        operation_name,
+        follow_symlinks=True,
+        page_size=None,
+        result_queue=None,
+        request_parameters=None,
+    ):
+        """
+        Initialize a new VersionedFileGenerator.
+
+        :param client: The S3 client to use.
+        :param operation_name: The name of the operation to perform.
+        :param follow_symlinks: Whether to follow symlinks.
+        :param page_size: The number of items to include in each API response.
+        :param result_queue: Queue for results and warnings.
+        :param request_parameters: Additional parameters for the request.
+        """
+        self._client = client
+        self.operation_name = operation_name
+        self.follow_symlinks = follow_symlinks
+        self.page_size = page_size
+        self.result_queue = result_queue
+        if not result_queue:
+            self.result_queue = queue.Queue()
+        self.request_parameters = {}
+        if request_parameters is not None:
+            self.request_parameters = request_parameters
+        self._version_lister = BucketLister(client)
+
+    def call(self, files):
+        """
+        Generate VersionedFileInfo objects for all versions of objects.
+
+        :param files: Dictionary containing source and destination information.
+        :yields: VersionedFileInfo objects for each version of each object.
+        """
+        source = files['src']['path']
+        src_type = files['src']['type']
+        dest_type = files['dest']['type']
+
+        # Use the list_object_versions method to get all versions
+        file_iterator = self.list_object_versions(source, files['dir_op'])
+
+        for src_path, content, version_id in file_iterator:
+            dest_path, compare_key = find_dest_path_comp_key(files, src_path)
+
+            # Create a VersionedFileInfo for this object version
+            yield VersionedFileInfo(
+                src=src_path,
+                dest=dest_path,
+                compare_key=compare_key,
+                size=content.get('Size', 0),
+                last_update=content.get('LastModified'),
+                src_type=src_type,
+                dest_type=dest_type,
+                operation_name=self.operation_name,
+                associated_response_data=content,
+                version_id=version_id,
+            )
+
+    def list_object_versions(self, s3_path, dir_op):
+        """
+        Yields the versions of a single object, or of all objects under a
+        common prefix, depending on whether the operation targets a prefix.
+        It yields the file's source path, content, and version ID.
+
+        :param s3_path: The S3 path to list versions for.
+        :param dir_op: Whether this is a directory operation.
+        :yields: Tuples of (source_path, content, version_id)
+        """
+        bucket, key = split_s3_bucket_key(s3_path)
+
+        # Unlike FileGenerator.list_objects, there is no short-circuit path
+        # for a single object: even for a specific key we must call
+        # ListObjectVersions to enumerate every version.
+
+        # List all versions of objects
+        for (
+            src_path,
+            content,
+            version_id,
+        ) in self._version_lister.list_object_versions(
+            bucket=bucket,
+            prefix=key,
+            page_size=self.page_size,
+            extra_args=self.request_parameters.get('ListObjectVersions', {}),
+        ):
+            # If this is not a directory operation and the path doesn't match
+            # exactly, skip it (similar to the behavior in
+            # FileGenerator.list_objects)
+            if not dir_op and s3_path != src_path:
+                continue
+
+            yield src_path, content, version_id
diff --git a/awscli/customizations/s3/fileinfo.py b/awscli/customizations/s3/fileinfo.py
index 0a3dd561b501..ed0d0c241d06 100644
--- a/awscli/customizations/s3/fileinfo.py
+++ b/awscli/customizations/s3/fileinfo.py
@@ -109,3 +109,37 @@ def _is_restored(self, response_data):
         # restored back to S3.
         # 'Restore' looks like: 'ongoing-request="false", expiry-date="..."'
         return 'ongoing-request="false"' in response_data.get('Restore', '')
+
+
+class VersionedFileInfo(FileInfo):
+    def __init__(self, version_id=None, is_delete_marker=False, **kwargs):
+        """
+        This class extends FileInfo to include version information for S3 objects.
+        It is specifically designed for operations that need to work with versioned
+        S3 objects, such as deleting all versions of an object.
+
+        :param version_id: The version ID of the S3 object.
+        :type version_id: string
+        :param is_delete_marker: Whether this version is a delete marker.
+        :type is_delete_marker: bool
+        """
+        super().__init__(**kwargs)
+        self.version_id = version_id
+        self.is_delete_marker = is_delete_marker
+
+    def is_glacier_compatible(self):
+        """
+        Determines glacier compatibility for versioned S3 objects, with
+        special handling for delete operations.
+
+        This method overrides the parent FileInfo.is_glacier_compatible() to
+        provide compatibility checking for S3 objects stored in glacier
+        storage classes when versioning is enabled on the bucket. Because
+        delete operations on glacier objects succeed regardless of storage
+        class, the override allows deletes of versioned glacier objects to
+        proceed.
+
+        :rtype: bool
+        :returns: True if the operation can proceed on glacier objects
+            (specifically for delete operations on versioned objects), False
+            if the operation would fail due to glacier storage class
+            restrictions.
+        """
+        if self.operation_name == 'delete':
+            return True
+        return super().is_glacier_compatible()
diff --git a/awscli/customizations/s3/s3handler.py b/awscli/customizations/s3/s3handler.py
index 23176f30f889..bffcdaf0bb13 100644
--- a/awscli/customizations/s3/s3handler.py
+++ b/awscli/customizations/s3/s3handler.py
@@ -10,6 +10,7 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
+import itertools
 import logging
 import os
 
@@ -54,12 +55,18 @@
     find_bucket_key,
     human_readable_size,
     relative_path,
+    split_s3_bucket_key,
 )
+from awscli.customizations.utils import uni_print
+from awscli.s3transfer.constants import MAX_BATCH_SIZE
 
 LOGGER = logging.getLogger(__name__)
 
 
 class S3TransferHandlerFactory:
+    # Parameters that require batch handling
+    BATCH_PARAMS = ['all_versions']
+
     def __init__(self, cli_params):
         """Factory for S3TransferHandlers
 
@@ -89,7 +96,10 @@ def __call__(self, transfer_manager, result_queue):
         command_result_recorder = CommandResultRecorder(
             result_queue, result_recorder, result_processor
         )
-
+        if self._requires_batch_handler():
+            return BatchS3TransferHandler(
+                transfer_manager, self._cli_params, command_result_recorder
+            )
         return S3TransferHandler(
             transfer_manager, self._cli_params, command_result_recorder
         )
@@ -107,6 +117,12 @@ def _add_result_printer(self, result_recorder, result_processor_handlers):
             result_printer = ResultPrinter(result_recorder)
         result_processor_handlers.append(result_printer)
 
+    def _requires_batch_handler(self):
+        """
+        Check if any batch parameters are enabled
+        """
+        return any(self._cli_params.get(param) for param in self.BATCH_PARAMS)
+
 
 class S3TransferHandler:
     def __init__(self, transfer_manager, cli_params, result_command_recorder):
@@ -124,6 +140,7 @@ def __init__(self, transfer_manager, cli_params, result_command_recorder):
             used to get the final result of the transfer
         """
         self._transfer_manager = transfer_manager
+        self._cli_params = cli_params
         # TODO: Ideally the s3 transfer handler should not need to know
         # about the result command recorder. It really only needs an interface
         # for adding results to the queue. When all of the commands have
@@ -592,3 +609,284 @@ def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
 
     def _format_src_dest(self, fileinfo):
         return self._format_local_path(fileinfo.src), None
+
+
+class BatchBase:
+    def __init__(self, result_queue):
+        self.result_queue = result_queue
+
+    def create_batch(self, objects, max_size):
+        """
+        Create a batch of objects.
+
+        :param objects: The list of objects to batch.
+        :param max_size: The maximum size of the batch.
+        :return: A list of batches.
+        """
+        batches = []
+        for i in range(0, len(objects), max_size):
+            batches.append(objects[i : i + max_size])
+        LOGGER.debug("Created %d batches", len(batches))
+        return batches
+
+    def create_batches(self, fileinfos):
+        """
+        :param fileinfos: List of VersionedFileInfo objects
+        :return: List of batches
+        """
+        raise NotImplementedError('create_batches()')
+
+
+class BatchDelete(BatchBase):
+    """
+    Specialized batch class for delete operations.
+    Creates batches in S3 delete-objects API format.
+    """
+
+    def create_batches(self, fileinfos):
+        """
+        Create delete batches from VersionedFileInfo objects.
+
+        :param fileinfos: Iterable of VersionedFileInfo objects to delete
+        :return: List of batches in S3 delete-objects format
+        """
+        if not fileinfos:
+            return []
+        bucket = None
+        objects_with_fileinfo = []
+        for fileinfo in fileinfos:
+            bucket, key = split_s3_bucket_key(fileinfo.src)
+            obj_entry = {'Key': key}
+            if getattr(fileinfo, 'version_id', None):
+                obj_entry['VersionId'] = fileinfo.version_id
+
+            objects_with_fileinfo.append(
+                {'object': obj_entry, 'fileinfo': fileinfo}
+            )
+
+        object_batches = self.create_batch(
+            objects_with_fileinfo, MAX_BATCH_SIZE
+        )
+
+        formatted_batches = []
+        for batch in object_batches:
+            formatted_batch = {
+                'bucket': bucket,
+                'delete_request': {
+                    'Objects': [item['object'] for item in batch],
+                    'Quiet': False,
+                },
+                'fileinfos': [item['fileinfo'] for item in batch],
+            }
+            formatted_batches.append(formatted_batch)
+
+        return formatted_batches
+
+    def get_delete_batches(self, fileinfos):
+        """
+        :param fileinfos: List of FileInfo objects to delete
+        :return: List of formatted delete batches
+        """
+        return self.create_batches(fileinfos)
+
+
+class BatchS3TransferHandler:
+    def __init__(self, transfer_manager, cli_params, result_command_recorder):
+        """
+        Backend for performing S3 batch transfers.
+
+        :type transfer_manager: s3transfer.manager.TransferManager
+        :param transfer_manager: Transfer manager to use for transfers
+
+        :type cli_params: dict
+        :param cli_params: The parameters passed to the CLI command in the
+            form of a dictionary
+
+        :type result_command_recorder: ResultCommandRecorder
+        :param result_command_recorder: The result command recorder to be
+            used to get the final result of the transfer
+        """
+
+        self._transfer_manager = transfer_manager
+        self._cli_params = cli_params
+        self._result_command_recorder = result_command_recorder
+
+        submitter_args = (
+            self._transfer_manager,
+            self._result_command_recorder.result_queue,
+            cli_params,
+        )
+        # Submitter list for batch request submitters
+        self._batch_submitters = [
+            BatchDeleteRequestSubmitter(*submitter_args),
+        ]
+
+    def call(self, fileinfos):
+        """
+        Process an iterable of VersionedFileInfo objects for batch transfers.
+
+        :type fileinfos: iterable of VersionedFileInfo
+        :param fileinfos: The fileinfos to submit to the batch submitter
+
+        :rtype: CommandResult
+        :returns: The result of the command that specifies the number of
+            failures and warnings encountered.
+        """
+
+        with self._result_command_recorder:
+            with self._transfer_manager:
+                fileinfos = iter(fileinfos)
+                first_fileinfo = next(fileinfos, None)
+
+                if not first_fileinfo:
+                    self._result_command_recorder.notify_total_submissions(0)
+                    return self._result_command_recorder.get_command_result()
+
+                selected_submitter = None
+                for submitter in self._batch_submitters:
+                    if submitter.can_submit(first_fileinfo):
+                        selected_submitter = submitter
+
+                total_submissions = selected_submitter.submit_batch(
+                    itertools.chain([first_fileinfo], fileinfos)
+                )
+                self._result_command_recorder.notify_total_submissions(
+                    total_submissions
+                )
+        return self._result_command_recorder.get_command_result()
+
+
+class BatchDeleteRequestSubmitter:
+    def __init__(self, transfer_manager, result_queue, cli_params):
+        """Submits batch transfer requests to the TransferManager
+
+        Given VersionedFileInfo objects and the provided CLI parameters, it
+        will add the necessary extra arguments and subscribers in making a
+        call to the TransferManager.
+
+        :type transfer_manager: s3transfer.manager.TransferManager
+        :param transfer_manager: The underlying transfer manager
+
+        :type result_queue: queue.Queue
+        :param result_queue: The result queue to use
+
+        :type cli_params: dict
+        :param cli_params: The associated CLI parameters passed in to the
+            command as a dictionary.
+        """
+        self._transfer_manager = transfer_manager
+        self._result_queue = result_queue
+        self._cli_params = cli_params
+
+    def can_submit(self, fileinfo):
+        return (
+            fileinfo.operation_name == 'delete'
+            and fileinfo.src_type == 's3'
+            and hasattr(fileinfo, 'version_id')
+        )
+
+    def submit_batch(self, fileinfos):
+        """Submit a batch of VersionedFileInfo objects for deletion"""
+        delete_batch = BatchDelete(self._result_queue)
+        batches = delete_batch.get_delete_batches(fileinfos)
+        if self._cli_params.get('dryrun'):
+            return self._submit_dryrun(batches)
+
+        return self._submit_transfer_request_for_batch(batches)
+
+    def _submit_dryrun(self, batches):
+        total_objects = 0
+        for batch in batches:
+            for fileinfo in batch['fileinfos']:
+                src, dest = self._format_src_dest(fileinfo)
+                self._result_queue.put(
+                    DryRunResult(transfer_type='delete', src=src, dest=dest)
+                )
+                total_objects += 1
+        return total_objects
+
+    def _submit_transfer_request_for_batch(self, batches):
+        """Transfer each batch to the S3 transfer manager"""
+        total_objects = 0
+
+        for batch in batches:
+            bucket = batch['bucket']
+            objects = batch['delete_request']['Objects']
+            fileinfos_in_batch = batch['fileinfos']
+
+            for fileinfo in fileinfos_in_batch:
+                src, dest = self._format_src_dest(fileinfo)
+                self._result_queue.put(
+                    QueuedResult(
+                        total_transfer_size=0,
+                        transfer_type='delete',
+                        src=src,
+                        dest=dest,
+                    )
+                )
+                total_objects += 1
+
+            try:
+                extra_args = {}
+                RequestParamsMapper.map_delete_object_params(
+                    extra_args, self._cli_params
+                )
+                future = self._transfer_manager.batch_delete(
+                    bucket=bucket, objects=objects, extra_args=extra_args
+                )
+                response = future.result()
+                self._handle_batch_delete_response(
+                    response, fileinfos_in_batch
+                )
+
+            except Exception as e:
+                for fileinfo in fileinfos_in_batch:
+                    self._report_failure(fileinfo, e)
+        return total_objects
+
+    def _report_success(self, fileinfo):
+        src, dest = self._format_src_dest(fileinfo)
+        self._result_queue.put(
+            SuccessResult(transfer_type='delete', src=src, dest=dest)
+        )
+
+    def _report_failure(self, fileinfo, exception):
+        src, dest = self._format_src_dest(fileinfo)
+        self._result_queue.put(
+            FailureResult(
+                transfer_type='delete',
+                src=src,
+                dest=dest,
+                exception=exception,
+            )
+        )
+
+    def _format_s3_path(self, path):
+        if path.startswith('s3://'):
+            return path
+        return 's3://' + path
+
+    def _format_src_dest(self, fileinfo):
+        src = fileinfo.src
+        if hasattr(fileinfo, 'version_id') and fileinfo.version_id:
+            src = f"{src} (version {fileinfo.version_id})"
+        return self._format_s3_path(src), None
+
+    def _report_error(self, fileinfo, error_message):
+        src, _ = self._format_src_dest(fileinfo)
+        uni_print(f"delete failed: {src} - {error_message}\n")
+
+    def _handle_batch_delete_response(self, response, fileinfos_in_batch):
+        error_objects = {}
+        for error in response.get('Errors', []):
+            key = error['Key']
+            version_id = error.get('VersionId', None)
+            error_objects[(key, version_id)] = error['Message']
+
+        for fileinfo in fileinfos_in_batch:
+            _, key = split_s3_bucket_key(fileinfo.src)
+            if (key, fileinfo.version_id) in error_objects:
+                self._report_error(
+                    fileinfo, error_objects[(key, fileinfo.version_id)]
+                )
+            else:
+                self._report_success(fileinfo)
diff --git a/awscli/customizations/s3/subcommands.py b/awscli/customizations/s3/subcommands.py
index 8dc8a61fa895..4606a656db19 100644
--- a/awscli/customizations/s3/subcommands.py
+++ b/awscli/customizations/s3/subcommands.py
@@ -30,7 +30,10 @@
     TransferManagerFactory,
 )
 from awscli.customizations.s3.fileformat import FileFormat
-from awscli.customizations.s3.filegenerator import FileGenerator
+from awscli.customizations.s3.filegenerator import (
+    FileGenerator,
+    VersionedFileGenerator,
+)
 from awscli.customizations.s3.fileinfo import FileInfo
 from awscli.customizations.s3.fileinfobuilder import FileInfoBuilder
 from awscli.customizations.s3.filters import create_filter
@@ -642,6 +645,15 @@
     ),
 }
 
+ALL_VERSIONS = {
+    'name': 'all-versions',
+    'action': 'store_true',
+    'help_text': (
+        "Process all versions of objects, including delete markers. For a single object, "
+        "every version of that object is included; for buckets and prefixes, every version of every matching object is included."
+    ),
+}
+
 TRANSFER_ARGS = [
     DRYRUN,
     QUIET,
@@ -1104,6 +1116,7 @@ class RmCommand(S3TransferCommand):
         EXCLUDE,
         ONLY_SHOW_ERRORS,
         PAGE_SIZE,
+        ALL_VERSIONS,
     ]
 
 
@@ -1259,7 +1272,11 @@ def create_instructions(self):
         instruction list because it sends the request to S3 and does not
         yield anything.
         """
-        if self.needs_filegenerator():
+        if self.needs_versiongenerator():
+            self.instructions.append('versioned_file_generator')
+            if self.parameters.get('filters'):
+                self.instructions.append('filters')
+        elif self.needs_filegenerator():
             self.instructions.append('file_generator')
             if self.parameters.get('filters'):
                 self.instructions.append('filters')
@@ -1268,6 +1285,9 @@
             self.instructions.append('file_info_builder')
         self.instructions.append('s3_handler')
 
+    def needs_versiongenerator(self):
+        return self.parameters.get('all_versions')
+
     def needs_filegenerator(self):
         return not self.parameters['is_stream']
 
@@ -1365,8 +1385,14 @@ def run(self):
                 self._map_request_payer_params(rgen_request_parameters)
             rgen_kwargs['request_parameters'] = rgen_request_parameters
 
-        file_generator = FileGenerator(**fgen_kwargs)
-        rev_generator = FileGenerator(**rgen_kwargs)
+        if self.cmd == 'rm' and self.parameters.get('all_versions'):
+            versioned_file_generator = VersionedFileGenerator(**fgen_kwargs)
+            file_generator = None
+            rev_generator = None
+        else:
+            file_generator = FileGenerator(**fgen_kwargs)
+            rev_generator = FileGenerator(**rgen_kwargs)
+            versioned_file_generator = None
         stream_dest_path, stream_compare_key = find_dest_path_comp_key(files)
         stream_file_info = [
             FileInfo(
@@ -1416,6 +1442,13 @@ def run(self):
                 'file_info_builder': [file_info_builder],
                 's3_handler': [s3_transfer_handler],
             }
+        elif self.cmd == 'rm' and self.parameters.get('all_versions'):
+            command_dict = {
+                'setup': [files],
+                'versioned_file_generator': [versioned_file_generator],
+                'filters': [create_filter(self.parameters)],
+                's3_handler': [s3_transfer_handler],
+            }
         elif self.cmd == 'rm':
             command_dict = {
                 'setup': [files],
diff --git a/awscli/customizations/s3/utils.py b/awscli/customizations/s3/utils.py
index f6a26803b082..b3ebcd13d934 100644
--- a/awscli/customizations/s3/utils.py
+++ b/awscli/customizations/s3/utils.py
@@ -431,6 +431,62 @@ def list_objects(
                 )
                 yield source_path, content
 
+    def list_object_versions(
+        self, bucket, prefix=None, page_size=None, extra_args=None
+    ):
+        """
+        List all versions of objects in the bucket.
+
+        This method yields tuples of (source_path, content, version_id) where:
+            source_path is the S3 path of the object (bucket/key)
+            content is the object metadata
+            version_id is the version ID of the object
+
+        :param bucket: The name of the bucket to list versions for.
+        :param prefix: The prefix to filter objects by. If provided, only
+            versions of objects with keys starting with this prefix will be
+            listed.
+        :param page_size: The number of versions to list per page.
+        :param extra_args: Additional arguments to pass to the
+            ListObjectVersions operation.
+        :yields: Tuples of (source_path, content, version_id)
+        """
+        kwargs = {
+            'Bucket': bucket,
+            'PaginationConfig': {'PageSize': page_size},
+        }
+        if prefix is not None:
+            kwargs['Prefix'] = prefix
+        if extra_args is not None:
+            kwargs.update(extra_args)
+
+        paginator = self._client.get_paginator('list_object_versions')
+        pages = paginator.paginate(**kwargs)
+        for page in pages:
+            versions = page.get('Versions', [])
+
+            for (
+                source_path,
+                content,
+                version_id,
+            ) in self._process_object_versions(bucket, versions):
+                yield source_path, content, version_id
+
+            delete_markers = page.get('DeleteMarkers', [])
+            for (
+                source_path,
+                content,
+                version_id,
+            ) in self._process_object_versions(bucket, delete_markers):
+                yield source_path, content, version_id
+
+    def _process_object_versions(self, bucket, object_versions):
+        for version in object_versions:
+            source_path = bucket + '/' + version['Key']
+            version['LastModified'] = self._date_parser(
+                version['LastModified']
+            )
+            yield source_path, version, version['VersionId']
+
 
 class PrintTask(
     namedtuple('PrintTask', ['message', 'error', 'total_parts', 'warning'])
diff --git a/awscli/s3transfer/constants.py b/awscli/s3transfer/constants.py
index 9a2ec9bb3333..c2ab77ec62c9 100644
--- a/awscli/s3transfer/constants.py
+++ b/awscli/s3transfer/constants.py
@@ -36,3 +36,4 @@
 
 USER_AGENT = f's3transfer/{s3transfer.__version__}'
 PROCESS_USER_AGENT = f'{USER_AGENT} processpool'
+MAX_BATCH_SIZE = 1000
diff --git a/awscli/s3transfer/delete.py b/awscli/s3transfer/delete.py
index 74084d312a7d..baf3305258c2 100644
--- a/awscli/s3transfer/delete.py
+++ b/awscli/s3transfer/delete.py
@@ -10,6 +10,7 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
+
 from s3transfer.tasks import SubmissionTask, Task
 
 
@@ -69,3 +70,78 @@ def _main(self, client, bucket, key, extra_args):
         """
 
         client.delete_object(Bucket=bucket, Key=key, **extra_args)
+
+
+class BatchDeleteSubmissionTask(SubmissionTask):
+    """Task for submitting tasks to execute batch deletion of S3 objects"""
+
+    def _submit(self, client, request_executor, transfer_future, **kwargs):
+        """
+        :param client: The client associated with the transfer manager
+
+        :type config: s3transfer.manager.TransferConfig
+        :param config: The transfer config associated with the transfer
+            manager
+
+        :type osutil: s3transfer.utils.OSUtil
+        :param osutil: The os utility associated to the transfer manager
+
+        :type request_executor: s3transfer.futures.BoundedExecutor
+        :param request_executor: The request executor associated with the
+            transfer manager
+
+        :type transfer_future: s3transfer.futures.TransferFuture
+        :param transfer_future: The transfer future associated with the
+            transfer request that tasks are being submitted for
+        """
+        call_args = transfer_future.meta.call_args
+
+        # Extract the objects to delete from the call_args
+        bucket = call_args.bucket
+        objects = call_args.objects
+        extra_args = call_args.extra_args or {}
+
+        self._transfer_coordinator.submit(
+            request_executor,
+            BatchDeleteObjectsTask(
+                transfer_coordinator=self._transfer_coordinator,
+                main_kwargs={
+                    'client': client,
+                    'bucket': bucket,
+                    'objects': objects,
+                    'extra_args': extra_args,
+                },
+                is_final=True,
+            ),
+        )
+
+
+class BatchDeleteObjectsTask(Task):
+    def _main(self, client, bucket, objects, extra_args):
+        """
+        Delete multiple objects in a single API call
+
+        :param client: The S3 client to use when calling DeleteObjects
+
+        :type bucket: str
+        :param bucket: The name of the bucket.
+
+        :type objects: list
+        :param objects: The list of objects to delete.
+
+        :type extra_args: dict
+        :param extra_args: Extra arguments to pass to the DeleteObjects call.
+        """
+        # Create a copy of extra_args to avoid modifying the original
+        extra_args_copy = extra_args.copy()
+
+        # Create the request body
+        delete_request = {
+            'Objects': objects,
+            'Quiet': False,
+        }
+
+        response = client.delete_objects(
+            Bucket=bucket, Delete=delete_request, **extra_args_copy
+        )
+        return response
diff --git a/awscli/s3transfer/manager.py b/awscli/s3transfer/manager.py
index baaafe1d37cb..1360454230fb 100644
--- a/awscli/s3transfer/manager.py
+++ b/awscli/s3transfer/manager.py
@@ -24,7 +24,7 @@
     MB,
 )
 from s3transfer.copies import CopySubmissionTask
-from s3transfer.delete import DeleteSubmissionTask
+from s3transfer.delete import BatchDeleteSubmissionTask, DeleteSubmissionTask
 from s3transfer.download import DownloadSubmissionTask
 from s3transfer.exceptions import CancelledError, FatalError
 from s3transfer.futures import (
@@ -665,6 +665,26 @@ def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
             self._request_executor.shutdown()
             self._io_executor.shutdown()
 
+    def batch_delete(self, bucket, objects, extra_args=None, subscribers=None):
+        """Delete multiple S3 objects in batches"""
+        if extra_args is None:
+            extra_args = {}
+        if subscribers is None:
+            subscribers = []
+        # VersionId is not part of the DeleteObjects input shape; it is
+        # supplied per object in the Delete request body instead.
+        allowed_delete_objects_args = [
+            arg for arg in self.ALLOWED_DELETE_ARGS if arg != 'VersionId'
+        ]
+        self._validate_all_known_args(extra_args, allowed_delete_objects_args)
+        self._validate_if_bucket_supported(bucket)
+        call_args = CallArgs(
+            bucket=bucket,
+            objects=objects,
+            extra_args=extra_args,
+            subscribers=subscribers,
+        )
+        return self._submit_transfer(call_args, BatchDeleteSubmissionTask)
+
 
 class TransferCoordinatorController:
     def __init__(self):
diff --git a/tests/functional/s3/__init__.py b/tests/functional/s3/__init__.py
index 7705071ea752..20814ddd8d03 100644
--- a/tests/functional/s3/__init__.py
+++ b/tests/functional/s3/__init__.py
@@ -257,6 +257,74 @@ def mp_copy_responses(self):
             self.complete_mpu_response(),
         ]
 
+    def list_object_versions_request(
+        self, bucket, prefix=None, **override_kwargs
+    ):
+        params = {
+            'Bucket': bucket,
+        }
+        if prefix is not None:
+            params['Prefix'] = prefix
+        params.update(override_kwargs)
+        return 'ListObjectVersions', params
+
+    def list_object_versions_response(self, keys, **override_kwargs):
+        versions = []
+        delete_markers = []
+        for key in keys:
+            version = {
+                'Key': key,
+                'VersionId': 'version1',
+                'LastModified': '00:00:00Z',
+                'Size': 100,
+                'ETag': '"foo-1"',
+                'IsLatest': False,
+            }
+            delete_marker = {
+                'Key': key,
+                'VersionId': 'deletemarker1',
+                'LastModified': '00:00:00Z',
+                'IsLatest': True,
+            }
+            if override_kwargs:
+                version.update(override_kwargs)
+                delete_marker.update(override_kwargs)
+
+            versions.append(version)
+            delete_markers.append(delete_marker)
+        return {'Versions': versions, 'DeleteMarkers': delete_markers}
+
+    def delete_objects_request(
+        self, bucket, objects_to_delete, **override_kwargs
+    ):
+        params = {
+            'Bucket': bucket,
+            'Delete': {
+                'Objects': objects_to_delete,
+                'Quiet': False,
+            },
+        }
+        params.update(override_kwargs)
+        return 'DeleteObjects', params
+
+    def delete_objects_response(self, keys, **override_kwargs):
+        deleted = []
+
+        for key in keys:
+            # Create deleted entries for each version that was deleted
+            deleted_objects = [
+                {'Key': key, 'VersionId': 'version1'},
+                {'Key': key, 'VersionId': 'deletemarker1'},
+            ]
+
+            if override_kwargs:
+                for obj in deleted_objects:
+                    obj.update(override_kwargs)
+
+            deleted.extend(deleted_objects)
+
+        return {'Deleted': deleted}
+
 
 class BaseS3CLIRunnerTest(unittest.TestCase):
     def setUp(self):
diff --git a/tests/functional/s3/test_rm_command.py b/tests/functional/s3/test_rm_command.py
index 21d97e96d6d5..850f0b65682a 100644
--- a/tests/functional/s3/test_rm_command.py
+++ b/tests/functional/s3/test_rm_command.py
@@ -71,6 +71,49 @@ def test_recursive_delete_with_requests(self):
             ]
         )
 
+    def test_delete_all_versions_of_single_object(self):
+        self.parsed_responses = [
+            self.list_object_versions_response(['mykey']),
+            self.delete_objects_response(['mykey']),
+        ]
+        cmdline = f'{self.prefix} s3://mybucket/mykey --all-versions'
+        self.run_cmd(cmdline, expected_rc=0)
+
+        self.assert_operations_called(
+            [
+                self.list_object_versions_request('mybucket', prefix='mykey'),
+                self.delete_objects_request(
+                    'mybucket',
+                    [
+                        {'Key': 'mykey', 'VersionId': 'version1'},
+                        {'Key': 'mykey', 'VersionId': 'deletemarker1'},
+                    ],
+                ),
+            ]
+        )
+
+    def test_recursive_delete_all_versions_of_all_objects(self):
+        self.parsed_responses = [
+            self.list_object_versions_response(['mykey1', 'mykey2']),
+            self.delete_objects_response(['mykey1', 'mykey2']),
+        ]
+        cmdline = f'{self.prefix} s3://mybucket/ --all-versions --recursive'
+        self.run_cmd(cmdline, expected_rc=0)
+
+        self.assert_operations_called(
+            [
+                self.list_object_versions_request('mybucket', prefix=''),
+                self.delete_objects_request(
+                    'mybucket',
+                    [
+                        {'Key': 'mykey1', 'VersionId': 'version1'},
+                        {'Key': 'mykey2', 'VersionId': 'version1'},
+                        {'Key': 'mykey1', 'VersionId': 'deletemarker1'},
+                        {'Key': 'mykey2', 'VersionId': 'deletemarker1'},
+                    ],
+                ),
+            ]
+        )
+
 
 class TestRmWithCRTClient(BaseCRTTransferClientTest):
     def test_delete_using_crt_client(self):
diff --git a/tests/functional/s3transfer/test_delete.py b/tests/functional/s3transfer/test_delete.py
index d8816d1bb47b..836cf77d9fb8 100644
--- a/tests/functional/s3transfer/test_delete.py
+++ b/tests/functional/s3transfer/test_delete.py
@@ -74,3 +74,76 @@ def test_raise_exception_on_s3_object_lambda_resource(self):
         )
         with self.assertRaisesRegex(ValueError, 'methods do not support'):
             self.manager.delete(s3_object_lambda_arn, self.key)
+
+
+class TestBatchDelete(BaseGeneralInterfaceTest):
+    __test__ = True
+
+    def setUp(self):
+        super().setUp()
+        self.bucket = 'mybucket'
+        self.key = 'mykey'
+        self.objects = [
+            {'Key': self.key, 'VersionId': 'version1'},
+            {'Key': self.key, 'VersionId': 'deletemarker1'},
+        ]
+        self.manager = TransferManager(self.client)
+
+    @property
+    def method(self):
+        """The transfer manager method to invoke i.e. upload()"""
+        return self.manager.batch_delete
+
+    def create_call_kwargs(self):
+        """The kwargs to be passed to the transfer manager method"""
+        return {
+            'bucket': self.bucket,
+            'objects': self.objects,
+        }
+
+    def create_invalid_extra_args(self):
+        return {
+            'BadKwargs': True,
+        }
+
+    def create_stubbed_responses(self):
+        """A list of stubbed responses that will cause the request to succeed
+
+        The elements of this list are dictionaries that will be used as
+        keyword arguments to botocore.Stubber.add_response(). For example::
+
+            [{'method': 'delete_objects', 'service_response': {}}]
+        """
+        return [
+            {
+                'method': 'delete_objects',
+                'service_response': {},
+                'expected_params': {
+                    'Bucket': self.bucket,
+                    'Delete': {'Objects': self.objects, 'Quiet': False},
+                },
+            }
+        ]
+
+    def create_expected_progress_callback_info(self):
+        return []
+
+    def test_known_allowed_args_in_input_shape(self):
+        op_model = self.client.meta.service_model.operation_model(
+            'DeleteObjects'
+        )
+        allowed_delete_objects_args = [
+            arg
+            for arg in self.manager.ALLOWED_DELETE_ARGS
+            if arg != 'VersionId'
+        ]
+        for allowed_arg in allowed_delete_objects_args:
+            self.assertIn(allowed_arg, op_model.input_shape.members)
+
+    def test_raise_exception_on_s3_object_lambda_resource(self):
+        s3_object_lambda_arn = (
+            'arn:aws:s3-object-lambda:us-west-2:123456789012:'
+            'accesspoint:my-accesspoint'
+        )
+        with self.assertRaisesRegex(ValueError, 'methods do not support'):
+            self.manager.batch_delete(s3_object_lambda_arn, self.key)
diff --git a/tests/integration/s3transfer/test_delete.py b/tests/integration/s3transfer/test_delete.py
index ee16fecfa47d..dba2035ffff9 100644
--- a/tests/integration/s3transfer/test_delete.py
+++ b/tests/integration/s3transfer/test_delete.py
@@ -26,3 +26,22 @@ def test_can_delete_object(self):
         future.result()
 
         self.assertTrue(self.object_not_exists(key_name))
+
+
+class TestBatchDeleteObject(BaseTransferManagerIntegTest):
+    def test_can_delete_versioned_objects(self):
+        key_name = 'mykey'
+        self.client.put_object(
+            Bucket=self.bucket_name, Key=key_name, Body=b'hello world'
+        )
+        response = self.client.list_object_versions(Bucket=self.bucket_name)
+        version_id = response['Versions'][0]['VersionId']
+        objects = [{'Key': key_name, 'VersionId': version_id}]
+        transfer_manager = self.create_transfer_manager()
+        future = transfer_manager.batch_delete(
+            bucket=self.bucket_name, objects=objects
+        )
+
+        future.result()
+
+        self.assertTrue(self.object_not_exists(key_name))
diff --git a/tests/unit/customizations/s3/test_s3handler.py b/tests/unit/customizations/s3/test_s3handler.py
index 765f66b2c8e9..76159d46eb6a 100644
--- a/tests/unit/customizations/s3/test_s3handler.py
+++ b/tests/unit/customizations/s3/test_s3handler.py
@@ -12,10 +12,11 @@
 # language governing permissions and limitations under the License.
 import os
 
+import pytest
 from s3transfer.manager import TransferManager
 
 from awscli.compat import queue
-from awscli.customizations.s3.fileinfo import FileInfo
+from awscli.customizations.s3.fileinfo import FileInfo, VersionedFileInfo
 from awscli.customizations.s3.results import (
     CommandResultRecorder,
     DoneResultSubscriber,
@@ -29,6 +30,9 @@
     SuccessResult,
 )
 from awscli.customizations.s3.s3handler import (
+    BatchDelete,
+    BatchDeleteRequestSubmitter,
+    BatchS3TransferHandler,
     CopyRequestSubmitter,
     DeleteRequestSubmitter,
     DownloadRequestSubmitter,
@@ -57,6 +61,7 @@
     StdoutBytesWriter,
     WarningResult,
 )
+from awscli.s3transfer.constants import MAX_BATCH_SIZE
 from awscli.testutils import FileCreator, mock, unittest
 
 
@@ -78,6 +83,15 @@ def test_call(self):
             S3TransferHandler,
         )
 
+    def test_call_returns_batch_s3_handler_with_all_versions(self):
+        self.cli_params['all_versions'] = True
+        factory = S3TransferHandlerFactory(self.cli_params)
+        self.assertTrue(factory._requires_batch_handler())
+        self.assertIsInstance(
+            factory(self.transfer_manager, self.result_queue),
+            BatchS3TransferHandler,
+        )
+
 
 class TestS3TransferHandler(unittest.TestCase):
     def setUp(self):
@@ -1253,3 +1267,170 @@ def test_dry_run(self):
         self.assertEqual(result.transfer_type, 'delete')
         self.assertTrue(result.src.endswith(self.filename))
         self.assertIsNone(result.dest)
+
+
+class TestBatchS3TransferHandler:
+    @pytest.fixture(autouse=True)
+    def setUp(self):
+        self.result_queue = queue.Queue()
+        self.result_recorder = ResultRecorder()
+        self.processed_results = []
+        self.result_processor = ResultProcessor(
+            self.result_queue,
+            [self.result_recorder, self.processed_results.append],
+        )
+        self.command_result_recorder = CommandResultRecorder(
+            self.result_queue, self.result_recorder, self.result_processor
+        )
+
+        self.transfer_manager = mock.Mock(spec=TransferManager)
+        self.transfer_manager.__enter__ = mock.Mock()
+        self.transfer_manager.__exit__ = mock.Mock()
+        self.parameters = {}
+        self.batch_s3_handler = BatchS3TransferHandler(
+            self.transfer_manager,
+            self.parameters,
+            self.command_result_recorder,
+        )
+
+    def test_call_return_command_result(self):
+        num_failures = 5
+        num_warnings = 3
+        self.result_recorder.files_failed = num_failures
+        self.result_recorder.files_warned = num_warnings
+        command_result = self.batch_s3_handler.call([])
+        assert command_result == (num_failures, num_warnings)
+
+    def test_enqueue_batch_deletes(self):
+        fileinfos = []
+        num_transfers = 5
+        for i in range(num_transfers):
+            fileinfo = VersionedFileInfo(
+                src=f'bucket/key{i}',
+                dest=None,
+                operation_name='delete',
+                src_type='s3',
+                version_id=f'version{i}',
+            )
+            fileinfos.append(fileinfo)
+
+        self.batch_s3_handler.call(iter(fileinfos))
+        self.transfer_manager.batch_delete.assert_called_once()
+
+    def test_notifies_total_submissions(self):
+        fileinfos = []
+        num_transfers = 5
+        for i in range(num_transfers):
+            fileinfo = VersionedFileInfo(
+                src=f'bucket/key{i}',
+                dest=None,
+                operation_name='delete',
+                src_type='s3',
+                version_id=f'version{i}',
+            )
+            fileinfos.append(fileinfo)
+
+        self.batch_s3_handler.call(iter(fileinfos))
+        assert (
+            self.result_recorder.final_expected_files_transferred
+            == num_transfers
+        )
+
+
+class TestBatchDelete:
+    def test_create_batches_with_multiple_objects(self):
+        self.result_queue = queue.Queue()
+        self.delete_batch = BatchDelete(self.result_queue)
+        self.bucket = 'mybucket'
+        self.key = 'mykey'
+        fileinfos = []
+        num_objects = MAX_BATCH_SIZE + 5
+        for i in range(num_objects):
+            fileinfo = VersionedFileInfo(
+                src=self.bucket + f'/key{i}',
+                dest=None,
+                operation_name='delete',
+                src_type='s3',
+                version_id=f'version{i}',
+            )
+            fileinfos.append(fileinfo)
+
+        batches = self.delete_batch.create_batches(fileinfos)
+        assert len(batches) == 2
+        assert len(batches[0]['delete_request']['Objects']) == MAX_BATCH_SIZE
+        assert len(batches[1]['delete_request']['Objects']) == 5
+        for batch in batches:
+            assert batch['bucket'] == self.bucket
+
+
+class TestBatchDeleteRequestSubmitter:
+    @pytest.fixture(autouse=True)
+    def setUp(self):
+        self._transfer_manager = mock.Mock(spec=TransferManager)
+        self._result_queue = queue.Queue()
+        self._cli_params = {}
+        self.bucket = 'mybucket'
+        self.key = 'mykey'
+        self.delete_batch_submitter = BatchDeleteRequestSubmitter(
+            self._transfer_manager, self._result_queue, self._cli_params
+        )
+
+    def test_can_submit_with_delete_operation_and_version_id(self):
+        fileinfo = VersionedFileInfo(
+            src=self.bucket + "/" + self.key,
+            dest=None,
+            operation_name='delete',
+            src_type='s3',
+            version_id='version123',
+        )
+        assert self.delete_batch_submitter.can_submit(fileinfo)
+
+    def test_cannot_submit_local_src_type_for_delete(self):
+        fileinfo = VersionedFileInfo(
+            src=self.bucket + "/" + self.key,
+            dest=None,
+            operation_name='delete',
+            src_type='local',
+            version_id='',
+        )
+        assert not self.delete_batch_submitter.can_submit(fileinfo)
+
+    def test_submit_batch_with_multiple_objects(self):
+        fileinfos = []
+        num_files = 5
+        for i in range(num_files):
+            fileinfo = VersionedFileInfo(
+                src=self.bucket + f'/key{i}',
+                dest=None,
+                operation_name='delete',
+                src_type='s3',
+                version_id=f'version{i}',
+            )
+            fileinfos.append(fileinfo)
+
+        mock_future = mock.Mock()
+        mock_future.result.return_value = None
+        self._transfer_manager.batch_delete.return_value = mock_future
+        total_objects = self.delete_batch_submitter.submit_batch(fileinfos)
+        assert total_objects == num_files
+        self._transfer_manager.batch_delete.assert_called_once()
+
+    def test_dry_run(self):
+        self._cli_params['dryrun'] = True
+        fileinfos = []
+        num_files = 5
+        for i in range(num_files):
+            fileinfo = VersionedFileInfo(
+                src=self.bucket + f'/key{i}',
+                dest=None,
+                operation_name='delete',
+                src_type='s3',
+                version_id=f'version{i}',
+            )
+            fileinfos.append(fileinfo)
+
+        total_objects = self.delete_batch_submitter.submit_batch(fileinfos)
+        assert total_objects == num_files
+        self._transfer_manager.batch_delete.assert_not_called()
+        result = self._result_queue.get()
+        assert isinstance(result, DryRunResult)
diff --git a/tests/unit/customizations/s3/test_utils.py b/tests/unit/customizations/s3/test_utils.py
index 47e006d73910..392da9919673 100644
--- a/tests/unit/customizations/s3/test_utils.py
+++ b/tests/unit/customizations/s3/test_utils.py
@@ -1031,3 +1031,94 @@ def test_has_underlying_s3_path(
     ):
         has_underlying_s3_path = S3PathResolver.has_underlying_s3_path(path)
         assert has_underlying_s3_path == expected_has_underlying_s3_path
+
+
+class TestBucketVersionLister:
+    @pytest.fixture(autouse=True)
+    def setUp(self):
+        self.client = mock.Mock()
+        self.emitter = HierarchicalEmitter()
+        self.client.meta.events = self.emitter
+        self.date_parser = mock.Mock()
+        self.date_parser.return_value = mock.sentinel.now
+        self.responses = []
+
+    def fake_paginate(self, *args, **kwargs):
+        for response in self.responses:
+            self.emitter.emit(
+                'after-call.s3.ListObjectVersions', parsed=response
+            )
+        return self.responses
+
+    def test_list_object_versions(self):
+        now = mock.sentinel.now
+        self.client.get_paginator.return_value.paginate = self.fake_paginate
+        versions = [
+            {
+                'LastModified': '2015-08-05T04:20:38.000Z',
+                'Key': 'a',
+                'Size': 1,
+                'VersionId': 'version1',
+            },
+            {
+                'LastModified': '2015-08-05T04:20:38.000Z',
+                'Key': 'b',
+                'Size': 2,
+                'VersionId': 'version2',
+            },
+        ]
+
+        delete_markers = [
+            {
+                'LastModified': '2015-08-05T04:20:38.000Z',
+                'Key': 'b',
+                'VersionId': 'delete1',
+            },
+        ]
+
+        self.responses = [
+            {'Versions': versions[0:1], 'DeleteMarkers': []},
+            {'Versions': [versions[1]], 'DeleteMarkers': delete_markers},
+        ]
+
+        lister = BucketLister(self.client, self.date_parser)
+        objects = list(lister.list_object_versions(bucket='foo'))
+        expected_version_a = versions[0].copy()
+        expected_version_b = versions[1].copy()
+        expected_delete_marker_a = delete_markers[0].copy()
+
+        assert objects == [
+            ('foo/a', expected_version_a, 'version1'),
+            ('foo/b', expected_version_b, 'version2'),
+            ('foo/b', expected_delete_marker_a, 'delete1'),
+        ]
+        for version in versions:
+            assert version['LastModified'] == now
+        for delete_marker in delete_markers:
+            assert delete_marker['LastModified'] == now
+
+    def test_list_object_versions_with_extra_args(self):
+        self.client.get_paginator.return_value.paginate.return_value = [
+            {
+                'Versions': [
+                    {
+                        'LastModified': '2015-08-05T04:20:38.000Z',
+                        'Key': 'a',
+                        'Size': 1,
+                        'VersionId': 'version1',
+                    },
+                ],
+                'DeleteMarkers': [],
+            }
+        ]
+        lister = BucketLister(self.client, self.date_parser)
+        list(
+            lister.list_object_versions(
+                bucket='mybucket', extra_args={'RequestPayer': 'requester'}
+            )
+        )
+        self.client.get_paginator.return_value.paginate.assert_called_with(
+            Bucket='mybucket',
+            PaginationConfig={'PageSize': None},
+            RequestPayer='requester',
+        )
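For context, the user-facing entry point added by this change is "aws s3 rm s3://mybucket/mykey --all-versions". Below is a minimal sketch, not part of the patch, of driving the new batch_delete API directly, mirroring the integration test above; the bucket, key, and version ID values are illustrative, and the import path assumes the vendored awscli.s3transfer package:

    import boto3

    # Assumed import path for the vendored transfer manager edited above.
    from awscli.s3transfer.manager import TransferManager

    client = boto3.client('s3')
    manager = TransferManager(client)

    # Objects follow the DeleteObjects request form; VersionId targets a
    # specific version or delete marker rather than the latest version.
    objects = [
        {'Key': 'mykey', 'VersionId': 'version1'},
        {'Key': 'mykey', 'VersionId': 'deletemarker1'},
    ]
    future = manager.batch_delete(bucket='mybucket', objects=objects)
    future.result()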