Skip to content

Commit d13eb93

Browse files
committed
feat(agents-extensions): Cloudflare transport for Workers via fetch upgrade
- Add CloudflareRealtimeTransportLayer (factory via createWebSocket; skip open listener). - Add tests for Cloudflare transport; remove legacy fetch-upgrade tests in agents-realtime. - Add concise JSDoc with Cloudflare Response API reference. - Add Twilio transport JSDoc reference.
1 parent 3ce9513 commit d13eb93

File tree

5 files changed

+241
-25
lines changed

5 files changed

+241
-25
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
'@openai/agents-realtime': minor
3+
'@openai/agents-extensions': minor
4+
---
5+
6+
feat: add factory-based Cloudflare support.
7+
8+
- Realtime (WebSocket): add `createWebSocket` and `skipOpenEventListeners` options to enable
9+
custom socket creation and connection state control for specialized runtimes.
10+
- Extensions: add `CloudflareRealtimeTransportLayer`, which performs a `fetch()`-based WebSocket
11+
upgrade on Cloudflare/workerd and integrates via the WebSocket factory.
12+
- Docs: add a Cloudflare extensions page and example snippet.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import {
2+
RealtimeTransportLayer,
3+
OpenAIRealtimeWebSocket,
4+
OpenAIRealtimeWebSocketOptions,
5+
} from '@openai/agents/realtime';
6+
7+
/**
 * An adapter transport for Cloudflare Workers (workerd) environments.
 *
 * Cloudflare Workers cannot open outbound client WebSockets using the global `WebSocket`
 * constructor. Instead, a `fetch()` request with `Upgrade: websocket` must be performed and the
 * returned `response.webSocket` must be `accept()`ed. This transport encapsulates that pattern and
 * plugs into the Realtime SDK via the factory-based `createWebSocket` option.
 *
 * It behaves like `OpenAIRealtimeWebSocket`, but establishes the connection using `fetch()` and
 * sets `skipOpenEventListeners: true` since workerd sockets do not emit a traditional `open`
 * event after acceptance.
 *
 * Reference: Response API — `response.webSocket` (Cloudflare Workers).
 * https://developers.cloudflare.com/workers/runtime-apis/response/
 */
export class CloudflareRealtimeTransportLayer
  extends OpenAIRealtimeWebSocket
  implements RealtimeTransportLayer
{
  /**
   * @param options - Regular WebSocket transport options. Any `createWebSocket` or
   *   `skipOpenEventListeners` value supplied by the caller is replaced, because this transport
   *   always builds its socket through the fetch-upgrade flow.
   */
  constructor(options: OpenAIRealtimeWebSocketOptions) {
    super({
      ...options,
      // The arrow function is only invoked while connecting, i.e. after `super()` has returned,
      // so referencing `this` here is safe.
      createWebSocket: ({ url, apiKey }) =>
        this.#buildCloudflareWebSocket({ url, apiKey }),
      skipOpenEventListeners: true,
    });
  }

  /**
   * Builds a WebSocket using Cloudflare's `fetch()` + `Upgrade: websocket` flow and accepts it.
   * Transforms `ws(s)` to `http(s)` for the upgrade request and forwards standard headers.
   *
   * @param url - The realtime endpoint, normally a `ws://` or `wss://` URL.
   * @param apiKey - The key sent as a `Bearer` token in the `Authorization` header.
   * @returns The accepted socket taken from `response.webSocket`.
   * @throws Error if `url` is empty, or if the response carries no upgraded socket (the message
   *   then includes the HTTP status and the response body when it can be read).
   */
  async #buildCloudflareWebSocket({
    url,
    apiKey,
  }: {
    url: string;
    apiKey: string;
  }): Promise<WebSocket> {
    // Validate before transforming so a missing URL yields the intended error instead of a
    // TypeError from calling `.replace` on a non-string.
    if (!url) {
      throw new Error('Realtime URL is not defined');
    }

    // Rewrite only a genuine ws:/wss: scheme (ws -> http, wss -> https); leave anything else as is.
    const transformedUrl = url.replace(/^ws(s?):/i, 'http$1:');

    const response = await fetch(transformedUrl, {
      method: 'GET',
      headers: {
        Authorization: `Bearer ${apiKey}`,
        'Sec-WebSocket-Protocol': 'realtime',
        Connection: 'Upgrade',
        Upgrade: 'websocket',
        ...this.getCommonRequestHeaders(),
      },
    });

    // `webSocket` is a workerd extension that the standard `Response` type does not declare.
    const upgradedSocket = (
      response as Response & {
        webSocket?: (WebSocket & { accept(): void }) | null;
      }
    ).webSocket;
    if (!upgradedSocket) {
      // Best effort: include the body for diagnostics, but never let reading it mask the error.
      const body = await response.text().catch(() => '');
      throw new Error(
        `Failed to upgrade websocket: ${response.status} ${body}`,
      );
    }

    // The upgraded socket must be accepted before it can be used from the Worker.
    upgradedSocket.accept();
    return upgradedSocket;
  }
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1-
export * from './TwilioRealtimeTransport';
21
export * from './aiSdk';
2+
export * from './CloudflareRealtimeTransport';
3+
export * from './TwilioRealtimeTransport';
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
2+
import { CloudflareRealtimeTransportLayer } from '../src/CloudflareRealtimeTransport';
3+
4+
/**
 * Minimal in-memory stand-in for the socket found on an upgraded fetch response.
 * It records whether `accept()` was called and lets a test fire events by hand.
 */
class FakeWorkersWebSocket {
  url: string;
  /** Registered handlers, keyed by event type, in registration order. */
  listeners: Record<string, ((ev: any) => void)[]> = {};
  /** Flips to `true` once `accept()` has been called. */
  accepted = false;

  constructor(url: string) {
    this.url = url;
  }

  /** Appends `listener` to the handlers registered for `type`. */
  addEventListener(type: string, listener: (ev: any) => void) {
    const registered = this.listeners[type];
    if (registered) {
      registered.push(listener);
      return;
    }
    this.listeners[type] = [listener];
  }

  /** Marks the socket as accepted. */
  accept() {
    this.accepted = true;
  }

  /** Outgoing data is discarded. */
  send(_data: any) {}

  /** Notifies every `close` handler with an empty event object. */
  close() {
    this.emit('close', {});
  }

  /** Synchronously calls each handler registered for `type` with `ev`. */
  emit(type: string, ev: any) {
    const handlers = this.listeners[type];
    if (!handlers) {
      return;
    }
    handlers.forEach((handler) => {
      handler(ev);
    });
  }
}
26+
27+
// Exercises the Cloudflare transport against a stubbed global `fetch`; no network is used.
describe('CloudflareRealtimeTransportLayer', () => {
  let savedFetch: any;

  beforeEach(() => {
    // Remember the real fetch so each test can swap in its own spy.
    savedFetch = (globalThis as any).fetch;
  });

  afterEach(() => {
    (globalThis as any).fetch = savedFetch;
  });

  it('connects via fetch upgrade and emits connection changes', async () => {
    const fakeSocket = new FakeWorkersWebSocket('ws://example');
    const fetchSpy = vi.fn().mockResolvedValue({
      webSocket: fakeSocket,
      status: 101,
      text: vi.fn().mockResolvedValue(''),
    });
    (globalThis as any).fetch = fetchSpy;

    const transport = new CloudflareRealtimeTransportLayer({
      url: 'wss://api.openai.com/v1/realtime?model=foo',
    });

    const statuses: string[] = [];
    transport.on('connection_change', (s) => statuses.push(s));

    await transport.connect({ apiKey: 'ek_test', model: 'foo' });

    expect(fetchSpy).toHaveBeenCalledTimes(1);
    // wss -> https
    expect(fetchSpy.mock.calls[0][0]).toBe(
      'https://api.openai.com/v1/realtime?model=foo',
    );
    const init = fetchSpy.mock.calls[0][1];
    expect(init.method).toBe('GET');
    expect(init.headers['Authorization']).toBe('Bearer ek_test');
    expect(init.headers['Upgrade']).toBe('websocket');
    expect(init.headers['Connection']).toBe('Upgrade');
    expect(init.headers['Sec-WebSocket-Protocol']).toBe('realtime');

    // The upgraded socket must have been accepted by the transport before use.
    expect(fakeSocket.accepted).toBe(true);

    // connected without relying on 'open' listener.
    expect(statuses).toEqual(['connecting', 'connected']);
  });

  it('propagates fetch-upgrade failures with detailed error', async () => {
    // No `webSocket` on the response simulates a server that refused the upgrade.
    const fetchSpy = vi.fn().mockResolvedValue({
      status: 400,
      text: vi.fn().mockResolvedValue('No upgrade'),
    });
    (globalThis as any).fetch = fetchSpy;

    const transport = new CloudflareRealtimeTransportLayer({
      url: 'wss://api.openai.com/v1/realtime?model=bar',
    });

    await expect(
      transport.connect({ apiKey: 'ek_x', model: 'bar' }),
    ).rejects.toThrow('Failed to upgrade websocket: 400 No upgrade');
  });
});

packages/agents-realtime/src/openaiRealtimeWebsocket.ts

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ export type WebSocketState =
3535
websocket: WebSocket;
3636
};
3737

38+
/**
 * Arguments passed to a custom `createWebSocket` factory when the transport needs a socket.
 */
export interface CreateWebSocketOptions {
  /** The URL the transport is connecting to. */
  url: string;
  /** The API key the transport was asked to connect with. */
  apiKey: string;
}
42+
3843
/**
3944
* The options for the OpenAI Realtime WebSocket transport layer.
4045
*/
@@ -51,6 +56,22 @@ export type OpenAIRealtimeWebSocketOptions = {
5156
* The URL to use for the WebSocket connection.
5257
*/
5358
url?: string;
59+
/**
60+
* Builds a new WebSocket connection.
61+
* @param options - The options for the WebSocket connection.
62+
* @returns The WebSocket connection.
63+
*/
64+
createWebSocket?: (options: CreateWebSocketOptions) => Promise<WebSocket>;
65+
/**
66+
* When you pass your own createWebSocket function, which completes the connection state transition,
67+
* you can set this to true to skip registering the `open` event listener for the same purpose.
68+
* If this flag is set to true, the transport marks its connection state as `connected` as soon as
69+
* the socket has been created while connecting. Otherwise, it registers an `open` event listener
70+
* on the socket and waits for that event before reporting `connected`.
71+
*
72+
* By default (meaning if this property is absent), this is set to false.
73+
*/
74+
skipOpenEventListeners?: boolean;
5475
} & OpenAIRealtimeBaseOptions;
5576

5677
/**
@@ -81,11 +102,19 @@ export class OpenAIRealtimeWebSocket
81102
protected _firstAudioTimestamp: number | undefined;
82103
protected _audioLengthMs: number = 0;
83104
#ongoingResponse: boolean = false;
105+
#createWebSocket?: (options: CreateWebSocketOptions) => Promise<WebSocket>;
106+
#skipOpenEventListeners?: boolean;
84107

85108
constructor(options: OpenAIRealtimeWebSocketOptions = {}) {
86109
super(options);
87110
this.#url = options.url;
88111
this.#useInsecureApiKey = options.useInsecureApiKey ?? false;
112+
this.#createWebSocket = options.createWebSocket;
113+
this.#skipOpenEventListeners = options.skipOpenEventListeners ?? false;
114+
}
115+
116+
protected getCommonRequestHeaders() {
117+
return HEADERS;
89118
}
90119

91120
/**
@@ -128,7 +157,7 @@ export class OpenAIRealtimeWebSocket
128157
this.emit('audio', audioEvent);
129158
}
130159

131-
#setupWebSocket(
160+
async #setupWebSocket(
132161
resolve: (value: void) => void,
133162
reject: (reason?: any) => void,
134163
sessionConfig: Partial<RealtimeSessionConfig>,
@@ -154,38 +183,53 @@ export class OpenAIRealtimeWebSocket
154183
);
155184
}
156185

157-
// browsers and workerd should use the protocols argument, node should use the headers argument
158-
const websocketArguments = useWebSocketProtocols
159-
? [
160-
'realtime',
161-
// Auth
162-
'openai-insecure-api-key.' + this.#apiKey,
163-
// Version header
164-
WEBSOCKET_META,
165-
]
166-
: {
167-
headers: {
168-
Authorization: `Bearer ${this.#apiKey}`,
169-
...HEADERS,
170-
},
171-
};
186+
let ws: WebSocket | null = null;
172187

173-
const ws = new WebSocket(this.#url!, websocketArguments as any);
188+
if (this.#createWebSocket) {
189+
ws = await this.#createWebSocket({
190+
url: this.#url!,
191+
apiKey: this.#apiKey,
192+
});
193+
} else {
194+
// browsers and workerd should use the protocols argument, node should use the headers argument
195+
const websocketArguments = useWebSocketProtocols
196+
? [
197+
'realtime',
198+
// Auth
199+
'openai-insecure-api-key.' + this.#apiKey,
200+
// Version header
201+
WEBSOCKET_META,
202+
]
203+
: {
204+
headers: {
205+
Authorization: `Bearer ${this.#apiKey}`,
206+
...this.getCommonRequestHeaders(),
207+
},
208+
};
209+
210+
ws = new WebSocket(this.#url!, websocketArguments as any);
211+
}
174212
this.#state = {
175213
status: 'connecting',
176214
websocket: ws,
177215
};
178216
this.emit('connection_change', this.#state.status);
179217

180-
ws.addEventListener('open', () => {
218+
const onSocketOpenReady = () => {
181219
this.#state = {
182220
status: 'connected',
183221
websocket: ws,
184222
};
185223
this.emit('connection_change', this.#state.status);
186224
this._onOpen();
187225
resolve();
188-
});
226+
};
227+
228+
if (this.#skipOpenEventListeners === true) {
229+
onSocketOpenReady();
230+
} else {
231+
ws.addEventListener('open', onSocketOpenReady);
232+
}
189233

190234
ws.addEventListener('error', (error) => {
191235
this._onError(error);
@@ -292,11 +336,7 @@ export class OpenAIRealtimeWebSocket
292336
};
293337

294338
await new Promise<void>((resolve, reject) => {
295-
try {
296-
this.#setupWebSocket(resolve, reject, sessionConfig);
297-
} catch (error) {
298-
reject(error);
299-
}
339+
this.#setupWebSocket(resolve, reject, sessionConfig).catch(reject);
300340
});
301341

302342
await this.updateSessionConfig(sessionConfig);

0 commit comments

Comments
 (0)