Skip to content

Commit 7634192

Browse files
author
Taniya Mathur
committed
fix: implement robust pipeline tracking using version ID to eliminate race conditions
1 parent 7c495d4 commit 7634192

File tree

1 file changed

+74
-35
lines changed

1 file changed

+74
-35
lines changed

scripts/integration_test_deployment.py

Lines changed: 74 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -92,53 +92,92 @@ def upload_to_s3(bucket_name):
9292
sys.exit(1)
9393

9494

95-
def monitor_pipeline(pipeline_name, max_wait=7200):
96-
"""Monitor CodePipeline execution until completion"""
97-
print(f"Monitoring pipeline: {pipeline_name}")
98-
95+
def find_pipeline_execution_by_version(pipeline_name, version_id, max_wait=300):
96+
"""Find pipeline execution that corresponds to specific S3 version ID"""
97+
print(f"Finding pipeline execution for version: {version_id}")
98+
99+
codepipeline = boto3.client("codepipeline")
100+
start_time = time.time()
101+
102+
while time.time() - start_time < max_wait:
103+
try:
104+
response = codepipeline.list_pipeline_executions(
105+
pipelineName=pipeline_name, maxResults=10
106+
)
107+
108+
for execution in response["pipelineExecutionSummaries"]:
109+
execution_id = execution["pipelineExecutionId"]
110+
111+
# Get execution details to check source version
112+
details = codepipeline.get_pipeline_execution(
113+
pipelineName=pipeline_name,
114+
pipelineExecutionId=execution_id
115+
)
116+
117+
# Check if this execution matches our version ID
118+
for artifact in details["pipelineExecution"].get("artifactRevisions", []):
119+
if artifact.get("revisionId") == version_id:
120+
print(f"✅ Found matching execution: {execution_id}")
121+
return execution_id
122+
123+
except Exception as e:
124+
print(f"Error finding execution: {e}")
125+
126+
time.sleep(10)
127+
128+
print(f"❌ Could not find pipeline execution for version {version_id}")
129+
return None
130+
131+
132+
def monitor_pipeline_execution(pipeline_name, execution_id, max_wait=7200):
133+
"""Monitor specific pipeline execution until completion"""
134+
print(f"Monitoring execution: {execution_id}")
135+
99136
codepipeline = boto3.client("codepipeline")
100137
wait_time = 0
101138
poll_interval = 30
102-
103-
# Initial wait for pipeline to start
104-
print("Waiting for pipeline to start...")
105-
time.sleep(30)
106-
139+
107140
while wait_time < max_wait:
108141
try:
109-
# Get latest pipeline execution
110-
response = codepipeline.list_pipeline_executions(
111-
pipelineName=pipeline_name, maxResults=1
142+
response = codepipeline.get_pipeline_execution(
143+
pipelineName=pipeline_name,
144+
pipelineExecutionId=execution_id
112145
)
113-
114-
if not response["pipelineExecutionSummaries"]:
115-
print("⏳ No pipeline executions found, waiting...")
116-
else:
117-
execution = response["pipelineExecutionSummaries"][0]
118-
execution_id = execution["pipelineExecutionId"]
119-
status = execution["status"]
120-
121-
print(f"Pipeline execution {execution_id}: {status}")
122-
123-
if status == "Succeeded":
124-
print("✅ Pipeline completed successfully!")
125-
return True
126-
elif status in ["Failed", "Cancelled", "Superseded"]:
127-
print(f"❌ Pipeline failed with status: {status}")
128-
return False
129-
elif status == "InProgress":
130-
print(f"⏳ Pipeline still running... ({wait_time}s elapsed)")
131-
146+
147+
status = response["pipelineExecution"]["status"]
148+
print(f"Pipeline execution {execution_id}: {status}")
149+
150+
if status == "Succeeded":
151+
print("✅ Pipeline completed successfully!")
152+
return True
153+
elif status in ["Failed", "Cancelled", "Superseded"]:
154+
print(f"❌ Pipeline failed with status: {status}")
155+
return False
156+
elif status == "InProgress":
157+
print(f"⏳ Pipeline still running... ({wait_time}s elapsed)")
158+
132159
except Exception as e:
133160
print(f"Error checking pipeline status: {e}")
134-
161+
135162
time.sleep(poll_interval)
136163
wait_time += poll_interval
137-
164+
138165
print(f"❌ Pipeline monitoring timed out after {max_wait} seconds")
139166
return False
140167

141168

169+
def monitor_pipeline(pipeline_name, version_id, max_wait=7200):
170+
"""Monitor pipeline using version-based tracking"""
171+
# First find the execution that matches our version
172+
execution_id = find_pipeline_execution_by_version(pipeline_name, version_id)
173+
174+
if not execution_id:
175+
return False
176+
177+
# Then monitor that specific execution
178+
return monitor_pipeline_execution(pipeline_name, execution_id, max_wait)
179+
180+
142181
def main():
143182
"""Main execution function"""
144183
print("Starting integration test deployment...")
@@ -156,9 +195,9 @@ def main():
156195

157196
# Execute deployment steps
158197
create_deployment_package()
159-
upload_to_s3(bucket_name)
198+
version_id = upload_to_s3(bucket_name)
160199

161-
success = monitor_pipeline(pipeline_name)
200+
success = monitor_pipeline(pipeline_name, version_id)
162201

163202
if success:
164203
print("🎉 Integration test deployment completed successfully!")

0 commit comments

Comments
 (0)