Skip to content
20 changes: 20 additions & 0 deletions awscli/botocore/httpchecksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def update(self, chunk):
def digest(self):
return self._int_crc32.to_bytes(4, byteorder="big")

@property
def int_crc(self):
return self._int_crc32


class CrtCrc32Checksum(BaseChecksum):
# Note: This class is only used if the CRT is available
Expand All @@ -88,6 +92,10 @@ def update(self, chunk):
def digest(self):
return self._int_crc32.to_bytes(4, byteorder="big")

@property
def int_crc(self):
return self._int_crc32


class CrtCrc32cChecksum(BaseChecksum):
# Note: This class is only used if the CRT is available
Expand All @@ -101,6 +109,10 @@ def update(self, chunk):
def digest(self):
return self._int_crc32c.to_bytes(4, byteorder="big")

@property
def int_crc(self):
return self._int_crc32


class CrtCrc64NvmeChecksum(BaseChecksum):
# Note: This class is only used if the CRT is available
Expand All @@ -114,6 +126,10 @@ def update(self, chunk):
def digest(self):
return self._int_crc64nvme.to_bytes(8, byteorder="big")

@property
def int_crc(self):
return self._int_crc32


class Sha1Checksum(BaseChecksum):
def __init__(self):
Expand Down Expand Up @@ -225,6 +241,10 @@ def __init__(self, raw_stream, content_length, checksum, expected):
self._checksum = checksum
self._expected = expected

@property
def checksum(self):
return self._checksum

def read(self, amt=None):
chunk = super().read(amt=amt)
self._checksum.update(chunk)
Expand Down
1 change: 1 addition & 0 deletions awscli/customizations/s3/filegenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ def _list_single_object(self, s3_path):
try:
params = {'Bucket': bucket, 'Key': key}
params.update(self.request_parameters.get('HeadObject', {}))
params["ChecksumMode"] = "ENABLED"
response = self._client.head_object(**params)
except ClientError as e:
# We want to try to give a more helpful error message.
Expand Down
4 changes: 4 additions & 0 deletions awscli/customizations/s3/s3handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
DeleteSourceFileSubscriber,
DeleteSourceObjectSubscriber,
DirectoryCreatorSubscriber,
ProvideChecksumSubscriber,
ProvideETagSubscriber,
ProvideLastModifiedTimeSubscriber,
ProvideSizeSubscriber,
Expand Down Expand Up @@ -421,6 +422,9 @@ def _add_additional_subscribers(self, subscribers, fileinfo):
subscribers.append(
DeleteSourceObjectSubscriber(fileinfo.source_client)
)
subscribers.append(
ProvideChecksumSubscriber(fileinfo.associated_response_data)
)

def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
bucket, key = find_bucket_key(fileinfo.src)
Expand Down
21 changes: 21 additions & 0 deletions awscli/customizations/s3/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import time

from botocore.utils import percent_encode_sequence
from s3transfer.checksums import provide_checksum_to_meta
from s3transfer.subscribers import BaseSubscriber

from awscli.customizations.s3 import utils
Expand Down Expand Up @@ -99,6 +100,26 @@ def on_queued(self, future, **kwargs):
)


class ProvideChecksumSubscriber(BaseSubscriber):
"""
A subscriber which provides the object stored checksum and algorithm.
"""

def __init__(self, response_data):
self.response_data = response_data

def on_queued(self, future, **kwargs):
if hasattr(future.meta, 'provide_stored_checksum') and hasattr(
future.meta, 'provide_checksum_algorithm'
):
provide_checksum_to_meta(self.response_data, future.meta)
else:
LOGGER.debug(
f"Not providing stored checksum. Future: {future} does not "
"offer the capability to notify the checksum of an object",
)


class DeleteSourceSubscriber(OnDoneFilteredSubscriber):
"""A subscriber which deletes the source of the transfer."""

Expand Down
222 changes: 222 additions & 0 deletions awscli/s3transfer/checksums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""
NOTE: All classes and functions in this module are considered private and are
subject to abrupt breaking changes. Please do not use them directly.
"""

import base64
import logging
from copy import copy
from functools import cached_property

from botocore.httpchecksum import CrtCrc32Checksum
from s3transfer.exceptions import S3ValidationError

logger = logging.getLogger(__name__)


class PartStreamingChecksumBody:
def __init__(self, stream, starting_index, full_object_checksum):
self._stream = stream
self._starting_index = starting_index
self._checksum = CRC_CHECKSUM_CLS[
full_object_checksum.checksum_algorithm
]()
self._full_object_checksum = full_object_checksum
# If the underlying stream already has a checksum object
# it's updating (eg `botocore.httpchecksum.StreamingChecksumBody`),
# reuse its calculated value.
self._reuse_checksum = hasattr(self._stream, 'checksum')

@property
def checksum(self):
return self._checksum

def read(self, *args, **kwargs):
value = self._stream.read(*args, **kwargs)
if not self._reuse_checksum:
self._checksum.update(value)
if not value:
self._set_part_checksum()
return value

def _set_part_checksum(self):
if not self._reuse_checksum:
value = self._checksum.int_crc
else:
value = self._stream.checksum.int_crc
self._full_object_checksum.set_part_checksum(
self._starting_index,
value,
)


class FullObjectChecksum:
def __init__(self, checksum_algorithm, content_length):
self.checksum_algorithm = checksum_algorithm
self._content_length = content_length
self._combine_function = _CRC_CHECKSUM_TO_COMBINE_FUNCTION[
self.checksum_algorithm
]
self._stored_checksum = None
self._part_checksums = None
self._calculated_checksum = None

@cached_property
def calculated_checksum(self):
if self._calculated_checksum is None:
self._combine_part_checksums()
return self._calculated_checksum

def set_stored_checksum(self, stored_checksum):
self._stored_checksum = stored_checksum

def set_part_checksum(self, offset, checksum):
if self._part_checksums is None:
self._part_checksums = {}
self._part_checksums[offset] = checksum

def _combine_part_checksums(self):
if self._part_checksums is None:
return

sorted_offsets = sorted(self._part_checksums.keys())
# Initialize the combined checksum to the first part's checksum value.
combined = self._part_checksums[sorted_offsets[0]]
# To calculate part length, take the current offset and subtract from
# the next offset. If the current offset is the start of the last part,
# then subtract from the total content length. eg,
# (8388608, 16777216)
# (16777216, self._content_length)
remaining_offsets = sorted_offsets[1:]
next_offsets = sorted_offsets[2:] + [self._content_length]
for offset, next_offset in zip(remaining_offsets, next_offsets):
part_checksum = self._part_checksums[offset]
offset_len = next_offset - offset
combined = self._combine_function(
combined, part_checksum, offset_len
)

self._calculated_checksum = base64.b64encode(
combined.to_bytes(4, byteorder='big')
).decode('ascii')
Comment on lines +111 to +113
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually dependent on the checksum algorithm. I think instead I'll instantiate the appropriate checksum object, set the int_crc property to the combined value, then call the digest() method


def validate(self):
if self.calculated_checksum != self._stored_checksum:
raise S3ValidationError(
f"Calculated checksum {self.calculated_checksum} does not match "
f"stored checksum {self._stored_checksum}"
)
logger.debug(
f"Successfully validated stored checksum {self._stored_checksum} "
f"against calculated checksum {self.calculated_checksum}"
)


def provide_checksum_to_meta(response, transfer_meta):
stored_checksum = None
checksum_algorithm = None
checksum_type = response.get("ChecksumType")
if checksum_type and checksum_type == "FULL_OBJECT":
for crc_checksum in CRC_CHECKSUMS:
if checksum_value := response.get(crc_checksum):
stored_checksum = checksum_value
checksum_algorithm = crc_checksum
break
transfer_meta.provide_checksum_algorithm(checksum_algorithm)
transfer_meta.provide_stored_checksum(stored_checksum)


def combine_crc32(crc1, crc2, len2):
"""Combine two CRC32 values.

:type crc1: int
:param crc1: Current CRC32 integer value.

:type crc2: int
:param crc2: Second CRC32 integer value to combine.

:type len2: int
:param len2: Length of data that produced `crc2`.

:rtype: int
:returns: Combined CRC32 integer value.
"""
_GF2_DIM = 32
_CRC32_POLY = 0xEDB88320
_MASK_32BIT = 0xFFFFFFFF

def _gf2_matrix_times(mat, vec):
res = 0
idx = 0
while vec != 0:
if vec & 1:
res ^= mat[idx]
vec >>= 1
idx += 1
return res

def _gf2_matrix_square(square, mat):
res = copy(square)
for n in range(_GF2_DIM):
d = mat[n]
res[n] = _gf2_matrix_times(mat, d)
return res

even = [0] * _GF2_DIM
odd = [0] * _GF2_DIM

if len2 <= 0:
return crc1

odd[0] = _CRC32_POLY
row = 1
for i in range(1, _GF2_DIM):
odd[i] = row
row <<= 1

even = _gf2_matrix_square(even, odd)
odd = _gf2_matrix_square(odd, even)

while True:
even = _gf2_matrix_square(even, odd)
if len2 & 1:
crc1 = _gf2_matrix_times(even, crc1)
len2 >>= 1

if len2 == 0:
break

odd = _gf2_matrix_square(odd, even)
if len2 & 1:
crc1 = _gf2_matrix_times(odd, crc1)
len2 >>= 1

if len2 == 0:
break

return (crc1 ^ crc2) & _MASK_32BIT


_CRC_CHECKSUM_TO_COMBINE_FUNCTION = {
"ChecksumCRC32": combine_crc32,
}


CRC_CHECKSUM_CLS = {
"ChecksumCRC32": CrtCrc32Checksum,
}


CRC_CHECKSUMS = _CRC_CHECKSUM_TO_COMBINE_FUNCTION.keys()
Loading
Loading