
Commit 2a4ddfa

Waiting for multithreaded futures to finish and adding basic logging to helper functions. (#56)
1 parent b7d262d commit 2a4ddfa

File tree

2 files changed: +49, -5 lines
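The heart of the change is the standard concurrent.futures pattern: keep the Future returned by each executor.submit call, attach a completion callback for per-blob logging, then block on concurrent.futures.wait before counting failures. Below is a minimal, self-contained sketch of that pattern; transfer_blob and the (name, offset) pairs are hypothetical stand-ins for the SDK's put_blob/get_blob and its blob bookkeeping.

import concurrent.futures
import logging

logging.basicConfig(level=logging.INFO)


def transfer_blob(name: str, offset: int):
    # Hypothetical stand-in for Helper.put_blob / Helper.get_blob; like them,
    # it returns (name, offset) so the done callback can log what finished.
    return name, offset


def done_callback(future: concurrent.futures.Future):
    try:
        result = future.result()  # re-raises any exception from the worker thread
        logging.info(f'Finished transferring blob name={result[0]}, offset={result[1]}')
    except Exception as ex:
        logging.error(f'{ex}')


blobs = [('obj-a', 0), ('obj-a', 1048576), ('obj-b', 0)]  # made-up (name, offset) pairs
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for name, offset in blobs:
        future = executor.submit(transfer_blob, name, offset)
        future.add_done_callback(done_callback)
        futures.append(future)

# Block until every submitted transfer finishes, then tally failures as the commit does.
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
error_count = sum(1 for f in futures if f.exception() is not None)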

ds3/ds3Helpers.py

Lines changed: 46 additions & 3 deletions
@@ -10,6 +10,7 @@
 # specific language governing permissions and limitations under the License.
 import concurrent.futures
 import hashlib
+import logging
 import time
 import zlib
 from os import walk, path
@@ -124,6 +125,14 @@ def calculate_checksum_header(object_data_stream, checksum_type: str, length: in
     return {header_key: encoded_checksum}


+def done_callback(future: concurrent.futures.Future):
+    try:
+        result = future.result()
+        logging.info(f'Finished transferring blob name={result[0]}, offset={result[1]}')
+    except Exception as ex:
+        logging.error(f'{ex}')
+
+
 class Helper(object):
     """A class that moves data to and from a Black Pearl"""
@@ -186,6 +195,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
             PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects, name=job_name))

         job_id = bulk_put.result['JobId']
+        logging.info(f'Created put job {job_id} with {len(put_objects)} objects.')

         blob_set: Set[Blob] = set()
         for chunk in bulk_put.result['ObjectsList']:
@@ -197,6 +207,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
             blob_set.add(cur_blob)

         # send until all blobs have been transferred
+        error_count: int = 0
         while len(blob_set) > 0:
             available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
                 GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
@@ -208,6 +219,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
                 continue

             # retrieve all available blobs concurrently
+            futures: List[concurrent.futures.Future] = list()
             with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
                 for chunk in chunks:
                     for blob in chunk['ObjectList']:
@@ -220,8 +232,21 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
                             blob_set.remove(cur_blob)
                             put_object = put_objects_map[cur_blob.name]

-                            executor.submit(self.put_blob, bucket, put_object, cur_blob.length, cur_blob.offset, job_id,
-                                            checksum_type)
+                            future = executor.submit(self.put_blob, bucket, put_object, cur_blob.length,
+                                                     cur_blob.offset, job_id, checksum_type)
+                            future.add_done_callback(done_callback)
+                            futures.append(future)
+
+            # Wait for all futures to finish
+            concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
+            for future in futures:
+                if future.exception() is not None:
+                    error_count += 1
+
+        if error_count > 0:
+            logging.warning(f'Completed job {job_id} with {error_count} errors.')
+        else:
+            logging.info(f'Completed job {job_id} with no errors.')

         return job_id
@@ -244,6 +269,7 @@ def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset
                                                job=job_id,
                                                headers=headers))
         stream.close()
+        return put_object.object_name, offset

     def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per_bp_job: int = 1000,
                                      max_threads: int = 5, calculate_checksum: bool = False,
@@ -329,6 +355,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
                 name=job_name))

         job_id = bulk_get.result['JobId']
+        logging.info(f'Created get job {job_id} with {len(get_objects)} objects.')

         blob_set: Set[Blob] = set()
         for chunk in bulk_get.result['ObjectsList']:
@@ -340,6 +367,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
             blob_set.add(cur_blob)

         # retrieve until all blobs have been transferred
+        error_count: int = 0
         while len(blob_set) > 0:
             available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
                 GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
@@ -351,6 +379,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
                 continue

             # retrieve all available blobs concurrently
+            futures: List[concurrent.futures.Future] = list()
             with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
                 for chunk in chunks:
                     for blob in chunk['ObjectList']:
@@ -363,7 +392,20 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
                             blob_set.remove(cur_blob)
                             get_object = get_objects_map[cur_blob.name]

-                            executor.submit(self.get_blob, bucket, get_object, offset, job_id)
+                            future = executor.submit(self.get_blob, bucket, get_object, offset, job_id)
+                            future.add_done_callback(done_callback)
+                            futures.append(future)
+
+            # Wait for all futures to finish
+            concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
+            for future in futures:
+                if future.exception() is not None:
+                    error_count += 1
+
+        if error_count > 0:
+            logging.warning(f'Completed job {job_id} with {error_count} errors.')
+        else:
+            logging.info(f'Completed job {job_id} with no errors.')

         return job_id
@@ -376,6 +418,7 @@ def get_blob(self, bucket: str, get_object: HelperGetObject, offset: int, job_id
                                          job=job_id,
                                          version_id=get_object.version_id))
         stream.close()
+        return get_object.object_name, offset

     def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per_bp_job: int = 1000,
                                 max_threads: int = 5, job_name: str = None) -> List[str]:
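Note that put_objects and get_objects do not configure logging themselves; the new messages go to the root logger, so callers opt in with the standard logging.basicConfig call, just as the updated tests below do. A hedged usage sketch (the bucket name and source directory are made up, and the Helper construction follows the repository's tests):

import logging

from ds3 import ds3, ds3Helpers

# Surface the helpers' new INFO-level progress and completion messages.
logging.basicConfig(level=logging.INFO)

client = ds3.createClientFromEnv()  # reads the usual DS3 environment variables
helper = ds3Helpers.Helper(client=client)

# 'my-bucket' and '/tmp/data' are hypothetical.
helper.put_all_objects_in_directory(source_dir='/tmp/data', bucket='my-bucket')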

tests/helpersTests.py

Lines changed: 3 additions & 2 deletions
@@ -8,7 +8,7 @@
 # This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 # CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
-
+import logging
 import unittest
 import os
 import tempfile
@@ -21,6 +21,8 @@

 import xml.etree.ElementTree as xmlDom

+logging.basicConfig(level=logging.INFO)
+

 def create_files_in_directory(directory: str, num_files: int, root_dir: str,
                               include_dirs: bool = True) -> List[ds3Helpers.HelperPutObject]:
@@ -253,7 +255,6 @@ def put_all_objects_in_directory_with_checksum(self, checksum_type: str):
         # fetch existing storage domain
         storage_domain = client.get_storage_domains_spectra_s3(ds3.GetStorageDomainsSpectraS3Request())
         storage_domain_id = storage_domain.result['StorageDomainList'][0]['Id']
-        print("test")

         data_persistence_rule = client.put_data_persistence_rule_spectra_s3(
             ds3.PutDataPersistenceRuleSpectraS3Request(data_policy_id=data_policy_id, isolation_level='standard',
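Beyond flipping on INFO output with logging.basicConfig, a test could also assert that the helpers actually emit these records, using unittest's assertLogs context manager. A minimal sketch, with plain logging calls standing in for a real helper invocation and a made-up job id:

import logging
import unittest


class LoggingSmokeTest(unittest.TestCase):
    def test_info_messages_are_captured(self):
        # assertLogs captures records at INFO and above on the root logger.
        with self.assertLogs(level=logging.INFO) as captured:
            logging.info('Created put job fake-job-id with 2 objects.')
            logging.info('Completed job fake-job-id with no errors.')
        self.assertEqual(len(captured.records), 2)
        self.assertIn('Completed job', captured.output[1])


if __name__ == '__main__':
    unittest.main()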
