Skip to content

Commit 96710e8

Browse files
committed
Add heartbeat_key_prefix in RQOrchestrator
Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
1 parent 664a8db commit 96710e8

File tree

2 files changed

+5
-6
lines changed

2 files changed

+5
-6
lines changed

docling_jobkit/orchestrators/rq/orchestrator.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class RQOrchestratorConfig(BaseModel):
3636
results_ttl: int = 3_600 * 4
3737
failure_ttl: int = 3_600 * 4
3838
results_prefix: str = "docling:results"
39+
heartbeat_key_prefix: str = "docling:job:alive"
3940
sub_channel: str = "docling:updates"
4041
scratch_dir: Optional[Path] = None
4142
redis_max_connections: int = 50
@@ -50,7 +51,6 @@ class _TaskUpdate(BaseModel):
5051
error_message: Optional[str] = None
5152

5253

53-
_HEARTBEAT_KEY_PREFIX = "docling:job:alive"
5454
_HEARTBEAT_TTL = 60 # seconds before an unrefreshed key expires
5555
_HEARTBEAT_INTERVAL = 20 # seconds between heartbeat writes
5656
_WATCHDOG_INTERVAL = 30.0 # seconds between watchdog scans
@@ -282,8 +282,8 @@ async def _watchdog_task(self) -> None:
282282
283283
Runs every _WATCHDOG_INTERVAL seconds. For each task in STARTED state
284284
that is older than _WATCHDOG_GRACE_PERIOD, checks whether the worker's
285-
liveness key (docling:job:alive:{task_id}) still exists in Redis. If the
286-
key is absent the worker process has died — publishes a FAILURE update to
285+
liveness key ({heartbeat_key_prefix}:{task_id}) still exists in Redis. If
286+
the key is absent, the worker process has died — publishes a FAILURE update to
287287
the pub/sub channel so that polling clients and WebSocket subscribers are
288288
notified within ~90 seconds of the kill instead of waiting 4 hours.
289289
@@ -352,7 +352,7 @@ async def _watchdog_task(self) -> None:
352352
)
353353

354354
for task_id in candidates:
355-
key = f"{_HEARTBEAT_KEY_PREFIX}:{task_id}"
355+
key = f"{self.config.heartbeat_key_prefix}:{task_id}"
356356
alive = await self._async_redis_conn.exists(key)
357357
age = (now - first_seen_started[task_id]).total_seconds()
358358
_log.warning(

docling_jobkit/orchestrators/rq/worker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from docling_jobkit.datamodel.task_meta import TaskStatus, TaskType
2424
from docling_jobkit.orchestrators.rq.orchestrator import (
2525
_HEARTBEAT_INTERVAL,
26-
_HEARTBEAT_KEY_PREFIX,
2726
_HEARTBEAT_TTL,
2827
RQOrchestrator,
2928
RQOrchestratorConfig,
@@ -92,7 +91,7 @@ def _heartbeat_loop(self, job_id: str, stop_event: threading.Event) -> None:
9291
worker process is killed the thread dies with it and the key expires
9392
naturally, allowing the orchestrator watchdog to detect the dead job.
9493
"""
95-
key = f"{_HEARTBEAT_KEY_PREFIX}:{job_id}"
94+
key = f"{self.orchestrator_config.heartbeat_key_prefix}:{job_id}"
9695
conn = None
9796
try:
9897
conn = sync_redis.Redis.from_url(self.orchestrator_config.redis_url)

0 commit comments

Comments
 (0)