diff --git a/src/agents/realtime/items.py b/src/agents/realtime/items.py index 117a35a02..a027b2a39 100644 --- a/src/agents/realtime/items.py +++ b/src/agents/realtime/items.py @@ -92,7 +92,7 @@ class RealtimeToolCallItem(BaseModel): model_config = ConfigDict(extra="allow") -RealtimeItem = RealtimeMessageItem | RealtimeToolCallItem +RealtimeItem = Union[RealtimeMessageItem, RealtimeToolCallItem] class RealtimeResponse(BaseModel): diff --git a/src/agents/realtime/transport.py b/src/agents/realtime/transport.py new file mode 100644 index 000000000..18290d128 --- /dev/null +++ b/src/agents/realtime/transport.py @@ -0,0 +1,107 @@ +import abc +from typing import Any, Literal, Union + +from typing_extensions import NotRequired, TypeAlias, TypedDict + +from .config import APIKeyOrKeyFunc, RealtimeClientMessage, RealtimeSessionConfig, RealtimeUserInput +from .transport_events import RealtimeTransportEvent, RealtimeTransportToolCallEvent + +RealtimeModelName: TypeAlias = Union[ + Literal[ + "gpt-4o-realtime-preview", + "gpt-4o-mini-realtime-preview", + "gpt-4o-realtime-preview-2025-06-03", + "gpt-4o-realtime-preview-2024-12-17", + "gpt-4o-realtime-preview-2024-10-01", + "gpt-4o-mini-realtime-preview-2024-12-17", + ], + str, +] +"""The name of a realtime model.""" + + +class RealtimeTransportListener(abc.ABC): + """A listener for realtime transport events.""" + + @abc.abstractmethod + async def on_event(self, event: RealtimeTransportEvent) -> None: + """Called when an event is emitted by the realtime transport.""" + pass + + +class RealtimeTransportConnectionOptions(TypedDict): + """Options for connecting to a realtime transport.""" + + api_key: NotRequired[APIKeyOrKeyFunc] + """The API key to use for the transport. If unset, the transport will attempt to use the + `OPENAI_API_KEY` environment variable. + """ + + model: NotRequired[str] + """The model to use.""" + + url: NotRequired[str] + """The URL to use for the transport. If unset, the transport will use the default OpenAI + WebSocket URL. + """ + + initial_session_config: NotRequired[RealtimeSessionConfig] + + +class RealtimeSessionTransport(abc.ABC): + """A transport layer for realtime sessions.""" + + @abc.abstractmethod + async def connect(self, options: RealtimeTransportConnectionOptions) -> None: + """Establish a connection to the model and keep it alive.""" + pass + + @abc.abstractmethod + def add_listener(self, listener: RealtimeTransportListener) -> None: + """Add a listener to the transport.""" + pass + + @abc.abstractmethod + async def remove_listener(self, listener: RealtimeTransportListener) -> None: + """Remove a listener from the transport.""" + pass + + @abc.abstractmethod + async def send_event(self, event: RealtimeClientMessage) -> None: + """Send an event to the model.""" + pass + + @abc.abstractmethod + async def send_message( + self, message: RealtimeUserInput, other_event_data: dict[str, Any] | None = None + ) -> None: + """Send a message to the model.""" + pass + + @abc.abstractmethod + async def send_audio(self, audio: bytes, *, commit: bool = False) -> None: + """Send a raw audio chunk to the model. + + Args: + audio: The audio data to send. + commit: Whether to commit the audio buffer to the model. If the model does not do turn + detection, this can be used to indicate the turn is completed. + """ + pass + + @abc.abstractmethod + async def send_tool_output( + self, tool_call: RealtimeTransportToolCallEvent, output: str, start_response: bool + ) -> None: + """Send tool output to the model.""" + pass + + @abc.abstractmethod + async def interrupt(self) -> None: + """Interrupt the model. For example, could be triggered by a guardrail.""" + pass + + @abc.abstractmethod + async def close(self) -> None: + """Close the session.""" + pass diff --git a/src/agents/realtime/transport_events.py b/src/agents/realtime/transport_events.py new file mode 100644 index 000000000..735577b17 --- /dev/null +++ b/src/agents/realtime/transport_events.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Literal, Union + +from typing_extensions import TypeAlias + +from .items import RealtimeItem + +RealtimeConnectionStatus: TypeAlias = Literal["connecting", "connected", "disconnected"] + + +@dataclass +class RealtimeTransportErrorEvent: + """Represents a transport‑layer error.""" + + error: Any + + type: Literal["error"] = "error" + + +@dataclass +class RealtimeTransportToolCallEvent: + """Model attempted a tool/function call.""" + + name: str + call_id: str + arguments: str + + id: str | None = None + previous_item_id: str | None = None + + type: Literal["function_call"] = "function_call" + + +@dataclass +class RealtimeTransportAudioEvent: + """Raw audio bytes emitted by the model.""" + + data: bytes + response_id: str + + type: Literal["audio"] = "audio" + + +@dataclass +class RealtimeTransportAudioInterruptedEvent: + """Audio interrupted.""" + + type: Literal["audio_interrupted"] = "audio_interrupted" + + +@dataclass +class RealtimeTransportAudioDoneEvent: + """Audio done.""" + + type: Literal["audio_done"] = "audio_done" + + +@dataclass +class RealtimeTransportInputAudioTranscriptionCompletedEvent: + """Input audio transcription completed.""" + + item_id: str + transcript: str + + type: Literal["conversation.item.input_audio_transcription.completed"] = ( + "conversation.item.input_audio_transcription.completed" + ) + + +@dataclass +class RealtimeTransportTranscriptDelta: + """Partial transcript update.""" + + item_id: str + delta: str + response_id: str + + type: Literal["transcript_delta"] = "transcript_delta" + + +@dataclass +class RealtimeTransportItemUpdatedEvent: + """Item added to the history or updated.""" + + item: RealtimeItem + + type: Literal["item_updated"] = "item_updated" + + +@dataclass +class RealtimeTransportItemDeletedEvent: + """Item deleted from the history.""" + + item_id: str + + type: Literal["item_deleted"] = "item_deleted" + + +@dataclass +class RealtimeTransportConnectionStatusEvent: + """Connection status changed.""" + + status: RealtimeConnectionStatus + + type: Literal["connection_status"] = "connection_status" + + +@dataclass +class RealtimeTransportTurnStartedEvent: + """Triggered when the model starts generating a response for a turn.""" + + type: Literal["turn_started"] = "turn_started" + + +@dataclass +class RealtimeTransportTurnEndedEvent: + """Triggered when the model finishes generating a response for a turn.""" + + type: Literal["turn_ended"] = "turn_ended" + + +@dataclass +class RealtimeTransportOtherEvent: + """Used as a catchall for vendor-specific events.""" + + data: Any + + type: Literal["other"] = "other" + + +# TODO (rm) Add usage events + + +RealtimeTransportEvent: TypeAlias = Union[ + RealtimeTransportErrorEvent, + RealtimeTransportToolCallEvent, + RealtimeTransportAudioEvent, + RealtimeTransportAudioInterruptedEvent, + RealtimeTransportAudioDoneEvent, + RealtimeTransportInputAudioTranscriptionCompletedEvent, + RealtimeTransportTranscriptDelta, + RealtimeTransportItemUpdatedEvent, + RealtimeTransportItemDeletedEvent, + RealtimeTransportConnectionStatusEvent, + RealtimeTransportTurnStartedEvent, + RealtimeTransportTurnEndedEvent, + RealtimeTransportOtherEvent, +] diff --git a/tests/realtime/test_transport_events.py b/tests/realtime/test_transport_events.py new file mode 100644 index 000000000..2219303d0 --- /dev/null +++ b/tests/realtime/test_transport_events.py @@ -0,0 +1,12 @@ +from typing import get_args + +from agents.realtime.transport_events import RealtimeTransportEvent + + +def test_all_events_have_type() -> None: + """Test that all events have a type.""" + events = get_args(RealtimeTransportEvent) + assert len(events) > 0 + for event in events: + assert event.type is not None + assert isinstance(event.type, str)