Skip to content

Commit 0276b4d

Browse files
Fix ECS observer incorrectly crashing suspended flow runs (#20682)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: alex.s@prefect.io <ajstreed1@gmail.com>
1 parent 3d5fdd2 commit 0276b4d

File tree

2 files changed

+156
-5
lines changed

2 files changed

+156
-5
lines changed

src/integrations/prefect-aws/prefect_aws/observers/ecs.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -534,24 +534,43 @@ async def mark_runs_as_crashed(event: dict[str, Any], tags: dict[str, str]):
534534

535535
assert flow_run.state is not None, "Expected flow run state to be set"
536536

537-
# Exit early for final or scheduled states
538-
if flow_run.state.is_final() or flow_run.state.is_scheduled():
537+
# Exit early for final, scheduled, or paused states
538+
if (
539+
flow_run.state.is_final()
540+
or flow_run.state.is_scheduled()
541+
or flow_run.state.is_paused()
542+
):
539543
logger.debug(
540-
f"Flow run {flow_run_id} is in final or scheduled state, skipping"
544+
f"Flow run {flow_run_id} is in final, scheduled, or paused state, skipping"
541545
)
542546
return
543547

544548
containers = event.get("detail", {}).get("containers", [])
545549

550+
orchestration_container = next(
551+
(
552+
container
553+
for container in containers
554+
if container.get("name") == _ECS_DEFAULT_CONTAINER_NAME
555+
),
556+
None,
557+
)
558+
559+
if orchestration_container is not None:
560+
containers_to_check = [orchestration_container]
561+
else:
562+
containers_to_check = containers
563+
546564
containers_with_non_zero_exit_codes = [
547565
container
548-
for container in containers
566+
for container in containers_to_check
549567
if container.get("exitCode") is None or container.get("exitCode") != 0
550568
]
551569

552570
if any(containers_with_non_zero_exit_codes):
553571
container_identifiers = [
554-
c.get("name") or c.get("containerArn") for c in containers
572+
c.get("name") or c.get("containerArn")
573+
for c in containers_with_non_zero_exit_codes
555574
]
556575
handler_logger.info(
557576
"The following containers stopped with a non-zero exit code: %s. Marking flow run %s as crashed",

src/integrations/prefect-aws/tests/observers/test_ecs_observer.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,6 +1125,138 @@ async def test_mark_runs_as_crashed_skips_scheduled_states(
11251125
# Should not propose state for scheduled states
11261126
mock_propose_state.assert_not_called()
11271127

1128+
@patch("prefect_aws.observers.ecs.prefect.get_client")
1129+
@patch("prefect_aws.observers.ecs.propose_state")
1130+
async def test_mark_runs_as_crashed_skips_paused_states(
1131+
self, mock_propose_state, mock_get_client, sample_event, sample_tags
1132+
):
1133+
flow_run_id = uuid.UUID(sample_tags["prefect.io/flow-run-id"])
1134+
mock_client = AsyncMock()
1135+
mock_context = AsyncMock()
1136+
mock_context.__aenter__.return_value = mock_client
1137+
mock_get_client.return_value = mock_context
1138+
1139+
flow_run = FlowRun(
1140+
id=flow_run_id,
1141+
name="test-flow-run",
1142+
flow_id=uuid.uuid4(),
1143+
state=State(type="PAUSED", name="Suspended"),
1144+
)
1145+
mock_client.read_flow_run.return_value = flow_run
1146+
1147+
await mark_runs_as_crashed(sample_event, sample_tags)
1148+
1149+
mock_client.read_flow_run.assert_called_once_with(flow_run_id=flow_run_id)
1150+
mock_propose_state.assert_not_called()
1151+
1152+
@patch("prefect_aws.observers.ecs.prefect.get_client")
1153+
@patch("prefect_aws.observers.ecs.propose_state")
1154+
async def test_mark_runs_as_crashed_ignores_sidecar_exit_codes(
1155+
self, mock_propose_state, mock_get_client, sample_tags
1156+
):
1157+
event = {
1158+
"detail": {
1159+
"taskArn": "arn:aws:ecs:us-east-1:123456789:task/cluster/task-id",
1160+
"containers": [
1161+
{"name": "prefect", "exitCode": 0},
1162+
{"name": "vmagent", "exitCode": 0},
1163+
{"name": "vector", "exitCode": 0},
1164+
{"name": "ecs-exporter", "exitCode": 2},
1165+
],
1166+
}
1167+
}
1168+
1169+
flow_run_id = uuid.UUID(sample_tags["prefect.io/flow-run-id"])
1170+
mock_client = AsyncMock()
1171+
mock_context = AsyncMock()
1172+
mock_context.__aenter__.return_value = mock_client
1173+
mock_get_client.return_value = mock_context
1174+
1175+
flow_run = FlowRun(
1176+
id=flow_run_id,
1177+
name="test-flow-run",
1178+
flow_id=uuid.uuid4(),
1179+
state=State(type="RUNNING", name="Running"),
1180+
)
1181+
mock_client.read_flow_run.return_value = flow_run
1182+
1183+
await mark_runs_as_crashed(event, sample_tags)
1184+
1185+
mock_propose_state.assert_not_called()
1186+
1187+
@patch("prefect_aws.observers.ecs.prefect.get_client")
1188+
@patch("prefect_aws.observers.ecs.propose_state")
1189+
async def test_mark_runs_as_crashed_checks_orchestration_container(
1190+
self, mock_propose_state, mock_get_client, sample_tags
1191+
):
1192+
event = {
1193+
"detail": {
1194+
"taskArn": "arn:aws:ecs:us-east-1:123456789:task/cluster/task-id",
1195+
"containers": [
1196+
{"name": "prefect", "exitCode": 1},
1197+
{"name": "sidecar", "exitCode": 0},
1198+
],
1199+
}
1200+
}
1201+
1202+
flow_run_id = uuid.UUID(sample_tags["prefect.io/flow-run-id"])
1203+
mock_client = AsyncMock()
1204+
mock_context = AsyncMock()
1205+
mock_context.__aenter__.return_value = mock_client
1206+
mock_get_client.return_value = mock_context
1207+
1208+
flow_run = FlowRun(
1209+
id=flow_run_id,
1210+
name="test-flow-run",
1211+
flow_id=uuid.uuid4(),
1212+
state=State(type="RUNNING", name="Running"),
1213+
)
1214+
mock_client.read_flow_run.return_value = flow_run
1215+
1216+
await mark_runs_as_crashed(event, sample_tags)
1217+
1218+
mock_propose_state.assert_called_once()
1219+
call_args = mock_propose_state.call_args[1]
1220+
proposed_state = call_args["state"]
1221+
assert proposed_state.type == StateType.CRASHED
1222+
assert "prefect" in proposed_state.message
1223+
1224+
@patch("prefect_aws.observers.ecs.prefect.get_client")
1225+
@patch("prefect_aws.observers.ecs.propose_state")
1226+
async def test_mark_runs_as_crashed_falls_back_to_all_containers(
1227+
self, mock_propose_state, mock_get_client, sample_tags
1228+
):
1229+
event = {
1230+
"detail": {
1231+
"taskArn": "arn:aws:ecs:us-east-1:123456789:task/cluster/task-id",
1232+
"containers": [
1233+
{"name": "custom-container", "exitCode": 1},
1234+
{"name": "sidecar", "exitCode": 0},
1235+
],
1236+
}
1237+
}
1238+
1239+
flow_run_id = uuid.UUID(sample_tags["prefect.io/flow-run-id"])
1240+
mock_client = AsyncMock()
1241+
mock_context = AsyncMock()
1242+
mock_context.__aenter__.return_value = mock_client
1243+
mock_get_client.return_value = mock_context
1244+
1245+
flow_run = FlowRun(
1246+
id=flow_run_id,
1247+
name="test-flow-run",
1248+
flow_id=uuid.uuid4(),
1249+
state=State(type="RUNNING", name="Running"),
1250+
)
1251+
mock_client.read_flow_run.return_value = flow_run
1252+
1253+
await mark_runs_as_crashed(event, sample_tags)
1254+
1255+
mock_propose_state.assert_called_once()
1256+
call_args = mock_propose_state.call_args[1]
1257+
proposed_state = call_args["state"]
1258+
assert proposed_state.type == StateType.CRASHED
1259+
11281260

11291261
class TestDeregisterTaskDefinition:
11301262
@pytest.fixture

0 commit comments

Comments
 (0)