Skip to content

Commit 4a0e23f

Browse files
committed
Add logging for watchdog
Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
1 parent d9b8f8b commit 4a0e23f

File tree

1 file changed

+28
-1
lines changed

1 file changed

+28
-1
lines changed

docling_jobkit/orchestrators/rq/orchestrator.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ async def _watchdog_task(self) -> None:
276276
may recreate Task objects on every poll, resetting started_at to the current
277277
time and making a task.started_at-based check unreliable.
278278
"""
279-
_log.debug("Watchdog starting")
279+
_log.warning("Watchdog starting")
280280
# Maps task_id -> time the watchdog first observed the task in STARTED state.
281281
# Independent of task.started_at which may be reset by polling machinery.
282282
first_seen_started: dict[str, datetime.datetime] = {}
@@ -285,6 +285,14 @@ async def _watchdog_task(self) -> None:
285285
try:
286286
now = datetime.datetime.now(datetime.timezone.utc)
287287

288+
all_statuses = {
289+
tid: t.task_status for tid, t in list(self.tasks.items())
290+
}
291+
_log.warning(
292+
f"Watchdog scan: {len(self.tasks)} tasks in memory, "
293+
f"statuses={all_statuses}"
294+
)
295+
288296
# Determine which tasks are currently in STARTED state.
289297
currently_started = {
290298
task_id
@@ -295,11 +303,17 @@ async def _watchdog_task(self) -> None:
295303
# Remove tasks that are no longer STARTED (completed, failed, gone).
296304
for task_id in list(first_seen_started.keys()):
297305
if task_id not in currently_started:
306+
_log.warning(
307+
f"Watchdog: task {task_id} left STARTED, removing from tracking"
308+
)
298309
del first_seen_started[task_id]
299310

300311
# Record first observation time for newly STARTED tasks.
301312
for task_id in currently_started:
302313
if task_id not in first_seen_started:
314+
_log.warning(
315+
f"Watchdog: first observation of STARTED task {task_id}"
316+
)
303317
first_seen_started[task_id] = now
304318

305319
# Check tasks that have been STARTED long enough to be past grace period.
@@ -309,9 +323,22 @@ async def _watchdog_task(self) -> None:
309323
if (now - first_seen).total_seconds() > _WATCHDOG_GRACE_PERIOD
310324
]
311325

326+
_log.warning(
327+
f"Watchdog: {len(currently_started)} started, "
328+
f"{len(first_seen_started)} tracked, "
329+
f"{len(candidates)} past grace period"
330+
)
331+
312332
for task_id in candidates:
313333
key = f"{_HEARTBEAT_KEY_PREFIX}:{task_id}"
314334
alive = await self._async_redis_conn.exists(key)
335+
age = (
336+
now - first_seen_started[task_id]
337+
).total_seconds()
338+
_log.warning(
339+
f"Watchdog: checking task {task_id} "
340+
f"(age={age:.0f}s), heartbeat key alive={bool(alive)}"
341+
)
315342
if not alive:
316343
_log.warning(
317344
f"Task {task_id} heartbeat key missing — "

0 commit comments

Comments
 (0)