Skip to content

Commit 657cff3

Browse files
iparaskIoannis Paraskevakos
andauthored
Fail a transfer when globus task has faults (#136)
* outgoing transfers failed * queue item fails * formatting * test running again * linting * faults can be recoverable errors * Cancel a task if a specific type of event occurs * isorting * fixing constant name --------- Co-authored-by: Ioannis Paraskevakos <ip8725@pulibrarian2.princeton.edu>
1 parent d5a03ae commit 657cff3

File tree

8 files changed

+89
-3
lines changed

8 files changed

+89
-3
lines changed

hera_librarian/async_transfers/core.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,10 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
7171
to set (e.g.) internal flags as required.
7272
"""
7373
raise NotImplementedError
74+
75+
@abc.abstractmethod
76+
def fail_transfer(self, settings: "ServerSettings") -> bool:
77+
"""
78+
Fail the current transfer.
79+
"""
80+
raise NotImplementedError

hera_librarian/async_transfers/globus.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import globus_sdk
99

1010
from hera_librarian.transfer import TransferStatus
11+
from hera_librarian.utils import GLOBUS_ERROR_EVENTS
1112

1213
from .core import CoreAsyncTransferManager
1314

@@ -148,6 +149,8 @@ def _get_transfer_data(self, label: str, settings: "ServerSettings"):
148149
verify_checksum=True, # We do this ourselves, but globus will auto-retry if it detects failed files
149150
preserve_timestamp=True,
150151
notify_on_succeeded=False,
152+
skip_source_errors=False,
153+
fail_on_quota_errors=True,
151154
)
152155

153156
return transfer_data
@@ -273,7 +276,9 @@ def batch_transfer(
273276
# Globus transfer.
274277
relative_local_path = self._subtract_local_root(local_path, settings)
275278
transfer_data.add_item(
276-
str(relative_local_path), str(remote_path), recursive=local_path.is_dir()
279+
str(relative_local_path),
280+
str(remote_path),
281+
recursive=local_path.is_dir(),
277282
)
278283

279284
# submit the transfer
@@ -328,5 +333,49 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
328333
return TransferStatus.COMPLETED
329334
elif task_doc["status"] == "FAILED":
330335
return TransferStatus.FAILED
336+
# When there are errors, better fail the task and try again. There is
337+
# a different check for faults to make the state transition as clear as
338+
# possible.
339+
elif task_doc["faults"] > 0:
340+
task_event_list = transfer_client.task_event_list(self.task_id)
341+
for event in task_event_list:
342+
if event["code"] in GLOBUS_ERROR_EVENTS and event["is_error"]:
343+
return TransferStatus.FAILED
344+
return TransferStatus.FAILED
331345
else: # "status" == "ACTIVE"
332346
return TransferStatus.INITIATED
347+
348+
def fail_transfer(self, settings: "ServerSettings") -> bool:
349+
"""
350+
A GLobus task needs to be canceled because it has errors.
351+
352+
Parameters
353+
----------
354+
settings : ServerSettings object
355+
The settings for the Librarian server. These settings should include
356+
the Globus login information.
357+
358+
Returns
359+
-------
360+
bool
361+
Whether we could successfully cancelled a transfer (True) or not
362+
(False).
363+
364+
"""
365+
authorizer = self.authorize(settings=settings)
366+
if authorizer is None:
367+
# We *should* be able to just assume that we have already
368+
# authenticated and should be able to query the status of our
369+
# transfer. However, if for whatever reason we're not able to talk
370+
# to Globus (network issues, Globus outage, etc.), we won't be able
371+
# to find out our transfer's status -- let's bail and assume we
372+
# failed
373+
return False
374+
375+
transfer_client = globus_sdk.TransferClient(authorizer=authorizer)
376+
377+
try:
378+
_ = transfer_client.cancel_task(self.task_id)
379+
except globus_sdk.TransferAPIError as e:
380+
return False
381+
return True

hera_librarian/async_transfers/local.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,6 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
123123
return TransferStatus.INITIATED
124124
else:
125125
return TransferStatus.FAILED
126+
127+
def fail_transfer(self, settings: "ServerSettings") -> bool:
128+
return True

hera_librarian/async_transfers/rsync.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,6 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
7171
return TransferStatus.INITIATED
7272
else:
7373
return TransferStatus.FAILED
74+
75+
def fail_transfer(self, settings: "ServerSettings") -> bool:
76+
return True

hera_librarian/utils.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,3 +228,21 @@ def get_size_from_path(path):
228228
size += os.path.getsize(dirname + "/" + f)
229229

230230
return size
231+
232+
233+
# --- Globus Events ---
234+
GLOBUS_ERROR_EVENTS = [
235+
"AMBIGUOUS_PATH",
236+
"IS_A_DIRECTORY",
237+
"EXPIRED",
238+
"FILE_NOT_FOUND",
239+
"FILE_SIZE_CHANGED",
240+
"INVALID_PATH_NAME",
241+
"INVALID_SERVICE_CREDENTIAL",
242+
"INVALID_SYMLINK",
243+
"LIMIT_EXCEEDED",
244+
"NO_CREDENTIALS",
245+
"NO_SPACE_LEFT",
246+
"PERMISSION_DENIED",
247+
"QUOTA_EXCEEDED",
248+
]

librarian_background/queues.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ def check_on_consumed(
204204
)
205205
elif current_status == TransferStatus.FAILED:
206206
logger.info("Transfer for {q.id} has failed", q=queue_item)
207-
for transfer in queue_item.transfers:
208-
transfer.fail_transfer(session=session, commit=False)
207+
queue_item.async_transfer_manager.fail_transfer(server_settings)
208+
queue_item.fail(session=session)
209209
else:
210210
logger.error(
211211
"Incompatible return value for transfer status from "

tests/background_unit_test/test_send_queue.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ def valid(self, *args, **kwargs):
2121
def transfer_status(self, *args, **kwargs):
2222
return self.complete_transfer_status
2323

24+
def fail_transfer(self, settings):
25+
return True
26+
2427

2528
def test_create_queue_item(test_server_with_valid_file, test_orm):
2629
"""

tests/integration_test/test_send_queue.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ def valid(self, *args, **kwargs):
2828
def transfer_status(self, *args, **kwargs):
2929
return self.complete_transfer_status
3030

31+
def fail_transfer(self, settings):
32+
return True
33+
3134

3235
def test_create_simple_queue_item_and_send(
3336
test_server, test_orm, mocked_admin_client, server

0 commit comments

Comments
 (0)