|
29 | 29 | ReRunRequest,
|
30 | 30 | ScheduleQueriesSearchRequest,
|
31 | 31 | Workflow,
|
32 |
| - WorkflowMetadata, |
33 | 32 | WorkflowResponse,
|
34 | 33 | WorkflowRunResponse,
|
35 | 34 | WorkflowSchedule,
|
@@ -417,37 +416,39 @@ def monitor(
|
417 | 416 | Monitor the status of the workflow's run.
|
418 | 417 |
|
419 | 418 | :param workflow_response: The workflow_response returned from running the workflow
|
420 |
| - :param workflow_name: name of the workflow to be monitored |
421 | 419 | :param logger: the logger to log status information
|
422 |
| - (logging.INFO for summary info. logging:DEBUG for detail info) |
| 420 | + (logging.INFO for summary info. logging.DEBUG for detail info) |
| 421 | + :param workflow_name: name of the workflow to be monitored |
423 | 422 | :returns: the status at completion or None if the workflow wasn't run
|
424 | 423 | :raises ValidationError: If the provided `workflow_response`, `logger` is invalid
|
425 | 424 | :raises AtlanError: on any API communication issue
|
426 | 425 | """
|
427 |
| - if workflow_name and not workflow_response: |
428 |
| - workflow_response = WorkflowResponse( |
429 |
| - metadata=WorkflowMetadata(name=workflow_name) |
430 |
| - ) |
| 426 | + name = workflow_name or ( |
| 427 | + workflow_response.metadata.name |
| 428 | + if workflow_response and workflow_response.metadata |
| 429 | + else None |
| 430 | + ) |
431 | 431 |
|
432 |
| - if workflow_response.metadata and workflow_response.metadata.name: # type: ignore |
433 |
| - name = workflow_response.metadata.name # type: ignore |
434 |
| - status: Optional[AtlanWorkflowPhase] = None |
435 |
| - while status not in { |
436 |
| - AtlanWorkflowPhase.SUCCESS, |
437 |
| - AtlanWorkflowPhase.ERROR, |
438 |
| - AtlanWorkflowPhase.FAILED, |
439 |
| - }: |
440 |
| - sleep(MONITOR_SLEEP_SECONDS) |
441 |
| - if run_details := self._get_run_details(name): |
442 |
| - status = run_details.status |
| 432 | + if not name: |
| 433 | + if logger: |
| 434 | + logger.info("Skipping workflow monitoring — nothing to monitor.") |
| 435 | + return None |
| 436 | + |
| 437 | + status: Optional[AtlanWorkflowPhase] = None |
| 438 | + while status not in { |
| 439 | + AtlanWorkflowPhase.SUCCESS, |
| 440 | + AtlanWorkflowPhase.ERROR, |
| 441 | + AtlanWorkflowPhase.FAILED, |
| 442 | + }: |
| 443 | + sleep(MONITOR_SLEEP_SECONDS) |
| 444 | + if run_details := self._get_run_details(name): |
| 445 | + status = run_details.status |
443 | 446 | if logger:
|
444 | 447 | logger.debug("Workflow status: %s", status)
|
445 |
| - if logger: |
446 |
| - logger.info("Workflow completion status: %s", status) |
447 |
| - return status |
| 448 | + |
448 | 449 | if logger:
|
449 |
| - logger.info("Skipping workflow monitoring — nothing to monitor.") |
450 |
| - return None |
| 450 | + logger.info("Workflow completion status: %s", status) |
| 451 | + return status |
451 | 452 |
|
452 | 453 | def _get_run_details(self, name: str) -> Optional[WorkflowSearchResult]:
|
453 | 454 | return self._find_latest_run(workflow_name=name)
|
|
0 commit comments