@@ -30,6 +30,18 @@ def wait_for_pipeline_execution(pipeline_name, initial_wait_seconds=10, poll_int
3030 logger .info (f"Waiting { initial_wait_seconds } seconds for pipeline '{ pipeline_name } ' to start..." )
3131 time .sleep (initial_wait_seconds )
3232
33+ # Get the latest execution ID to track the correct execution
34+ try :
35+ response = client .list_pipeline_executions (pipelineName = pipeline_name , maxResults = 1 )
36+ executions = response .get ('pipelineExecutionSummaries' , [])
37+ if not executions :
38+ raise Exception (f"No executions found for pipeline '{ pipeline_name } '" )
39+
40+ target_execution_id = executions [0 ]['pipelineExecutionId' ]
41+ logger .info (f"Monitoring execution: { target_execution_id } " )
42+ except Exception as e :
43+ raise Exception (f"Failed to get pipeline execution ID: { str (e )} " )
44+
3345 start_time = time .time ()
3446 max_wait_seconds = max_wait_minutes * 60
3547
@@ -39,40 +51,41 @@ def wait_for_pipeline_execution(pipeline_name, initial_wait_seconds=10, poll_int
3951 if elapsed_time > max_wait_seconds :
4052 raise Exception (f"Pipeline '{ pipeline_name } ' execution timed out after { max_wait_minutes } minutes" )
4153
42- # Get the latest execution
54+ # Get the specific execution status
4355 try :
44- response = client .get_pipeline_state (name = pipeline_name )
45- stage_states = response .get ('stageStates' , [])
56+ response = client .get_pipeline_execution (
57+ pipelineName = pipeline_name ,
58+ pipelineExecutionId = target_execution_id
59+ )
4660
47- # Check if pipeline is still running
48- pipeline_running = False
49- failed_stages = []
61+ execution = response .get ('pipelineExecution' , {})
62+ status = execution .get ('status' , 'Unknown' )
5063
51- for stage in stage_states :
52- stage_name = stage .get ('stageName' , 'Unknown' )
53- latest_execution = stage .get ('latestExecution' , {})
54- status = latest_execution .get ('status' , 'Unknown' )
55-
56- if status == 'InProgress' :
57- pipeline_running = True
58- logger .info (f"Pipeline '{ pipeline_name } ' is running. Current stage: { stage_name } " )
59- break
60- elif status == 'Failed' :
61- failed_stages .append (f"{ stage_name } ({ latest_execution .get ('message' , 'No message' )} )" )
62-
63- # If any stages failed, raise an exception
64- if failed_stages :
65- message = f"Pipeline '{ pipeline_name } ' execution failed in stages: { ', ' .join (failed_stages )} "
66- logger .error (message )
67- raise Exception (message )
64+ logger .info (f"Pipeline '{ pipeline_name } ' execution { target_execution_id } status: { status } " )
6865
69- # If pipeline is not running and no stages failed, it must have succeeded
70- if not pipeline_running :
66+ if status == 'Succeeded' :
7167 logger .success (f"Pipeline '{ pipeline_name } ' execution completed successfully!" )
7268 return
69+ elif status == 'Failed' :
70+ failure_reason = execution .get ('statusReason' , 'No reason provided' )
71+ message = f"Pipeline '{ pipeline_name } ' execution failed: { failure_reason } "
72+ logger .error (message )
73+ raise Exception (message )
74+ elif status in ['InProgress' , 'Stopping' ]:
75+ # Pipeline is still running, continue monitoring
76+ pass
77+ elif status == 'Stopped' :
78+ message = f"Pipeline '{ pipeline_name } ' execution was stopped"
79+ logger .error (message )
80+ raise Exception (message )
81+ elif status == 'Superseded' :
82+ message = f"Pipeline '{ pipeline_name } ' execution was superseded by a newer execution"
83+ logger .error (message )
84+ raise Exception (message )
85+ else :
86+ logger .warning (f"Unknown pipeline status: { status } " )
7387
7488 # Wait before checking again
75- logger .info (f"Waiting { poll_interval_seconds } seconds before checking pipeline status again..." )
7689 time .sleep (poll_interval_seconds )
7790
7891 except Exception as e :
0 commit comments