
Commit a758ef2 (parent 2e96402)

PYTHONSDK-99: Add optional blob checksum calculation to helpers. Also added documentation to helpers.

4 files changed: +252 additions, -24 deletions

ds3/ds3Helpers.py

Lines changed: 159 additions & 13 deletions
@@ -1,4 +1,4 @@
-# Copyright 2021 Spectra Logic Corporation. All Rights Reserved.
+# Copyright 2021-2022 Spectra Logic Corporation. All Rights Reserved.
 # Licensed under the Apache License, Version 2.0 (the "License"). You may not use
 # this file except in compliance with the License. A copy of the License is located at
 #
@@ -8,14 +8,17 @@
 # This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 # CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
-
-import time
 import concurrent.futures
-from .ds3 import *
+import hashlib
+import time
+import zlib
 from os import walk, path
+from platform import system
 from typing import List, Set, Dict
 
-from platform import system
+from .ds3 import *
+
+crc_byte_length = 4
 
 
 class EmptyReader(object):
@@ -87,12 +90,90 @@ def object_name_to_file_path(object_name: str) -> str:
     return object_name
 
 
+def calculate_checksum_header(object_data_stream, checksum_type: str, length: int):
+    bytes_read = 0
+    if checksum_type == 'CRC_32':
+        checksum = 0
+        while bytes_read < length:
+            bytes_to_read = min(1024 * 1024, length - bytes_read)
+            cur_bytes = object_data_stream.read(bytes_to_read)
+            checksum = zlib.crc32(cur_bytes, checksum)
+            bytes_read += bytes_to_read
+        encoded_checksum = base64.b64encode(
+            checksum.to_bytes(crc_byte_length, byteorder='big')).decode()
+        return {'Content-CRC32': encoded_checksum}
+    else:
+        if checksum_type == 'MD5':
+            checksum_calculator = hashlib.md5()
+            header_key = 'Content-MD5'
+        elif checksum_type == 'SHA_256':
+            checksum_calculator = hashlib.sha256()
+            header_key = 'Content-SHA256'
+        elif checksum_type == 'SHA_512':
+            checksum_calculator = hashlib.sha512()
+            header_key = 'Content-SHA512'
+        else:
+            raise NotImplementedError(
+                'calculating checksum type {0} is not currently supported in the SDK helpers'.format(checksum_type))
+        while bytes_read < length:
+            bytes_to_read = min(1024 * 1024, length - bytes_read)
+            cur_bytes = object_data_stream.read(bytes_to_read)
+            checksum_calculator.update(cur_bytes)
+            bytes_read += bytes_to_read
+        encoded_checksum = base64.b64encode(checksum_calculator.digest()).decode('utf-8')
+        return {header_key: encoded_checksum}
+
+
 class Helper(object):
+    """A class that moves data to and from a Black Pearl."""
+
     def __init__(self, client: Client, retry_delay_in_seconds: int = 60):
+        """
+        Parameters
+        ----------
+        client : ds3.Client
+            A python client that is connected to a Black Pearl.
+        retry_delay_in_seconds : int
+            The number of seconds to wait before retrying a call if the Black Pearl is busy and unable to process
+            the previous attempt (i.e. when the BP returns error code 307).
+        """
         self.client = client
         self.retry_delay_in_seconds = retry_delay_in_seconds
 
-    def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threads: int = 5) -> str:
+    def get_checksum_type(self, bucket_name: str) -> str:
+        bucket_response = self.client.get_bucket_spectra_s3(GetBucketSpectraS3Request(bucket_name=bucket_name))
+
+        data_policy_id = bucket_response.result['DataPolicyId']
+        policy_response = self.client.get_data_policy_spectra_s3(
+            GetDataPolicySpectraS3Request(data_policy_id=data_policy_id))
+
+        return policy_response.result['ChecksumType']
+
+    def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threads: int = 5,
+                    calculate_checksum: bool = False) -> str:
+        """
+        Puts a list of objects to a Black Pearl bucket.
+
+        Parameters
+        ----------
+        put_objects : List[HelperPutObject]
+            The list of objects to put into the BP bucket.
+        bucket : str
+            The name of the bucket where the objects are being landed.
+        max_threads : int
+            The number of objects being transferred concurrently (default 5).
+        calculate_checksum : bool
+            Whether or not the client calculates the object checksum before sending it to the BP (default False).
+            The BP also calculates the checksum and compares it with the value the client calculated. The object
+            put will fail if the client and BP checksums do not match. Note that calculating the checksum is
+            processor intensive, and it also requires two reads of the object (first to calculate the checksum,
+            and second to send the data). The type of checksum calculated is determined by the data policy
+            associated with the bucket.
+        """
+        # If calculating checksum, then determine the checksum type from the data policy
+        checksum_type = None
+        if calculate_checksum is True:
+            checksum_type = self.get_checksum_type(bucket_name=bucket)
+
         ds3_put_objects: List[Ds3PutObject] = []
         put_objects_map: Dict[str, HelperPutObject] = dict()
         for entry in put_objects:
@@ -137,22 +218,56 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
                         blob_set.remove(cur_blob)
                         put_object = put_objects_map[cur_blob.name]
 
-                        executor.submit(self.put_blob, bucket, put_object, cur_blob.length, cur_blob.offset, job_id)
+                        executor.submit(self.put_blob, bucket, put_object, cur_blob.length, cur_blob.offset, job_id,
+                                        checksum_type)
 
         return job_id
 
-    def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset: int, job_id: str):
+    def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset: int, job_id: str,
+                 checksum_type: str = None):
+        headers = None
+        if checksum_type is not None:
+            checksum_stream = put_object.get_data_stream(offset=offset)
+            headers = calculate_checksum_header(object_data_stream=checksum_stream,
+                                                checksum_type=checksum_type,
+                                                length=length)
+            checksum_stream.close()
+
         stream = put_object.get_data_stream(offset)
         self.client.put_object(PutObjectRequest(bucket_name=bucket,
                                                 object_name=put_object.object_name,
                                                 length=length,
                                                 stream=stream,
                                                 offset=offset,
-                                                job=job_id))
+                                                job=job_id,
+                                                headers=headers))
         stream.close()
 
     def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per_bp_job: int = 1000,
-                                     max_threads: int = 5) -> List[str]:
+                                     max_threads: int = 5, calculate_checksum: bool = False) -> List[str]:
+        """
+        Puts all files and subdirectories to a Black Pearl bucket.
+
+        Parameters
+        ----------
+        source_dir : str
+            The directory that contains all the files and subdirectories to be put to the BP. Note that
+            subdirectories will be represented by zero length objects whose names end with the file path separator.
+        bucket : str
+            The name of the bucket where the files are being landed.
+        objects_per_bp_job : int
+            The number of objects per BP job (default 1000). A directory may contain more objects than a BP job can
+            hold (max 500,000). In order to put all objects in a very large directory, multiple BP jobs may need to
+            be created. This determines how many objects to bundle per BP job.
+        max_threads : int
+            The number of objects being transferred concurrently (default 5).
+        calculate_checksum : bool
+            Whether or not the client calculates the object checksum before sending it to the BP (default False).
+            The BP also calculates the checksum and compares it with the value the client calculated. The object
+            put will fail if the client and BP checksums do not match. Note that calculating the checksum is
+            processor intensive, and it also requires two reads of the object (first to calculate the checksum,
+            and second to send the data). The type of checksum calculated is determined by the data policy
+            associated with the bucket.
+        """
         obj_list: List[HelperPutObject] = list()
         job_list: List[str] = list()
         for root, dirs, files in walk(top=source_dir):
@@ -162,7 +277,8 @@ def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per
                 size = os.path.getsize(obj_path)
                 obj_list.append(HelperPutObject(object_name=obj_name, file_path=obj_path, size=size))
                 if len(obj_list) >= objects_per_bp_job:
-                    job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads))
+                    job_list.append(self.put_objects(
+                        obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
                     obj_list = []
 
             for name in dirs:
@@ -171,15 +287,29 @@ def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per
                     path.join(path.normpath(path.relpath(path=dir_path, start=source_dir)), ""))
                 obj_list.append(HelperPutObject(object_name=dir_name, file_path=dir_path, size=0))
                 if len(obj_list) >= objects_per_bp_job:
-                    job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads))
+                    job_list.append(self.put_objects(
+                        obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
                     obj_list = []
 
         if len(obj_list) > 0:
-            job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads))
+            job_list.append(self.put_objects(
+                obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
 
         return job_list
 
     def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threads: int = 5) -> str:
+        """
+        Retrieves a list of objects from a Black Pearl bucket.
+
+        Parameters
+        ----------
+        get_objects : List[HelperGetObject]
+            The list of objects to retrieve from the BP bucket.
+        bucket : str
+            The name of the bucket the objects are being retrieved from.
+        max_threads : int
+            The number of objects being transferred concurrently (default 5).
+        """
         ds3_get_objects: List[Ds3GetObject] = []
         get_objects_map: Dict[str, HelperGetObject] = dict()
         for entry in get_objects:
@@ -240,6 +370,22 @@ def get_blob(self, bucket: str, get_object: HelperGetObject, offset: int, job_id
 
     def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per_bp_job: int = 1000,
                                 max_threads: int = 5) -> List[str]:
+        """
+        Retrieves all objects from a Black Pearl bucket.
+
+        Parameters
+        ----------
+        destination_dir : str
+            The directory where all the objects will be landed.
+        bucket : str
+            The name of the bucket the objects are being retrieved from.
+        objects_per_bp_job : int
+            The number of objects per BP job (default 1000). A bucket may contain more objects than a BP job can
+            hold (max 500,000). In order to retrieve all objects in a very large bucket, multiple BP jobs may need
+            to be created. This determines how many objects to bundle per BP job.
+        max_threads : int
+            The number of objects being transferred concurrently (default 5).
+        """
         truncated: str = 'true'
         marker = ""
         job_ids: List[str] = []
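
For context, a minimal usage sketch of the new option (the bucket name and source directory below are hypothetical; it assumes a client configured from the DS3 environment variables, as in the SDK samples):

    from ds3 import ds3, ds3Helpers

    # Connect using the DS3_* environment variables, as in the samples.
    client = ds3.createClientFromEnv()
    helper = ds3Helpers.Helper(client=client)

    # With calculate_checksum=True the helper looks up the bucket's data policy,
    # picks the matching checksum type (CRC_32, MD5, SHA_256, or SHA_512), and
    # sends the corresponding Content-* header with every blob. Each blob is
    # read twice: once to compute the checksum and once to send the data.
    job_ids = helper.put_all_objects_in_directory(
        source_dir='resources',    # hypothetical source directory
        bucket='my-bucket',        # hypothetical bucket name
        calculate_checksum=True)
    print(job_ids)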

samples/puttingDataWithCRC32.py

Lines changed: 16 additions & 6 deletions
@@ -1,4 +1,4 @@
-# Copyright 2014-2017 Spectra Logic Corporation. All Rights Reserved.
+# Copyright 2014-2022 Spectra Logic Corporation. All Rights Reserved.
 # Licensed under the Apache License, Version 2.0 (the "License"). You may not use
 # this file except in compliance with the License. A copy of the License is located at
 #
@@ -9,13 +9,13 @@
 # CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-from ds3 import ds3
+import base64
 import os
 import time
-import sys
-import base64
 import zlib
 
+from ds3 import ds3
+
 client = ds3.createClientFromEnv()
 
 crc_byte_length = 4
@@ -83,8 +83,18 @@ def pathForResource(resourceName):
         localFileName = "resources/" + obj['Name']
         objectDataStream = open(localFileName, "rb")
         objectDataStream.seek(int(obj['Offset']), 0)
-        objectChunk = objectDataStream.read(int(obj['Length']))
-        checksum = zlib.crc32(objectChunk)
+
+        # Calculate the rolling checksum by loading the file 1 MiB at a time. This allows
+        # the calculation for very large blobs that otherwise cannot be loaded fully into memory.
+        bytesRead = 0
+        checksum = 0
+        blobLength = int(obj['Length'])
+        while bytesRead < blobLength:
+            bytesToRead = min(1024 * 1024, blobLength - bytesRead)
+            curBytes = objectDataStream.read(bytesToRead)
+            checksum = zlib.crc32(curBytes, checksum)
+            bytesRead += bytesToRead
+
         encodedChecksum = base64.b64encode(
             checksum.to_bytes(crc_byte_length, byteorder='big')).decode()
         objectDataStream.seek(int(obj['Offset']), 0)
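
Aside: the rolling loop above produces the same value as a one-shot zlib.crc32 call over the whole blob, because zlib.crc32 accepts the running checksum as its second argument. A quick self-contained check (the payload here is made up for illustration):

    import zlib

    data = b'x' * (3 * 1024 * 1024 + 17)  # arbitrary payload larger than one chunk

    # One-shot checksum over the whole buffer.
    expected = zlib.crc32(data)

    # Rolling checksum, 1 MiB at a time, feeding the running value back in.
    checksum = 0
    for start in range(0, len(data), 1024 * 1024):
        checksum = zlib.crc32(data[start:start + 1024 * 1024], checksum)

    assert checksum == expected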

samples/puttingDataWithMD5.py

Lines changed: 13 additions & 3 deletions
@@ -80,9 +80,19 @@ def pathForResource(resourceName):
         localFileName = "resources/" + obj['Name']
         objectDataStream = open(localFileName, "rb")
         objectDataStream.seek(int(obj['Offset']), 0)
-        objectChunk = objectDataStream.read(int(obj['Length']))
-        checksum = hashlib.md5(objectChunk)
-        encodedChecksum = base64.b64encode(checksum.digest()).decode('utf-8')
+
+        # Calculate the rolling checksum by loading the file 1 MiB at a time. This allows
+        # the calculation for very large blobs that otherwise cannot be loaded fully into memory.
+        bytesRead = 0
+        blobLength = int(obj['Length'])
+        checksumCalculator = hashlib.md5()
+        while bytesRead < blobLength:
+            bytesToRead = min(1024 * 1024, blobLength - bytesRead)
+            curBytes = objectDataStream.read(bytesToRead)
+            checksumCalculator.update(curBytes)
+            bytesRead += bytesToRead
+
+        encodedChecksum = base64.b64encode(checksumCalculator.digest()).decode('utf-8')
         objectDataStream.seek(int(obj['Offset']), 0)
         client.put_object(ds3.PutObjectRequest(bucketName,
                                                obj['Name'],
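
The MD5 variant relies on the analogous hashlib property: repeated update() calls are equivalent to hashing the concatenated bytes in one call. A quick self-contained check (made-up payload for illustration):

    import hashlib

    data = b'y' * (2 * 1024 * 1024 + 5)  # arbitrary payload

    one_shot = hashlib.md5(data).digest()

    rolling = hashlib.md5()
    for start in range(0, len(data), 1024 * 1024):
        rolling.update(data[start:start + 1024 * 1024])

    assert rolling.digest() == one_shot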

tests/helpersTests.py

Lines changed: 64 additions & 2 deletions
@@ -32,7 +32,7 @@ def create_files_in_directory(directory: str, num_files: int, root_dir: str,
     obj_name = ds3Helpers.file_path_to_object_store_name(os.path.join(os.path.relpath(directory, root_dir), ""))
     put_objects.append(ds3Helpers.HelperPutObject(object_name=obj_name, file_path=directory, size=0))
 
-    # create an empty sub directory
+    # create an empty subdirectory
     if include_dirs:
         dir_path = os.path.join(directory, 'empty-dir')
         os.mkdir(path=dir_path)
@@ -155,7 +155,7 @@ def test_put_and_get_objects(self):
     def test_put_and_get_all_objects_in_directory(self):
         bucket = f'ds3-python3-sdk-test-{uuid.uuid1()}'
 
-        # create temporary directory with some files and sub directories
+        # create temporary directory with some files and subdirectories
         source = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-src-")
 
         put_objects = create_files_in_directory(directory=source.name, num_files=5, root_dir=source.name)
@@ -204,6 +204,68 @@ def test_put_and_get_all_objects_in_directory(self):
         destination.cleanup()
         client.delete_bucket_spectra_s3(ds3.DeleteBucketSpectraS3Request(bucket_name=bucket, force=True))
 
+    def test_put_all_objects_in_directory_with_md5_checksum(self):
+        self.put_all_objects_in_directory_with_checksum(checksum_type='MD5')
+
+    def test_put_all_objects_in_directory_with_crc32_checksum(self):
+        self.put_all_objects_in_directory_with_checksum(checksum_type='CRC_32')
+
+    def test_put_all_objects_in_directory_with_sha_256_checksum(self):
+        self.put_all_objects_in_directory_with_checksum(checksum_type='SHA_256')
+
+    def test_put_all_objects_in_directory_with_sha_512_checksum(self):
+        self.put_all_objects_in_directory_with_checksum(checksum_type='SHA_512')
+
+    def put_all_objects_in_directory_with_checksum(self, checksum_type: str):
+        bucket = f'ds3-python3-sdk-test-{uuid.uuid1()}'
+
+        # create the BP client
+        client = ds3.createClientFromEnv()
+
+        # create a data policy
+        data_policy = client.put_data_policy_spectra_s3(ds3.PutDataPolicySpectraS3Request(
+            name=f'sdk-test-{checksum_type}', checksum_type=checksum_type, end_to_end_crc_required=True))
+        data_policy_id = data_policy.result['Id']
+
+        # fetch existing storage domain
+        storage_domain = client.get_storage_domains_spectra_s3(ds3.GetStorageDomainsSpectraS3Request())
+        storage_domain_id = storage_domain.result['StorageDomainList'][0]['Id']
+
+        data_persistence_rule = client.put_data_persistence_rule_spectra_s3(
+            ds3.PutDataPersistenceRuleSpectraS3Request(data_policy_id=data_policy_id, isolation_level='standard',
+                                                       storage_domain_id=storage_domain_id, type='permanent'))
+
+        # create temporary directory with some files and subdirectories
+        source = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-src-")
+
+        put_objects = create_files_in_directory(directory=source.name, num_files=5, root_dir=source.name,
+                                                include_dirs=True)
+
+        # create the BP helper and perform the put all objects call
+        client.put_bucket_spectra_s3(ds3.PutBucketSpectraS3Request(name=bucket, data_policy_id=data_policy_id))
+
+        helpers = ds3Helpers.Helper(client=client)
+        job_ids = helpers.put_all_objects_in_directory(source_dir=source.name, bucket=bucket,
+                                                       calculate_checksum=True)
+        self.assertGreaterEqual(len(job_ids), 1, "received at least one job id")
+
+        # verify all the files and directories are on the BP
+        for put_object in put_objects:
+            head_obj = client.head_object(
+                ds3.HeadObjectRequest(bucket_name=bucket, object_name=put_object.object_name))
+            self.assertNotEqual(head_obj.result, "DOESNTEXIST")
+
+        # cleanup
+        source.cleanup()
+
+        client.delete_bucket_spectra_s3(ds3.DeleteBucketSpectraS3Request(bucket_name=bucket, force=True))
+
+        client.delete_data_persistence_rule_spectra_s3(
+            ds3.DeleteDataPersistenceRuleSpectraS3Request(
+                data_persistence_rule_id=data_persistence_rule.result['Id']))
+
+        client.delete_data_policy_spectra_s3(
+            ds3.DeleteDataPolicySpectraS3Request(data_policy_id=data_policy_id))
+
 
 if __name__ == '__main__':
     unittest.main()
