Skip to content

Commit 6c3705f

Browse files
committed
resolve and display causes of error as is best possible
currently we just get out cleanly, and make sure the causation is preserved. Later (v4.0) we may introduce a TransferInterrupted or similar which can be more useful than RuntimeError for indicating what happened. (in place of the RuntimeError("xxx failed.") in irods/manager/data_object_manager.)
1 parent db71ef2 commit 6c3705f

File tree

2 files changed

+35
-26
lines changed

2 files changed

+35
-26
lines changed

irods/manager/data_object_manager.py

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -246,15 +246,19 @@ def _download(self, obj, local_path, num_threads, updatables=(), **options):
246246
if self.should_parallelize_transfer(
247247
num_threads, o, open_options=options.items()
248248
):
249-
if not self.parallel_get(
250-
(obj, o),
251-
local_file,
252-
num_threads=num_threads,
253-
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
254-
data_open_returned_values=data_open_returned_values_,
255-
updatables=updatables,
256-
):
257-
raise RuntimeError("parallel get failed")
249+
error = RuntimeError("parallel get failed")
250+
try:
251+
if not self.parallel_get(
252+
(obj, o),
253+
local_file,
254+
num_threads=num_threads,
255+
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
256+
data_open_returned_values=data_open_returned_values_,
257+
updatables=updatables,
258+
):
259+
raise error
260+
except BaseException as e:
261+
raise error from e
258262
else:
259263
with open(local_file, "wb") as f:
260264
for chunk in chunks(o, self.READ_BUFFER_SIZE):
@@ -354,17 +358,21 @@ def put(
354358
):
355359
o = deferred_call(self.open, (obj, "w"), options)
356360
f.close()
357-
if not self.parallel_put(
358-
local_path,
359-
(obj, o),
360-
total_bytes=sizelist[0],
361-
num_threads=num_threads,
362-
target_resource_name=options.get(kw.RESC_NAME_KW, "")
363-
or options.get(kw.DEST_RESC_NAME_KW, ""),
364-
open_options=options,
365-
updatables=updatables,
366-
):
367-
raise RuntimeError("parallel put failed")
361+
error = RuntimeError("parallel put failed")
362+
try:
363+
if not self.parallel_put(
364+
local_path,
365+
(obj, o),
366+
total_bytes=sizelist[0],
367+
num_threads=num_threads,
368+
target_resource_name=options.get(kw.RESC_NAME_KW, "")
369+
or options.get(kw.DEST_RESC_NAME_KW, ""),
370+
open_options=options,
371+
updatables=updatables,
372+
):
373+
raise error
374+
except BaseException as e:
375+
raise error from e
368376
else:
369377
with self.open(obj, "w", **options) as o:
370378
# Set operation type to trigger acPostProcForPut

irods/parallel.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
473473
transfer_managers[mgr] = (_quit_current_transfer, [id(mgr)])
474474

475475
try:
476-
transfer_aborted = False
476+
thread_setup_error = None
477477

478478
for byte_range in ranges:
479479
if Io is None:
@@ -510,22 +510,23 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
510510
)
511511
except RuntimeError as error:
512512
# Executor was probably shut down before parallel transfer could be initiated.
513-
transfer_aborted = True
513+
thread_setup_error = error
514514
break
515515
else:
516516
mgr.add_future(f)
517517

518518
counter += 1
519519
Io = File = None
520520

521-
if transfer_aborted:
522-
return ((bytes_transferred:=0), total_size)
521+
if thread_setup_error:
522+
raise thread_setup_error
523+
524+
bytes_transferred = 0
523525

524526
if Operation.isNonBlocking():
525527
transfer_managers[mgr] = None
526528
return (futures, mgr, queueObject)
527529
else:
528-
bytes_transferred = 0
529530
# Enable user attempts to cancel the current synchronous transfer.
530531
# At any given time, only one transfer manager key should map to a tuple object T.
531532
# You should be able to quit all threads of the current transfer by calling T[0](*T[1]).
@@ -538,7 +539,7 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
538539
return (bytes_transferred, total_size)
539540

540541
except BaseException as e:
541-
if isinstance(e, (SystemExit, KeyboardInterrupt)):
542+
if isinstance(e, (SystemExit, KeyboardInterrupt, RuntimeError)):
542543
mgr.quit()
543544
raise
544545

0 commit comments

Comments
 (0)