Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions lib/crewai/src/crewai/events/signal_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""System signal manager for CrewAI.

This module provides a singleton manager that bridges OS signals to the CrewAI
event bus, independent of telemetry settings. This ensures that signal events
(SigTermEvent, SigIntEvent, etc.) are always emitted when signals are received,
regardless of whether telemetry is enabled or disabled.
"""

from __future__ import annotations

import logging
import signal
import threading
from typing import TYPE_CHECKING, Any

from typing_extensions import Self

from crewai.events.event_bus import crewai_event_bus


if TYPE_CHECKING:
from collections.abc import Callable

from crewai.events.base_events import BaseEvent

EventFactory = Callable[[], BaseEvent]

logger = logging.getLogger(__name__)


class SystemSignalManager:
"""Singleton manager for bridging OS signals to the CrewAI event bus.

This class registers signal handlers that emit corresponding events to the
event bus, allowing any code to listen for system signals via the event
system. It operates independently of telemetry settings.

The manager supports handler chaining: when a signal handler is registered,
it preserves any previously registered handler and calls it after emitting
the event. This allows user code to register handlers before or after
CrewAI initialization.

Attributes:
_instance: Singleton instance of the manager.
_lock: Thread lock for singleton initialization.
_original_handlers: Mapping of signals to their original handlers.
_registered_signals: Set of signals that have been registered.
"""

_instance: Self | None = None
_lock: threading.Lock = threading.Lock()

def __new__(cls) -> Self:
"""Create or return the singleton instance.

Returns:
The singleton SystemSignalManager instance.
"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance

def __init__(self) -> None:
"""Initialize the signal manager.

This is safe to call multiple times; initialization only happens once.
"""
if getattr(self, "_initialized", False):
return

self._initialized: bool = True
self._original_handlers: dict[signal.Signals, Any] = {}
self._registered_signals: set[signal.Signals] = set()
self._handler_lock = threading.Lock()

def register_signal(
self,
sig: signal.Signals,
event_factory: EventFactory,
shutdown: bool = False,
) -> None:
"""Register a signal handler that emits an event to the event bus.

This method can be called multiple times for the same signal. Each call
will re-read the current signal handler and wrap it, ensuring that any
handlers registered after the initial setup are still called.

Args:
sig: The signal to handle (e.g., signal.SIGTERM).
event_factory: A callable that creates the event to emit.
shutdown: If True, raise SystemExit(0) after handling if there
was no original handler to call.
"""
with self._handler_lock:
try:
original_handler = signal.getsignal(sig)
self._original_handlers[sig] = original_handler

def handler(signum: int, frame: Any) -> None:
crewai_event_bus.emit(self, event_factory())

if original_handler not in (signal.SIG_DFL, signal.SIG_IGN, None):
if callable(original_handler):
original_handler(signum, frame)
elif shutdown:
raise SystemExit(0)

signal.signal(sig, handler)
self._registered_signals.add(sig)
except ValueError as e:
logger.warning(
f"Cannot register {sig.name} handler: not running in main thread",
exc_info=e,
)
except OSError as e:
logger.warning(f"Cannot register {sig.name} handler: {e}", exc_info=e)

def ensure_handlers_installed(self) -> None:
"""Ensure signal handlers are installed, re-wrapping if necessary.

This method can be called to reinstall signal handlers, which is useful
when user code has registered handlers after CrewAI's initial setup.
The handlers will be re-registered to wrap any new handlers that were
installed since the last registration.

This is a no-op if called before any signals have been registered via
register_signal().
"""


system_signal_manager: SystemSignalManager = SystemSignalManager()
19 changes: 19 additions & 0 deletions lib/crewai/src/crewai/events/types/system_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,22 @@ def on_signal(func: T) -> T:
for event_type in SIGNAL_EVENT_TYPES:
crewai_event_bus.on(event_type)(func)
return func


def _register_default_system_signal_handlers() -> None:
"""Register default signal handlers that emit events to the event bus.

This function is called at module import time to ensure signal events
are always available, regardless of telemetry settings. It bridges OS
signals to the CrewAI event bus so that @on_signal handlers work.
"""
from crewai.events.signal_manager import system_signal_manager

system_signal_manager.register_signal(signal.SIGTERM, SigTermEvent, shutdown=True)
system_signal_manager.register_signal(signal.SIGINT, SigIntEvent, shutdown=True)
system_signal_manager.register_signal(signal.SIGHUP, SigHupEvent, shutdown=False)
system_signal_manager.register_signal(signal.SIGTSTP, SigTStpEvent, shutdown=False)
system_signal_manager.register_signal(signal.SIGCONT, SigContEvent, shutdown=False)


_register_default_system_signal_handlers()
66 changes: 16 additions & 50 deletions lib/crewai/src/crewai/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import logging
import os
import platform
import signal
import threading
from typing import TYPE_CHECKING, Any

Expand All @@ -35,11 +34,9 @@

from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.system_events import (
SigContEvent,
SigHupEvent,
SigIntEvent,
SigTStpEvent,
SigTermEvent,
SignalEvent,
)
from crewai.telemetry.constants import (
CREWAI_TELEMETRY_BASE_URL,
Expand Down Expand Up @@ -131,7 +128,7 @@ def __init__(self) -> None:
)

self.provider.add_span_processor(processor)
self._register_shutdown_handlers()
self._register_telemetry_shutdown_handlers()
self.ready = True
except Exception as e:
if isinstance(
Expand Down Expand Up @@ -166,55 +163,24 @@ def set_tracer(self) -> None:
self.ready = False
self.trace_set = False

def _register_shutdown_handlers(self) -> None:
"""Register handlers for graceful shutdown on process exit and signals."""
atexit.register(self._shutdown)
def _register_telemetry_shutdown_handlers(self) -> None:
"""Register handlers for graceful telemetry shutdown on process exit and signals.

self._original_handlers: dict[int, Any] = {}
This method registers:
1. An atexit handler for normal process exit
2. Event bus handlers for SIGTERM and SIGINT to flush telemetry before shutdown

self._register_signal_handler(signal.SIGTERM, SigTermEvent, shutdown=True)
self._register_signal_handler(signal.SIGINT, SigIntEvent, shutdown=True)
self._register_signal_handler(signal.SIGHUP, SigHupEvent, shutdown=False)
self._register_signal_handler(signal.SIGTSTP, SigTStpEvent, shutdown=False)
self._register_signal_handler(signal.SIGCONT, SigContEvent, shutdown=False)
Note: The actual OS signal handlers are registered by SystemSignalManager
(via system_events module import), which emits events to the event bus.
Telemetry subscribes to these events to perform cleanup.
"""
atexit.register(self._shutdown)

def _register_signal_handler(
self,
sig: signal.Signals,
event_class: type,
shutdown: bool = False,
) -> None:
"""Register a signal handler that emits an event.
def _on_shutdown_signal(source: object, event: SignalEvent) -> None:
self._shutdown()

Args:
sig: The signal to handle.
event_class: The event class to instantiate and emit.
shutdown: Whether to trigger shutdown on this signal.
"""
try:
original_handler = signal.getsignal(sig)
self._original_handlers[sig] = original_handler

def handler(signum: int, frame: Any) -> None:
crewai_event_bus.emit(self, event_class())

if shutdown:
self._shutdown()

if original_handler not in (signal.SIG_DFL, signal.SIG_IGN, None):
if callable(original_handler):
original_handler(signum, frame)
elif shutdown:
raise SystemExit(0)

signal.signal(sig, handler)
except ValueError as e:
logger.warning(
f"Cannot register {sig.name} handler: not running in main thread",
exc_info=e,
)
except OSError as e:
logger.warning(f"Cannot register {sig.name} handler: {e}", exc_info=e)
crewai_event_bus.on(SigTermEvent)(_on_shutdown_signal)
crewai_event_bus.on(SigIntEvent)(_on_shutdown_signal)

def _shutdown(self) -> None:
"""Flush and shutdown the telemetry provider on process exit.
Expand Down
Loading
Loading