Skip to content

Commit 79496a0

Browse files
authored
Merge pull request #283 from tbrandenburg/codex/improve-crontab-task-execution-reliability
2 parents 280fdbe + 379ee35 commit 79496a0

File tree

6 files changed

+145
-7
lines changed

6 files changed

+145
-7
lines changed

packages/pybackend/app.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from harness_service import is_process_running, list_harnesses, run_harness
5353
from workflow_service import list_workspace_workflows, read_workflows, write_workflows
5454
from cron_service import (
55+
get_cron_job_diagnostics,
5556
get_cron_job_last_runs,
5657
refresh_cron_clock,
5758
start_cron_clock,
@@ -601,7 +602,10 @@ def save_global_workflows(payload: dict = Body(...)):
601602
def workspace_workflows():
602603
try:
603604
logger.info("Listing workspace workflows")
604-
return list_workspace_workflows(get_cron_job_last_runs())
605+
return list_workspace_workflows(
606+
get_cron_job_last_runs(),
607+
get_cron_job_diagnostics(),
608+
)
605609
except Exception as exc:
606610
logger.exception("Failed to list workspace workflows")
607611
raise HTTPException(

packages/pybackend/cron_service.py

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
_state_lock = Lock()
1919
_started_jobs = 0
2020
_successful_jobs = 0
21+
_failed_jobs = 0
2122
_configured_jobs = 0
2223
_invalid_jobs = 0
2324
_started_at: datetime | None = None
2425
_last_run_by_job: dict[str, datetime] = {}
26+
_last_finished_by_job: dict[str, datetime] = {}
27+
_last_duration_ms_by_job: dict[str, int] = {}
28+
_last_exit_code_by_job: dict[str, int] = {}
29+
_last_error_by_job: dict[str, str] = {}
2530
_running_process_by_job: dict[str, subprocess.Popen[str]] = {}
2631

2732

@@ -48,7 +53,9 @@ def _resolve_script_path(repo_path: Path, shell_script_path: str) -> Path:
4853

4954

5055
def _run_workflow_script(repo_path: Path, workflow_id: str, script_path: Path) -> None:
51-
global _started_jobs, _successful_jobs
56+
global _started_jobs, _successful_jobs, _failed_jobs
57+
58+
started_at = datetime.now(timezone.utc)
5259

5360
with _state_lock:
5461
_terminate_running_job(workflow_id)
@@ -66,28 +73,42 @@ def _run_workflow_script(repo_path: Path, workflow_id: str, script_path: Path) -
6673
logger.info("Running cron workflow '%s' in '%s'", workflow_id, repo_path)
6774
stdout, stderr = process.communicate()
6875
returncode = process.returncode
76+
finished_at = datetime.now(timezone.utc)
77+
duration_ms = int((finished_at - started_at).total_seconds() * 1000)
6978

7079
with _state_lock:
7180
if _running_process_by_job.get(workflow_id) is process:
7281
_running_process_by_job.pop(workflow_id, None)
82+
_last_finished_by_job[workflow_id] = finished_at
83+
_last_duration_ms_by_job[workflow_id] = duration_ms
84+
_last_exit_code_by_job[workflow_id] = returncode
7385

7486
if returncode == 0:
7587
with _state_lock:
7688
_successful_jobs += 1
89+
_last_error_by_job.pop(workflow_id, None)
7790
logger.info("Cron workflow '%s' completed", workflow_id)
7891
if stdout.strip():
7992
logger.info("Cron workflow '%s' stdout: %s", workflow_id, stdout.strip())
8093
return
8194

95+
with _state_lock:
96+
_failed_jobs += 1
97+
8298
logger.warning("Cron workflow '%s' failed with exit code %s", workflow_id, returncode)
8399
if stdout.strip():
84100
logger.warning("Cron workflow '%s' stdout: %s", workflow_id, stdout.strip())
85101
if stderr.strip():
102+
with _state_lock:
103+
_last_error_by_job[workflow_id] = stderr.strip()
86104
logger.warning("Cron workflow '%s' stderr: %s", workflow_id, stderr.strip())
105+
else:
106+
with _state_lock:
107+
_last_error_by_job[workflow_id] = f"Exit code {returncode} without stderr"
87108

88109

89110
def start_cron_clock() -> None:
90-
global _scheduler, _started_jobs, _successful_jobs, _configured_jobs, _invalid_jobs, _started_at
111+
global _scheduler, _started_jobs, _successful_jobs, _failed_jobs, _configured_jobs, _invalid_jobs, _started_at
91112

92113
if _scheduler is not None:
93114
return
@@ -110,12 +131,20 @@ def start_cron_clock() -> None:
110131
shell_script_path = workflow.get("shellScriptPath")
111132
workflow_id = workflow.get("id") or "workflow"
112133
if not isinstance(schedule, str) or not schedule.strip():
134+
logger.warning("Skipping workflow '%s' in '%s': missing schedule", workflow_id, repo_name)
113135
continue
114136
if not isinstance(shell_script_path, str) or not shell_script_path.strip():
137+
logger.warning("Skipping workflow '%s' in '%s': missing shellScriptPath", workflow_id, repo_name)
115138
continue
116139

117140
script_path = _resolve_script_path(repo_path, shell_script_path)
118141
if not script_path.exists() or not script_path.is_file():
142+
logger.warning(
143+
"Skipping workflow '%s' in '%s': script not found at '%s'",
144+
workflow_id,
145+
repo_name,
146+
script_path,
147+
)
119148
continue
120149

121150
job_id = f"{repo_name}:{workflow_id}"
@@ -125,8 +154,9 @@ def start_cron_clock() -> None:
125154
CronTrigger.from_crontab(schedule),
126155
id=job_id,
127156
replace_existing=True,
128-
max_instances=2,
157+
max_instances=1,
129158
coalesce=True,
159+
misfire_grace_time=300,
130160
args=[repo_path, job_id, script_path],
131161
)
132162
configured_jobs += 1
@@ -145,10 +175,15 @@ def start_cron_clock() -> None:
145175
with _state_lock:
146176
_started_jobs = 0
147177
_successful_jobs = 0
178+
_failed_jobs = 0
148179
_configured_jobs = configured_jobs
149180
_invalid_jobs = invalid_jobs
150181
_started_at = datetime.now(timezone.utc)
151182
_last_run_by_job.clear()
183+
_last_finished_by_job.clear()
184+
_last_duration_ms_by_job.clear()
185+
_last_exit_code_by_job.clear()
186+
_last_error_by_job.clear()
152187
_running_process_by_job.clear()
153188

154189
logger.info(
@@ -186,6 +221,7 @@ def get_cron_clock_status() -> dict[str, object]:
186221
with _state_lock:
187222
started_jobs = _started_jobs
188223
successful_jobs = _successful_jobs
224+
failed_jobs = _failed_jobs
189225
configured_jobs = _configured_jobs
190226
invalid_jobs = _invalid_jobs
191227
started_at = _started_at
@@ -213,6 +249,7 @@ def get_cron_clock_status() -> dict[str, object]:
213249
"invalidSchedules": invalid_jobs,
214250
"startedJobsSinceStartup": started_jobs,
215251
"successfulJobsSinceStartup": successful_jobs,
252+
"failedJobsSinceStartup": failed_jobs,
216253
}
217254

218255

@@ -231,3 +268,44 @@ def get_cron_job_last_runs() -> dict[str, str | None]:
231268
job_last_runs[job.id] = last_runs.get(job.id)
232269

233270
return job_last_runs
271+
272+
273+
def get_cron_job_diagnostics() -> dict[str, dict[str, object | None]]:
274+
if _scheduler is None:
275+
return {}
276+
277+
with _state_lock:
278+
last_runs = {
279+
workflow_id: timestamp.isoformat()
280+
for workflow_id, timestamp in _last_run_by_job.items()
281+
}
282+
finished_runs = {
283+
workflow_id: timestamp.isoformat()
284+
for workflow_id, timestamp in _last_finished_by_job.items()
285+
}
286+
durations = dict(_last_duration_ms_by_job)
287+
exit_codes = dict(_last_exit_code_by_job)
288+
errors = dict(_last_error_by_job)
289+
running = {
290+
workflow_id
291+
for workflow_id, process in _running_process_by_job.items()
292+
if process.poll() is None
293+
}
294+
295+
diagnostics: dict[str, dict[str, object | None]] = {}
296+
for job in _scheduler.get_jobs():
297+
next_run_time = None
298+
if job.next_run_time is not None:
299+
next_run_time = job.next_run_time.isoformat()
300+
301+
diagnostics[job.id] = {
302+
"lastStartedAt": last_runs.get(job.id),
303+
"lastFinishedAt": finished_runs.get(job.id),
304+
"lastDurationMs": durations.get(job.id),
305+
"lastExitCode": exit_codes.get(job.id),
306+
"lastError": errors.get(job.id),
307+
"nextRunAt": next_run_time,
308+
"running": job.id in running,
309+
}
310+
311+
return diagnostics

packages/pybackend/tests/unit/test_api.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,18 +1095,24 @@ def test_save_global_workflows_success(self, mock_write, mock_refresh):
10951095
mock_write.assert_called_once_with({"workflows": []}, None)
10961096
mock_refresh.assert_called_once_with()
10971097

1098+
@patch("app.get_cron_job_diagnostics")
10981099
@patch("app.get_cron_job_last_runs")
10991100
@patch("app.list_workspace_workflows")
1100-
def test_workspace_workflows_success(self, mock_list, mock_last_runs):
1101+
def test_workspace_workflows_success(self, mock_list, mock_last_runs, mock_diagnostics):
11011102
mock_last_runs.return_value = {"sample:wf_1": "2026-01-02T03:04:05+00:00"}
1103+
mock_diagnostics.return_value = {"sample:wf_1": {"lastExitCode": 0, "running": False}}
11021104
mock_list.return_value = {"workflows": [{"repository": "sample", "id": "wf_1", "name": "Release", "enabled": True, "schedule": None}]}
11031105

11041106
response = client.get("/api/workspace/workflows")
11051107

11061108
assert response.status_code == 200
11071109
assert response.json()["workflows"][0]["repository"] == "sample"
11081110
mock_last_runs.assert_called_once_with()
1109-
mock_list.assert_called_once_with({"sample:wf_1": "2026-01-02T03:04:05+00:00"})
1111+
mock_diagnostics.assert_called_once_with()
1112+
mock_list.assert_called_once_with(
1113+
{"sample:wf_1": "2026-01-02T03:04:05+00:00"},
1114+
{"sample:wf_1": {"lastExitCode": 0, "running": False}},
1115+
)
11101116

11111117
@patch("app.read_workflows")
11121118
@patch("app._repository_path")

packages/pybackend/tests/unit/test_cron_service.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66
def teardown_function():
    """Pytest teardown hook: stop the cron clock and reset cron_service job state."""
    cron_service.stop_cron_clock()

    # Rebind each per-job tracking dict to a fresh, separate empty dict so
    # state written by one test cannot be observed by another.
    for state_name in (
        "_running_process_by_job",
        "_last_run_by_job",
        "_last_finished_by_job",
        "_last_duration_ms_by_job",
        "_last_exit_code_by_job",
        "_last_error_by_job",
    ):
        setattr(cron_service, state_name, {})
914

1015

1116
@patch("cron_service.CronTrigger.from_crontab")
@@ -227,3 +232,42 @@ def test_get_cron_job_last_runs_includes_only_registered_jobs():
227232
assert result["repo-a:wf-1"] == "2026-01-02T03:04:05+00:00"
228233
assert result["repo-a:wf-2"] is None
229234
assert "repo-a:other" not in result
235+
236+
237+
def test_get_cron_job_diagnostics_includes_runtime_metadata():
    """Diagnostics combine recorded per-job state with the scheduler's job data."""
    make_dt = cron_service.datetime
    utc = cron_service.timezone.utc
    first_job = "repo-a:wf-1"
    second_job = "repo-a:wf-2"

    # Scheduler stand-in exposing one job with a next run time and one without.
    scheduler = MagicMock()
    scheduler.get_jobs.return_value = [
        MagicMock(id=first_job, next_run_time=make_dt(2026, 1, 2, 4, 5, 6, tzinfo=utc)),
        MagicMock(id=second_job, next_run_time=None),
    ]
    cron_service._scheduler = scheduler

    # poll() returning None marks the tracked process as still running.
    active_process = MagicMock()
    active_process.poll.return_value = None
    cron_service._running_process_by_job = {first_job: active_process}

    cron_service._last_run_by_job = {first_job: make_dt(2026, 1, 2, 3, 4, 5, tzinfo=utc)}
    cron_service._last_finished_by_job = {first_job: make_dt(2026, 1, 2, 3, 4, 8, tzinfo=utc)}
    cron_service._last_duration_ms_by_job = {first_job: 3123}
    cron_service._last_exit_code_by_job = {first_job: 0}
    cron_service._last_error_by_job = {second_job: "boom"}

    result = cron_service.get_cron_job_diagnostics()

    first = result[first_job]
    assert first["lastStartedAt"] == "2026-01-02T03:04:05+00:00"
    assert first["lastFinishedAt"] == "2026-01-02T03:04:08+00:00"
    assert first["lastDurationMs"] == 3123
    assert first["lastExitCode"] == 0
    assert first["nextRunAt"] == "2026-01-02T04:05:06+00:00"
    assert first["running"] is True

    assert result[second_job]["lastError"] == "boom"
266+
267+
268+
def test_get_cron_job_diagnostics_returns_empty_when_scheduler_not_running():
    """Without a scheduler instance the diagnostics mapping is empty."""
    cron_service._scheduler = None

    assert cron_service.get_cron_job_diagnostics() == {}

packages/pybackend/tests/unit/test_workflow_service.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ def test_list_workspace_workflows_collects_repository_workflows(
8585
{"workflows": []},
8686
]
8787

88-
result = list_workspace_workflows({"repo-a:wf_a": "2026-01-02T03:04:05+00:00"})
88+
result = list_workspace_workflows(
89+
{"repo-a:wf_a": "2026-01-02T03:04:05+00:00"},
90+
{"repo-a:wf_a": {"lastExitCode": 0, "running": False}},
91+
)
8992

9093
assert result == {
9194
"workflows": [
@@ -97,6 +100,7 @@ def test_list_workspace_workflows_collects_repository_workflows(
97100
"schedule": "* * * * *",
98101
"shellScriptPath": None,
99102
"lastRun": "2026-01-02T03:04:05+00:00",
103+
"diagnostics": {"lastExitCode": 0, "running": False},
100104
}
101105
]
102106
}

packages/pybackend/workflow_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ def write_workflows(
116116

117117
def list_workspace_workflows(
118118
last_runs_by_job: dict[str, str | None] | None = None,
119+
diagnostics_by_job: dict[str, dict[str, Any]] | None = None,
119120
) -> dict[str, list[dict[str, Any]]]:
120121
workspace_home = get_workspace_home()
121122
workflows: list[dict[str, Any]] = []
@@ -138,6 +139,7 @@ def list_workspace_workflows(
138139
"schedule": workflow.get("schedule"),
139140
"shellScriptPath": workflow.get("shellScriptPath"),
140141
"lastRun": (last_runs_by_job or {}).get(job_id),
142+
"diagnostics": (diagnostics_by_job or {}).get(job_id),
141143
}
142144
)
143145

0 commit comments

Comments
 (0)