@@ -407,37 +407,48 @@ def update_owner(self, workflow_name: str, username: str) -> WorkflowResponse:
407
407
408
408
@validate_arguments (config = dict (arbitrary_types_allowed = True ))
409
409
def monitor (
410
- self , workflow_response : WorkflowResponse , logger : Optional [Logger ] = None
410
+ self ,
411
+ workflow_response : Optional [WorkflowResponse ] = None ,
412
+ logger : Optional [Logger ] = None ,
413
+ workflow_name : Optional [str ] = None ,
411
414
) -> Optional [AtlanWorkflowPhase ]:
412
415
"""
413
416
Monitor the status of the workflow's run.
414
417
415
418
:param workflow_response: The workflow_response returned from running the workflow
416
419
:param logger: the logger to log status information
417
- (logging.INFO for summary info. logging:DEBUG for detail info)
420
+ (logging.INFO for summary info. logging.DEBUG for detail info)
421
+ :param workflow_name: name of the workflow to be monitored
418
422
:returns: the status at completion or None if the workflow wasn't run
419
423
:raises ValidationError: If the provided `workflow_response`, `logger` is invalid
420
424
:raises AtlanError: on any API communication issue
421
425
"""
422
- if workflow_response .metadata and workflow_response .metadata .name :
423
- name = workflow_response .metadata .name
424
- status : Optional [AtlanWorkflowPhase ] = None
425
- while status not in {
426
- AtlanWorkflowPhase .SUCCESS ,
427
- AtlanWorkflowPhase .ERROR ,
428
- AtlanWorkflowPhase .FAILED ,
429
- }:
430
- sleep (MONITOR_SLEEP_SECONDS )
431
- if run_details := self ._get_run_details (name ):
432
- status = run_details .status
426
+ name = workflow_name or (
427
+ workflow_response .metadata .name
428
+ if workflow_response and workflow_response .metadata
429
+ else None
430
+ )
431
+
432
+ if not name :
433
+ if logger :
434
+ logger .info ("Skipping workflow monitoring — nothing to monitor." )
435
+ return None
436
+
437
+ status : Optional [AtlanWorkflowPhase ] = None
438
+ while status not in {
439
+ AtlanWorkflowPhase .SUCCESS ,
440
+ AtlanWorkflowPhase .ERROR ,
441
+ AtlanWorkflowPhase .FAILED ,
442
+ }:
443
+ sleep (MONITOR_SLEEP_SECONDS )
444
+ if run_details := self ._get_run_details (name ):
445
+ status = run_details .status
433
446
if logger :
434
447
logger .debug ("Workflow status: %s" , status )
435
- if logger :
436
- logger .info ("Workflow completion status: %s" , status )
437
- return status
448
+
438
449
if logger :
439
- logger .info ("Skipping workflow monitoring — nothing to monitor." )
440
- return None
450
+ logger .info ("Workflow completion status: %s" , status )
451
+ return status
441
452
442
453
def _get_run_details (self , name : str ) -> Optional [WorkflowSearchResult ]:
443
454
return self ._find_latest_run (workflow_name = name )
0 commit comments