Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/agents/realtime/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class RealtimeToolCallItem(BaseModel):
model_config = ConfigDict(extra="allow")


RealtimeItem = RealtimeMessageItem | RealtimeToolCallItem
RealtimeItem = Union[RealtimeMessageItem, RealtimeToolCallItem]


class RealtimeResponse(BaseModel):
Expand Down
107 changes: 107 additions & 0 deletions src/agents/realtime/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import abc
from typing import Any, Literal, Union

from typing_extensions import NotRequired, TypeAlias, TypedDict

from .config import APIKeyOrKeyFunc, RealtimeClientMessage, RealtimeSessionConfig, RealtimeUserInput
from .transport_events import RealtimeTransportEvent, RealtimeTransportToolCallEvent

RealtimeModelName: TypeAlias = Union[
Literal[
"gpt-4o-realtime-preview",
"gpt-4o-mini-realtime-preview",
"gpt-4o-realtime-preview-2025-06-03",
"gpt-4o-realtime-preview-2024-12-17",
"gpt-4o-realtime-preview-2024-10-01",
"gpt-4o-mini-realtime-preview-2024-12-17",
],
str,
]
"""The name of a realtime model."""


class RealtimeTransportListener(abc.ABC):
"""A listener for realtime transport events."""

@abc.abstractmethod
async def on_event(self, event: RealtimeTransportEvent) -> None:
"""Called when an event is emitted by the realtime transport."""
pass


class RealtimeTransportConnectionOptions(TypedDict):
"""Options for connecting to a realtime transport."""

api_key: NotRequired[APIKeyOrKeyFunc]
"""The API key to use for the transport. If unset, the transport will attempt to use the
`OPENAI_API_KEY` environment variable.
"""

model: NotRequired[str]
"""The model to use."""

url: NotRequired[str]
"""The URL to use for the transport. If unset, the transport will use the default OpenAI
WebSocket URL.
"""

initial_session_config: NotRequired[RealtimeSessionConfig]


class RealtimeSessionTransport(abc.ABC):
"""A transport layer for realtime sessions."""

@abc.abstractmethod
async def connect(self, options: RealtimeTransportConnectionOptions) -> None:
"""Establish a connection to the model and keep it alive."""
pass

@abc.abstractmethod
def add_listener(self, listener: RealtimeTransportListener) -> None:
"""Add a listener to the transport."""
pass

@abc.abstractmethod
async def remove_listener(self, listener: RealtimeTransportListener) -> None:
"""Remove a listener from the transport."""
pass

@abc.abstractmethod
async def send_event(self, event: RealtimeClientMessage) -> None:
"""Send an event to the model."""
pass

@abc.abstractmethod
async def send_message(
self, message: RealtimeUserInput, other_event_data: dict[str, Any] | None = None
) -> None:
"""Send a message to the model."""
pass

@abc.abstractmethod
async def send_audio(self, audio: bytes, *, commit: bool = False) -> None:
"""Send a raw audio chunk to the model.

Args:
audio: The audio data to send.
commit: Whether to commit the audio buffer to the model. If the model does not do turn
detection, this can be used to indicate the turn is completed.
"""
pass

@abc.abstractmethod
async def send_tool_output(
self, tool_call: RealtimeTransportToolCallEvent, output: str, start_response: bool
) -> None:
"""Send tool output to the model."""
pass

@abc.abstractmethod
async def interrupt(self) -> None:
"""Interrupt the model. For example, could be triggered by a guardrail."""
pass

@abc.abstractmethod
async def close(self) -> None:
"""Close the session."""
pass
150 changes: 150 additions & 0 deletions src/agents/realtime/transport_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Literal, Union

from typing_extensions import TypeAlias

from .items import RealtimeItem

RealtimeConnectionStatus: TypeAlias = Literal["connecting", "connected", "disconnected"]


@dataclass
class RealtimeTransportErrorEvent:
"""Represents a transport‑layer error."""

error: Any

type: Literal["error"] = "error"


@dataclass
class RealtimeTransportToolCallEvent:
"""Model attempted a tool/function call."""

name: str
call_id: str
arguments: str

id: str | None = None
previous_item_id: str | None = None

type: Literal["function_call"] = "function_call"


@dataclass
class RealtimeTransportAudioEvent:
"""Raw audio bytes emitted by the model."""

data: bytes
response_id: str

type: Literal["audio"] = "audio"


@dataclass
class RealtimeTransportAudioInterruptedEvent:
"""Audio interrupted."""

type: Literal["audio_interrupted"] = "audio_interrupted"


@dataclass
class RealtimeTransportAudioDoneEvent:
"""Audio done."""

type: Literal["audio_done"] = "audio_done"


@dataclass
class RealtimeTransportInputAudioTranscriptionCompletedEvent:
"""Input audio transcription completed."""

item_id: str
transcript: str

type: Literal["conversation.item.input_audio_transcription.completed"] = (
"conversation.item.input_audio_transcription.completed"
)


@dataclass
class RealtimeTransportTranscriptDelta:
"""Partial transcript update."""

item_id: str
delta: str
response_id: str

type: Literal["transcript_delta"] = "transcript_delta"


@dataclass
class RealtimeTransportItemUpdatedEvent:
"""Item added to the history or updated."""

item: RealtimeItem

type: Literal["item_updated"] = "item_updated"


@dataclass
class RealtimeTransportItemDeletedEvent:
"""Item deleted from the history."""

item_id: str

type: Literal["item_deleted"] = "item_deleted"


@dataclass
class RealtimeTransportConnectionStatusEvent:
"""Connection status changed."""

status: RealtimeConnectionStatus

type: Literal["connection_status"] = "connection_status"


@dataclass
class RealtimeTransportTurnStartedEvent:
"""Triggered when the model starts generating a response for a turn."""

type: Literal["turn_started"] = "turn_started"


@dataclass
class RealtimeTransportTurnEndedEvent:
"""Triggered when the model finishes generating a response for a turn."""

type: Literal["turn_ended"] = "turn_ended"


@dataclass
class RealtimeTransportOtherEvent:
"""Used as a catchall for vendor-specific events."""

data: Any

type: Literal["other"] = "other"


# TODO (rm) Add usage events


RealtimeTransportEvent: TypeAlias = Union[
RealtimeTransportErrorEvent,
RealtimeTransportToolCallEvent,
RealtimeTransportAudioEvent,
RealtimeTransportAudioInterruptedEvent,
RealtimeTransportAudioDoneEvent,
RealtimeTransportInputAudioTranscriptionCompletedEvent,
RealtimeTransportTranscriptDelta,
RealtimeTransportItemUpdatedEvent,
RealtimeTransportItemDeletedEvent,
RealtimeTransportConnectionStatusEvent,
RealtimeTransportTurnStartedEvent,
RealtimeTransportTurnEndedEvent,
RealtimeTransportOtherEvent,
]
12 changes: 12 additions & 0 deletions tests/realtime/test_transport_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import get_args

from agents.realtime.transport_events import RealtimeTransportEvent


def test_all_events_have_type() -> None:
"""Test that all events have a type."""
events = get_args(RealtimeTransportEvent)
assert len(events) > 0
for event in events:
assert event.type is not None
assert isinstance(event.type, str)