Skip to content

Commit 2355ec0

Browse files
feat: create sys event types and handler
feat: add system event types and handler chore: add tests and improve signal-related error logging
1 parent c925d2d commit 2355ec0

File tree

4 files changed

+386
-0
lines changed

4 files changed

+386
-0
lines changed

lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
AgentReasoningFailedEvent,
7272
AgentReasoningStartedEvent,
7373
)
74+
from crewai.events.types.system_events import SignalEvent, on_signal
7475
from crewai.events.types.task_events import (
7576
TaskCompletedEvent,
7677
TaskFailedEvent,
@@ -159,6 +160,7 @@ def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
159160
self._register_flow_event_handlers(crewai_event_bus)
160161
self._register_context_event_handlers(crewai_event_bus)
161162
self._register_action_event_handlers(crewai_event_bus)
163+
self._register_system_event_handlers(crewai_event_bus)
162164

163165
self._listeners_setup = True
164166

@@ -458,6 +460,15 @@ def on_knowledge_query_failed(
458460
) -> None:
459461
self._handle_action_event("knowledge_query_failed", source, event)
460462

463+
def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
464+
"""Register handlers for system signal events (SIGTERM, SIGINT, etc.)."""
465+
466+
@on_signal
467+
def handle_signal(source: Any, event: SignalEvent) -> None:
468+
"""Flush trace batch on system signals to prevent data loss."""
469+
if self.batch_manager.is_batch_initialized():
470+
self.batch_manager.finalize_batch()
471+
461472
def _initialize_crew_batch(self, source: Any, event: Any) -> None:
462473
"""Initialize trace batch.
463474
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
"""System signal event types for CrewAI.
2+
3+
This module contains event types for system-level signals like SIGTERM,
4+
allowing listeners to perform cleanup operations before process termination.
5+
"""
6+
7+
from collections.abc import Callable
8+
from enum import IntEnum
9+
import signal
10+
from typing import Annotated, Literal, TypeVar
11+
12+
from pydantic import Field, TypeAdapter
13+
14+
from crewai.events.base_events import BaseEvent
15+
16+
17+
class SignalType(IntEnum):
18+
"""Enumeration of supported system signals."""
19+
20+
SIGTERM = signal.SIGTERM
21+
SIGINT = signal.SIGINT
22+
SIGHUP = signal.SIGHUP
23+
SIGTSTP = signal.SIGTSTP
24+
SIGCONT = signal.SIGCONT
25+
26+
27+
class SigTermEvent(BaseEvent):
28+
"""Event emitted when SIGTERM is received."""
29+
30+
type: Literal["SIGTERM"] = "SIGTERM"
31+
signal_number: SignalType = SignalType.SIGTERM
32+
reason: str | None = None
33+
34+
35+
class SigIntEvent(BaseEvent):
36+
"""Event emitted when SIGINT is received."""
37+
38+
type: Literal["SIGINT"] = "SIGINT"
39+
signal_number: SignalType = SignalType.SIGINT
40+
reason: str | None = None
41+
42+
43+
class SigHupEvent(BaseEvent):
44+
"""Event emitted when SIGHUP is received."""
45+
46+
type: Literal["SIGHUP"] = "SIGHUP"
47+
signal_number: SignalType = SignalType.SIGHUP
48+
reason: str | None = None
49+
50+
51+
class SigTStpEvent(BaseEvent):
52+
"""Event emitted when SIGTSTP is received.
53+
54+
Note: SIGSTOP cannot be caught - it immediately suspends the process.
55+
"""
56+
57+
type: Literal["SIGTSTP"] = "SIGTSTP"
58+
signal_number: SignalType = SignalType.SIGTSTP
59+
reason: str | None = None
60+
61+
62+
class SigContEvent(BaseEvent):
63+
"""Event emitted when SIGCONT is received."""
64+
65+
type: Literal["SIGCONT"] = "SIGCONT"
66+
signal_number: SignalType = SignalType.SIGCONT
67+
reason: str | None = None
68+
69+
70+
SignalEvent = Annotated[
71+
SigTermEvent | SigIntEvent | SigHupEvent | SigTStpEvent | SigContEvent,
72+
Field(discriminator="type"),
73+
]
74+
75+
signal_event_adapter: TypeAdapter[SignalEvent] = TypeAdapter(SignalEvent)
76+
77+
SIGNAL_EVENT_TYPES: tuple[type[BaseEvent], ...] = (
78+
SigTermEvent,
79+
SigIntEvent,
80+
SigHupEvent,
81+
SigTStpEvent,
82+
SigContEvent,
83+
)
84+
85+
86+
T = TypeVar("T", bound=Callable[[object, SignalEvent], None])
87+
88+
89+
def on_signal(func: T) -> T:
90+
"""Decorator to register a handler for all signal events.
91+
92+
Args:
93+
func: Handler function that receives (source, event) arguments.
94+
95+
Returns:
96+
The original function, registered for all signal event types.
97+
"""
98+
from crewai.events.event_bus import crewai_event_bus
99+
100+
for event_type in SIGNAL_EVENT_TYPES:
101+
crewai_event_bus.on(event_type)(func)
102+
return func

lib/crewai/src/crewai/telemetry/telemetry.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99
from __future__ import annotations
1010

1111
import asyncio
12+
import atexit
1213
from collections.abc import Callable
1314
from importlib.metadata import version
1415
import json
1516
import logging
1617
import os
1718
import platform
19+
import signal
1820
import threading
1921
from typing import TYPE_CHECKING, Any
2022

@@ -31,6 +33,14 @@
3133
from opentelemetry.trace import Span
3234
from typing_extensions import Self
3335

36+
from crewai.events.event_bus import crewai_event_bus
37+
from crewai.events.types.system_events import (
38+
SigContEvent,
39+
SigHupEvent,
40+
SigIntEvent,
41+
SigTStpEvent,
42+
SigTermEvent,
43+
)
3444
from crewai.telemetry.constants import (
3545
CREWAI_TELEMETRY_BASE_URL,
3646
CREWAI_TELEMETRY_SERVICE_NAME,
@@ -121,6 +131,7 @@ def __init__(self) -> None:
121131
)
122132

123133
self.provider.add_span_processor(processor)
134+
self._register_shutdown_handlers()
124135
self.ready = True
125136
except Exception as e:
126137
if isinstance(
@@ -155,6 +166,71 @@ def set_tracer(self) -> None:
155166
self.ready = False
156167
self.trace_set = False
157168

169+
def _register_shutdown_handlers(self) -> None:
170+
"""Register handlers for graceful shutdown on process exit and signals."""
171+
atexit.register(self._shutdown)
172+
173+
self._original_handlers: dict[int, Any] = {}
174+
175+
self._register_signal_handler(signal.SIGTERM, SigTermEvent, shutdown=True)
176+
self._register_signal_handler(signal.SIGINT, SigIntEvent, shutdown=True)
177+
self._register_signal_handler(signal.SIGHUP, SigHupEvent, shutdown=False)
178+
self._register_signal_handler(signal.SIGTSTP, SigTStpEvent, shutdown=False)
179+
self._register_signal_handler(signal.SIGCONT, SigContEvent, shutdown=False)
180+
181+
def _register_signal_handler(
182+
self,
183+
sig: signal.Signals,
184+
event_class: type,
185+
shutdown: bool = False,
186+
) -> None:
187+
"""Register a signal handler that emits an event.
188+
189+
Args:
190+
sig: The signal to handle.
191+
event_class: The event class to instantiate and emit.
192+
shutdown: Whether to trigger shutdown on this signal.
193+
"""
194+
try:
195+
original_handler = signal.getsignal(sig)
196+
self._original_handlers[sig] = original_handler
197+
198+
def handler(signum: int, frame: Any) -> None:
199+
crewai_event_bus.emit(self, event_class())
200+
201+
if shutdown:
202+
self._shutdown()
203+
204+
if original_handler not in (signal.SIG_DFL, signal.SIG_IGN, None):
205+
if callable(original_handler):
206+
original_handler(signum, frame)
207+
elif shutdown:
208+
raise SystemExit(0)
209+
210+
signal.signal(sig, handler)
211+
except ValueError as e:
212+
logger.warning(
213+
f"Cannot register {sig.name} handler: not running in main thread",
214+
exc_info=e,
215+
)
216+
except OSError as e:
217+
logger.warning(f"Cannot register {sig.name} handler: {e}", exc_info=e)
218+
219+
def _shutdown(self) -> None:
220+
"""Flush and shutdown the telemetry provider on process exit.
221+
222+
Uses a short timeout to avoid blocking process shutdown.
223+
"""
224+
if not self.ready:
225+
return
226+
227+
try:
228+
self.provider.force_flush(timeout_millis=5000)
229+
self.provider.shutdown()
230+
self.ready = False
231+
except Exception as e:
232+
logger.debug(f"Telemetry shutdown failed: {e}")
233+
158234
def _safe_telemetry_operation(
159235
self, operation: Callable[[], Span | None]
160236
) -> Span | None:

0 commit comments

Comments
 (0)