Skip to content

Commit 9f527fd

Browse files
authored
[ROB-2990] Scheduled prompts working (#1420)
## Summary Created scheduled prompts feature for automated prompt execution with heartbeat monitoring. This feature is enabled by default but will only run if connected to SAS. ## Changes - **Implemented scheduled prompts executor** in `holmes/core/scheduled_prompts/` package - `executor.py` - Background thread polls for pending prompts and executes them - `models.py` - Pydantic model for scheduled prompt validation - `heartbeat_tracer.py` - Heartbeat span for long-running execution monitoring - **Added heartbeat mechanism** that updates run status to RUNNING every 60 seconds during execution (prevents timeouts on long prompts) - **Integrated with ChatRequest** via optional `trace_span` parameter for heartbeat callbacks during tool execution - **Configurable intervals** via environment variables: - `ENABLED_SCHEDULED_PROMPTS` (default: true) - `SCHEDULED_PROMPTS_POLL_INTERVAL_SECONDS` (default: 60) - might want to change to 5 mins - `SCHEDULED_PROMPTS_HEARTBEAT_INTERVAL_SECONDS` (default: 60) - might want to change to 5 mins - **Error handling** with proper status updates (RUNNING, COMPLETED, FAILED) - **Comprehensive test coverage** - 25 pytest tests covering executor lifecycle, prompt processing, heartbeat functionality, and error scenarios (all passing) ## Testing - ✅ Pytest - ✅ Tested on staging environment - ✅ Deployed on beta environment <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Background scheduled prompts execution with configurable active/inactive polling and heartbeat intervals * Heartbeat updates for long-running scheduled runs for improved reliability and visibility * Option to fetch additional system prompts from a configurable URL * Trace-span propagation through LLM requests for better request tracing * New scheduled prompt data model and run lifecycle states (including a non-retry failure state) * Toggle to enable/disable scheduled prompts * **Chore** * Default toolset 
status refresh interval changed to 300s * **Tests** * Comprehensive tests covering scheduled prompts lifecycle, heartbeats, fetching, validation, and error paths <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Signed-off-by: avi@robusta.dev <avi@robusta.dev>
1 parent 6b27853 commit 9f527fd

File tree

9 files changed

+1121
-0
lines changed

9 files changed

+1121
-0
lines changed

holmes/common/env_vars.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,25 @@ def load_bool(env_var, default: Optional[bool]) -> Optional[bool]:
128128
KEEPALIVE_INTVL = int(os.environ.get("KEEPALIVE_INTVL", 2))
129129
KEEPALIVE_CNT = int(os.environ.get("KEEPALIVE_CNT", 5))
130130

131+
# Controls whether the scheduled prompts executor runs at startup (defaults to on)
ENABLED_SCHEDULED_PROMPTS = load_bool("ENABLED_SCHEDULED_PROMPTS", True)
# Polling interval in seconds for accounts with active scheduled prompts (defaults to 60 seconds)
SCHEDULED_PROMPTS_ACTIVE_POLL_INTERVAL_SECONDS = int(
    os.environ.get("SCHEDULED_PROMPTS_ACTIVE_POLL_INTERVAL_SECONDS", 60)
)
# Polling interval in seconds for accounts without scheduled prompts (defaults to 15 minutes)
SCHEDULED_PROMPTS_INACTIVE_POLL_INTERVAL_SECONDS = int(
    os.environ.get("SCHEDULED_PROMPTS_INACTIVE_POLL_INTERVAL_SECONDS", 900)
)
# Heartbeat interval in seconds for updating scheduled prompt run status during execution
SCHEDULED_PROMPTS_HEARTBEAT_INTERVAL_SECONDS = int(
    os.environ.get("SCHEDULED_PROMPTS_HEARTBEAT_INTERVAL_SECONDS", 60)
)
# Base URL of the Robusta UI; used for embeds and as the host for the
# additional-system-prompt endpoint fetched by the scheduled prompts executor.
ROBUSTA_UI_DOMAIN = os.environ.get(
    "ROBUSTA_UI_DOMAIN",
    "https://platform.robusta.dev",
)
131150
# Periodic refresh interval for toolset status in server mode (in seconds)
132151
# Set to 0 to disable periodic refresh
133152
TOOLSET_STATUS_REFRESH_INTERVAL_SECONDS = int(

holmes/core/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ class ChatRequestBaseModel(BaseModel):
202202
)
203203
tool_decisions: Optional[List[ToolApprovalDecision]] = None
204204
additional_system_prompt: Optional[str] = None
205+
trace_span: Optional[Any] = (
206+
None # Optional span for tracing and heartbeat callbacks
207+
)
205208

206209
# In our setup with litellm, the first message in conversation_history
207210
# should follow the structure [{"role": "system", "content": ...}],
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
"""Scheduled prompts package: background executor, heartbeat span, and run model."""

from holmes.core.scheduled_prompts.executor import ScheduledPromptsExecutor
from holmes.core.scheduled_prompts.heartbeat_tracer import (
    ScheduledPromptsHeartbeatSpan,
)
from holmes.core.scheduled_prompts.models import ScheduledPrompt

# Public API of the package (names re-exported for `import *` and tooling).
__all__ = [
    "ScheduledPromptsExecutor",
    "ScheduledPromptsHeartbeatSpan",
    "ScheduledPrompt",
]
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
import json
2+
import logging
3+
import os
4+
import threading
5+
import time
6+
from typing import TYPE_CHECKING, Callable, Optional, Union
7+
from urllib.error import HTTPError, URLError
8+
from urllib.request import urlopen
9+
10+
from pydantic import ValidationError
11+
from starlette.requests import Request
12+
13+
from holmes import get_version
14+
from holmes.common.env_vars import (
15+
ROBUSTA_UI_DOMAIN,
16+
SCHEDULED_PROMPTS_ACTIVE_POLL_INTERVAL_SECONDS,
17+
SCHEDULED_PROMPTS_INACTIVE_POLL_INTERVAL_SECONDS,
18+
)
19+
from holmes.core.models import ChatRequest, ChatResponse
20+
from holmes.core.scheduled_prompts.heartbeat_tracer import (
21+
ScheduledPromptsHeartbeatSpan,
22+
)
23+
from holmes.core.scheduled_prompts.models import ScheduledPrompt
24+
from holmes.core.supabase_dal import RunStatus
25+
26+
# to prevent circular imports due to type hints
27+
if TYPE_CHECKING:
28+
from fastapi.responses import StreamingResponse
29+
30+
from holmes.config import Config
31+
from holmes.core.supabase_dal import SupabaseDal
32+
33+
# Signature of the server chat entrypoint the executor invokes for each claimed run.
ChatFunction = Callable[[ChatRequest, Request], Union["ChatResponse", "StreamingResponse"]]

# Robusta platform endpoint serving an optional extra system prompt (JSON).
ADDITIONAL_SYSTEM_PROMPT_URL = f"{ROBUSTA_UI_DOMAIN}/api/additional-system-prompt.json"
36+
37+
class ScheduledPromptsExecutor:
    """Background executor for scheduled prompts.

    Runs a single daemon thread that polls the Supabase DAL for pending
    scheduled prompt runs, executes each one through the server's chat
    entrypoint, and reports the result back. The polling interval adapts:
    accounts with scheduled prompt definitions are polled more frequently
    than accounts without any.
    """

    def __init__(
        self,
        dal: "SupabaseDal",
        config: "Config",
        chat_function: ChatFunction,
    ):
        """
        Args:
            dal: Data access layer used to claim runs and report status.
            config: Server configuration; used to validate the requested model.
            chat_function: Chat entrypoint invoked to execute each prompt.
        """
        self.dal = dal
        self.config = config
        self.chat_function = chat_function
        self.running = False
        self.thread: Optional[threading.Thread] = None
        # this is pod name in kubernetes
        self.holmes_id = os.environ.get("HOSTNAME") or str(os.getpid())
        # Dynamic polling interval based on whether account has scheduled prompts
        self.poll_interval_seconds = SCHEDULED_PROMPTS_INACTIVE_POLL_INTERVAL_SECONDS

    def start(self):
        """Start the background polling thread.

        No-op (with a log line) when the Supabase DAL is disabled or the
        executor is already running.
        """
        if not self.dal.enabled:
            logging.info(
                "ScheduledPromptsExecutor not started - Supabase DAL not enabled"
            )
            return

        if self.running:
            logging.warning("ScheduledPromptsExecutor is already running")
            return

        self.running = True
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()
        logging.info("ScheduledPromptsExecutor started")

    def stop(self):
        """Signal the loop to stop and wait briefly for the thread to exit."""
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
        logging.info("ScheduledPromptsExecutor stopped")

    def _run_loop(self):
        """Main polling loop; never raises, all errors are logged."""
        while self.running:
            try:
                had_payload = self._process_next_prompt()
                if not had_payload:
                    # Update polling interval based on current state (may change
                    # if a prompt definition was added/removed)
                    self._update_poll_interval()
                    time.sleep(self.poll_interval_seconds)
            except Exception as exc:
                # logging.exception already records the traceback; no explicit
                # exc_info needed.
                logging.exception("Error in ScheduledPromptsExecutor loop: %s", exc)
                # FIX: back off before retrying. Previously the exception path
                # skipped the sleep entirely, so a persistent failure (e.g. a
                # DAL/network outage raising in claim_scheduled_prompt_run)
                # turned this loop into a busy spin that hammered the backend
                # and flooded the logs.
                time.sleep(self.poll_interval_seconds)

    def _update_poll_interval(self):
        """
        Update the polling interval based on whether the account has scheduled prompts.
        Only logs when the interval actually changes to avoid log spam.
        """
        has_scheduled_prompts = self.dal.has_scheduled_prompt_definitions()
        new_interval = (
            SCHEDULED_PROMPTS_ACTIVE_POLL_INTERVAL_SECONDS
            if has_scheduled_prompts
            else SCHEDULED_PROMPTS_INACTIVE_POLL_INTERVAL_SECONDS
        )

        if new_interval != self.poll_interval_seconds:
            old_interval = self.poll_interval_seconds
            self.poll_interval_seconds = new_interval
            logging.info(
                f"Polling interval changed from {old_interval}s to {new_interval}s "
                f"(account {'has' if has_scheduled_prompts else 'has no'} scheduled prompts)"
            )

    def _process_next_prompt(self) -> bool:
        """
        Process the next scheduled prompt, if available.

        Returns:
            bool: True if a payload was found and processed (even if it turned
            out to be invalid), False if no payload available.
        """
        payload = self.dal.claim_scheduled_prompt_run(self.holmes_id)
        if not payload:
            return False

        try:
            sp = ScheduledPrompt(**payload)
        except ValidationError as exc:
            # due to the rpc call to supabase this row will not be pulled again
            # on the next call of claim_scheduled_prompt_run so there is no
            # worry of an endless loop here
            logging.exception("Skipping invalid scheduled prompt payload: %s", exc)
            # Mark as failed_no_retry since the payload is invalid and retrying won't help
            run_id = payload.get("id") if isinstance(payload, dict) else None
            if run_id:
                self.dal.update_run_status(
                    run_id=run_id,
                    status=RunStatus.FAILED_NO_RETRY,
                    msg=f"Invalid scheduled prompt payload: {str(exc)}",
                )

            # Return True since we did find a payload, even if it was invalid
            return True

        try:
            self._execute_scheduled_prompt(sp)
        except Exception as exc:
            logging.exception("Error executing scheduled %s prompt: %s", sp.id, exc)
            self._finish_run(
                status=RunStatus.FAILED,
                result={"error": str(exc)},
                sp=sp,
            )

        return True

    def _execute_scheduled_prompt(self, sp: ScheduledPrompt):
        """Validate the run's model and execute it, marking failures."""
        run_id = sp.id
        available_models = self.config.get_models_list()
        if sp.model_name not in available_models:
            error_msg = f"Model '{sp.model_name}' not found in available models: {available_models}"
            logging.warning(
                "Pending run %s has invalid model_name '%s', marking as failed",
                run_id,
                sp.model_name,
            )
            self._finish_run(
                status=RunStatus.FAILED,
                result={"error": error_msg},
                sp=sp,
            )
            return

        logging.info(
            "Found pending run %s, executing with model %s", run_id, sp.model_name
        )
        self._execute_prompt(sp)
        logging.info("Successfully completed run %s", run_id)

    def _execute_prompt(
        self,
        sp: ScheduledPrompt,
    ):
        """Build a ChatRequest for the run, execute it, and record the result.

        A heartbeat span is attached so long executions keep updating the
        run status (see ScheduledPromptsHeartbeatSpan).
        """
        start = time.perf_counter()
        additional_system_prompt = self._fetch_additional_system_prompt(
            sp.prompt.get("additional_system_prompt")
        )

        # Create heartbeat span
        heartbeat_span = ScheduledPromptsHeartbeatSpan(sp=sp, dal=self.dal)

        # Create chat request with heartbeat span
        chat_request = ChatRequest(
            ask=self._extract_prompt_text(sp.prompt),
            model=sp.model_name,
            conversation_history=None,
            stream=False,
            additional_system_prompt=additional_system_prompt,
            trace_span=heartbeat_span,
        )

        # chat_function expects a starlette Request; synthesize an empty one.
        empty_request = Request(scope={"type": "http", "headers": []})
        response = self.chat_function(chat_request, empty_request)
        duration_seconds = time.perf_counter() - start

        if isinstance(response, ChatResponse):
            response.metadata = dict(response.metadata or {})
            response.metadata["duration_seconds"] = duration_seconds

        # Non-ChatResponse results (e.g. a streaming response) carry no
        # serializable payload; store an empty result.
        result_data = (
            response.model_dump() if isinstance(response, ChatResponse) else {}
        )

        self._finish_run(status=RunStatus.COMPLETED, result=result_data, sp=sp)

        return response

    def _fetch_additional_system_prompt(
        self, fallback: Optional[str] = None
    ) -> Optional[str]:
        """
        Fetches the additional system prompt from the Robusta platform.
        Falls back to the provided value if the fetch fails.
        """
        try:
            with urlopen(ADDITIONAL_SYSTEM_PROMPT_URL, timeout=10) as resp:
                if resp.status != 200:
                    logging.warning(
                        "Failed to fetch additional system prompt, status: %s",
                        resp.status,
                    )
                    return fallback
                data = json.loads(resp.read().decode("utf-8"))
                return data.get("additional_system_prompt", fallback)
        except (HTTPError, URLError, TimeoutError, ValueError) as exc:
            # ValueError also covers json.JSONDecodeError on malformed bodies.
            logging.warning(
                "Error fetching additional system prompt, using fallback: %s", exc
            )
            return fallback

    def _finish_run(
        self,
        status: RunStatus,
        result: dict,
        sp: ScheduledPrompt,
    ) -> None:
        """Report the run's terminal status and result back to the DAL."""
        self.dal.finish_scheduled_prompt_run(
            status=status,
            result=result,
            run_id=sp.id,
            scheduled_prompt_definition_id=sp.scheduled_prompt_definition_id,
            version=get_version(),
            metadata=sp.metadata,
        )

    def _extract_prompt_text(self, prompt: Union[str, dict]) -> str:
        """
        Extracts the prompt text from the prompt.
        Any additional changes to the prompt object or how we refactor it in the future should be handled here.
        """
        if isinstance(prompt, dict):
            raw = prompt.get("raw_prompt")
            if raw:
                return raw
        return str(prompt)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import logging
2+
import time
3+
from typing import TYPE_CHECKING, Optional
4+
5+
from holmes.common.env_vars import SCHEDULED_PROMPTS_HEARTBEAT_INTERVAL_SECONDS
6+
from holmes.core.supabase_dal import RunStatus
7+
from holmes.core.tracing import DummySpan
8+
9+
if TYPE_CHECKING:
10+
from holmes.core.scheduled_prompts.models import ScheduledPrompt
11+
from holmes.core.supabase_dal import SupabaseDal
12+
13+
14+
class ScheduledPromptsHeartbeatSpan(DummySpan):
    """Tracing span that doubles as a heartbeat emitter for a scheduled prompt.

    Span activity (creating a child span, logging) may push a RUNNING status
    update to the DAL, throttled so updates happen at most once per
    ``heartbeat_interval_seconds``.
    """

    def __init__(
        self,
        sp: "ScheduledPrompt",
        dal: "SupabaseDal",
        heartbeat_interval_seconds: int = SCHEDULED_PROMPTS_HEARTBEAT_INTERVAL_SECONDS,
    ):
        """
        Args:
            sp: The scheduled prompt being executed
            dal: Database access layer for updating run status
            heartbeat_interval_seconds: Minimum seconds between heartbeat calls
        """
        self.sp = sp
        self.dal = dal
        self.heartbeat_interval_seconds = heartbeat_interval_seconds
        self.last_heartbeat_time = time.time()

    def start_span(self, name: Optional[str] = None, span_type=None, **kwargs):
        """Treat child-span creation (typically a tool call) as activity.

        Emits a throttled heartbeat, then hands back a fresh span that shares
        this span's prompt, DAL, and throttle configuration.
        """
        self._maybe_heartbeat()
        return ScheduledPromptsHeartbeatSpan(
            self.sp, self.dal, self.heartbeat_interval_seconds
        )

    def log(self, *args, **kwargs):
        """Treat any log call as activity and emit a throttled heartbeat."""
        self._maybe_heartbeat()

    def _maybe_heartbeat(self):
        """Push a RUNNING status update if the throttle interval has elapsed."""
        now = time.time()
        elapsed = now - self.last_heartbeat_time
        if elapsed < self.heartbeat_interval_seconds:
            # Throttled: too soon since the last successful heartbeat.
            return
        try:
            self.dal.update_run_status(run_id=self.sp.id, status=RunStatus.RUNNING)
            self.last_heartbeat_time = now
            logging.debug(f"Heartbeat for SP - {self.sp.id}")
        except Exception as e:
            # A failed heartbeat must never break prompt execution.
            logging.warning(f"Heartbeat callback failed for SP - {self.sp.id}: {e}")
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from datetime import datetime
2+
from typing import Any, Dict, Optional
3+
4+
from pydantic import BaseModel
5+
6+
7+
class ScheduledPrompt(BaseModel):
    """A claimed scheduled prompt run, validated from the DAL payload.

    Instances are built from the dict returned by
    ``claim_scheduled_prompt_run``; a ValidationError here marks the run as
    failed-no-retry in the executor.
    """

    # Unique id of this run; used for heartbeats and status updates.
    id: str
    # Id of the recurring definition that spawned this run, if any.
    scheduled_prompt_definition_id: Optional[str] = None
    account_id: str
    cluster_name: str
    # LLM model to execute with; must appear in the server's model list.
    model_name: str
    # Prompt payload; "raw_prompt" carries the prompt text and
    # "additional_system_prompt" an optional fallback system prompt.
    prompt: Dict[str, Any]
    # Run status as stored in the DB (raw string, not an enum here).
    status: str
    # Optional status/error message.
    msg: Optional[str] = None
    created_at: datetime
    last_heartbeat_at: Optional[datetime] = None
    # Arbitrary metadata echoed back when the run is finished.
    metadata: Optional[dict] = None

0 commit comments

Comments
 (0)