-
Notifications
You must be signed in to change notification settings - Fork 497
feat(agents-realtime): add fetch-based WebSocket upgrade for Cloudflare/workerd #502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(agents-realtime): add fetch-based WebSocket upgrade for Cloudflare/workerd #502
Conversation
🦋 Changeset detectedLatest commit: eca41e6 The changes in this PR will be included in the next version bump. This PR includes changesets to release 3 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
Thanks for sending this PR. This project already has |
|
Adding it to the extensions would definitely be ideal! For context, I previously tried an approach that did not modify OpenAIRealtimeWebSocket. However, I abandoned that approach for these reasons: Aside from the timing of WebSocket creation and the post-open handling, the implementation is identical. Copy-pasting would create a large amount of duplicated code, which would be hard to maintain going forward. In particular, event emission is a core behavior of the library, so duplicating it felt wrong. Utilities like HEADERS and base64ToArrayBuffer are not exported outside the package, which makes it difficult to implement this in the extensions package (in the example above, I ended up re-implementing them inside agents-realtime). I see a few possible approaches: A. Allow the OpenAIRealtimeWebSocket constructor to accept a WebSocket factory and post-initialization hook(s), so the creation and “on open” lifecycle can be injected. (This would slightly complicate the public constructor interface.) B. Export utilities like HEADERS and base64ToArrayBuffer so they’re usable from extensions, then copy OpenAIRealtimeWebSocket and implement CloudflareRealtimeTransport on the extensions side. (This would work but increases maintenance burden due to duplication.) Which approach do you think would fit the project best? I’m also open to other suggestions if there’s a cleaner path I’m missing. |
|
I prefer the option A. I haven't verified this really works but what do you think about something like this? diff --git a/packages/agents-extensions/src/CloudflareRealtimeTransport.ts b/packages/agents-extensions/src/CloudflareRealtimeTransport.ts
new file mode 100644
index 0000000..3778f1c
--- /dev/null
+++ b/packages/agents-extensions/src/CloudflareRealtimeTransport.ts
@@ -0,0 +1,50 @@
+import {
+ RealtimeTransportLayer,
+ OpenAIRealtimeWebSocket,
+ OpenAIRealtimeWebSocketOptions,
+} from '@openai/agents/realtime';
+
+export class CloudflareRealtimeTransportLayer
+ extends OpenAIRealtimeWebSocket
+ implements RealtimeTransportLayer {
+ protected _audioLengthMs: number = 0;
+
+ constructor(options: OpenAIRealtimeWebSocketOptions) {
+ super({
+ ...options,
+ createWebSocket: async ({ url, apiKey }) => {
+ return await this.#buildCloudflareWebSocket({ url, apiKey });
+ },
+ skipOpenEventListeners: true,
+ });
+ }
+
+ async #buildCloudflareWebSocket({ url, apiKey }: { url: string, apiKey: string }): Promise<WebSocket> {
+ const transformedUrl = url.replace(/^ws/i, 'http');
+ if (!transformedUrl) {
+ throw new Error('Realtime URL is not defined');
+ }
+
+ const response = await fetch(transformedUrl, {
+ method: 'GET',
+ headers: {
+ Authorization: `Bearer ${apiKey}`,
+ 'Sec-WebSocket-Protocol': 'realtime',
+ Connection: 'Upgrade',
+ Upgrade: 'websocket',
+ ...this.getCommonRequestHeaders(),
+ },
+ });
+
+ const upgradedSocket = (response as any).webSocket;
+ if (!upgradedSocket) {
+ const body = await response.text().catch(() => '');
+ throw new Error(
+ `Failed to upgrade websocket: ${response.status} ${body}`,
+ );
+ }
+
+ upgradedSocket.accept();
+ return upgradedSocket as unknown as WebSocket;
+ }
+}
diff --git a/packages/agents-extensions/src/index.ts b/packages/agents-extensions/src/index.ts
index b1cafe0..3b571f4 100644
--- a/packages/agents-extensions/src/index.ts
+++ b/packages/agents-extensions/src/index.ts
@@ -1,2 +1,3 @@
-export * from './TwilioRealtimeTransport';
export * from './aiSdk';
+export * from './CloudflareRealtimeTransport';
+export * from './TwilioRealtimeTransport';
diff --git a/packages/agents-realtime/src/openaiRealtimeWebsocket.ts b/packages/agents-realtime/src/openaiRealtimeWebsocket.ts
index 22f1ba1..6ee0d85 100644
--- a/packages/agents-realtime/src/openaiRealtimeWebsocket.ts
+++ b/packages/agents-realtime/src/openaiRealtimeWebsocket.ts
@@ -35,6 +35,11 @@ export type WebSocketState =
websocket: WebSocket;
};
+export interface CreateWebSocketOptions {
+ url: string;
+ apiKey: string;
+}
+
/**
* The options for the OpenAI Realtime WebSocket transport layer.
*/
@@ -51,13 +56,25 @@ export type OpenAIRealtimeWebSocketOptions = {
* The URL to use for the WebSocket connection.
*/
url?: string;
+
/**
- * Cloudflare Workers/workerd only: enable the fetch-based WebSocket upgrade.
- * Set to `true` only on Cloudflare (including Miniflare). Other environments
- * use the default `WebSocket` constructor path. See `connectViaFetchUpgrade()`
- * for details.
+ * Builds a new WebSocket connection.
+ * @param options - The options for the WebSocket connection.
+ * @returns The WebSocket connection.
+ */
+ createWebSocket?: (options: CreateWebSocketOptions) => Promise<WebSocket>;
+
+ /**
+ * When you pass your own createWebSocket function, which completes the connection state transition,
+ * you can set this to true to skip registering the `open` event listener for the same purpose.
+ * If this flag is set to true, the constructor will immediately call the internal operation
+ * to mark the internal connection state to `connected`. Otherwise, the constructor will register
+ * the `open` event listener and wait for it to be triggered.
+ *
+ * By default (meaning if this property is absent), this is set to false.
*/
- useWorkersFetchUpgrade?: boolean;
+ skipOpenEventListeners?: boolean;
+
} & OpenAIRealtimeBaseOptions;
/**
@@ -71,8 +88,6 @@ export class OpenAIRealtimeWebSocket
{
#apiKey: string | undefined;
#url: string | undefined;
- /** Cloudflare/workerd only; see `connectViaFetchUpgrade()`. */
- useWorkersFetchUpgrade?: boolean;
#state: WebSocketState = {
status: 'disconnected',
websocket: undefined,
@@ -91,11 +106,19 @@ export class OpenAIRealtimeWebSocket
protected _audioLengthMs: number = 0;
#ongoingResponse: boolean = false;
+ #createWebSocket?: (options: CreateWebSocketOptions) => Promise<WebSocket>;
+ #skipOpenEventListeners?: boolean;
+
constructor(options: OpenAIRealtimeWebSocketOptions = {}) {
super(options);
this.#url = options.url;
this.#useInsecureApiKey = options.useInsecureApiKey ?? false;
- this.useWorkersFetchUpgrade = options.useWorkersFetchUpgrade ?? false;
+ this.#createWebSocket = options.createWebSocket;
+ this.#skipOpenEventListeners = options.skipOpenEventListeners ?? false;
+ }
+
+ protected getCommonRequestHeaders() {
+ return HEADERS;
}
/**
@@ -166,8 +189,8 @@ export class OpenAIRealtimeWebSocket
let ws: WebSocket | null = null;
- if (this.useWorkersFetchUpgrade) {
- ws = await this.connectViaFetchUpgrade();
+ if (this.#createWebSocket) {
+ ws = await this.#createWebSocket({ url: this.#url!, apiKey: this.#apiKey });
} else {
// browsers and workerd should use the protocols argument, node should use the headers argument
const websocketArguments = useWebSocketProtocols
@@ -181,7 +204,7 @@ export class OpenAIRealtimeWebSocket
: {
headers: {
Authorization: `Bearer ${this.#apiKey}`,
- ...HEADERS,
+ ...this.getCommonRequestHeaders(),
},
};
@@ -203,7 +226,9 @@ export class OpenAIRealtimeWebSocket
resolve();
};
- if (this.useWorkersFetchUpgrade) {
+ if (this.#skipOpenEventListeners === true) {
+ // Some platforms like Cloudflare's fetch-upgrade mode do not rely on `open` events
+ // for transitioning the connection state.
onSocketOpenReady();
} else {
ws.addEventListener('open', onSocketOpenReady);
@@ -439,46 +464,4 @@ export class OpenAIRealtimeWebSocket
this._audioLengthMs = 0;
this.#currentAudioContentIndex = undefined;
}
-
- /**
- * Connect using the Cloudflare/workerd Fetch-based WebSocket upgrade path.
- *
- * workerd does not support creating client WebSockets via the global `WebSocket`
- * constructor. Instead, a `fetch()` request with `Upgrade: websocket` is issued and
- * the upgraded socket is returned as `response.webSocket`, which must be accepted
- * via `accept()`.
- *
- * This method is intended for use when `useWorkersFetchUpgrade` is `true` and the
- * code is running under Cloudflare Workers (or compatible workerd runtimes like
- * Miniflare). Other environments should use the normal constructor-based path.
- */
- async connectViaFetchUpgrade(): Promise<WebSocket> {
- const transformedUrl = this.#url?.replace(/^ws/i, 'http');
- if (!transformedUrl) {
- throw new Error('Realtime URL is not defined');
- }
-
- const response = await fetch(transformedUrl, {
- method: 'GET',
- headers: {
- Authorization: `Bearer ${this.#apiKey}`,
- // 'OpenAI-Beta': 'realtime=v1',
- 'Sec-WebSocket-Protocol': 'realtime',
- Connection: 'Upgrade',
- Upgrade: 'websocket',
- ...HEADERS,
- },
- });
-
- const upgradedSocket = (response as any).webSocket;
- if (!upgradedSocket) {
- const body = await response.text().catch(() => '');
- throw new Error(
- `Failed to upgrade websocket: ${response.status} ${body}`,
- );
- }
-
- upgradedSocket.accept();
- return upgradedSocket as unknown as WebSocket;
- }
}
|
|
I thought this was an ideal solution. I actually applied it in a real project, and it’s working without any issues. Thank you very much. Will this fix be incorporated on your side, or should I go ahead and make the changes myself? |
|
If you update this PR, we're happy to have the change as your contribution! 🙌 If you're busy for other things now, I can take time to complete this on behalf of you. |
cc4fb84 to
b2fe19d
Compare
| @@ -0,0 +1,48 @@ | |||
| --- | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have document updates in this PR, we have to release a new version including the change right after merging the PR. Can you make document changes in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. I’ll create a separate PR.
| @@ -0,0 +1,48 @@ | |||
| --- | |||
| title: Using Realtime Agents on Cloudflare Workers | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks a bit long in the side bar; how about "Realtime Agents on Cloudflare"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I’ll do that.
|
|
||
| // Create a transport that connects to OpenAI Realtime via Cloudflare/workerd's fetch-based upgrade. | ||
| const cfTransport = new CloudflareRealtimeTransportLayer({ | ||
| url: 'wss://api.openai.com/v1/realtime?model=gpt-4o-realtime', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the latest version supports only gpt-realtime model
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’ll fix it.
| sendSpy.mockRestore(); | ||
| }); | ||
|
|
||
| // Cloudflare/workerd fetch-upgrade tests removed. Use CloudflareRealtimeTransportLayer in extensions instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete this comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’ve fixed it.
09ad12c to
d13eb93
Compare
…pgrade - Add CloudflareRealtimeTransportLayer (factory via createWebSocket; skip open listener). - Add tests for Cloudflare transport; remove legacy fetch-upgrade tests in agents-realtime. - Add concise JSDoc with Cloudflare Response API reference. - Add Twilio transport JSDoc reference.
d13eb93 to
37e8b64
Compare
|
I’ve incorporated your patch and recreated the pull request. In addition to the patch, I’ve added tests and updated the JSDoc. |
Summary
useWorkersFetchUpgradeoption toOpenAIRealtimeWebSocketfor Cloudflare/workerd (Cloudflare Workers: WebSockets — Make outbound WebSocket connections).connectViaFetchUpgrade()to establish WS viafetch()+Upgrade: websocket(see same docs above).Motivation
Cloudflare Workers and other workerd runtimes (e.g., Miniflare) cannot create client WebSockets via the global
WebSocketconstructor. Instead, Cloudflare’s documentation specifies using an HTTPfetch()upgrade flow that returnsresponse.webSocket, which must beaccept()ed (docs). This PR enables first-class support for that environment. For additional runtime context, see Durable Objects and WebSockets.Changes
useWorkersFetchUpgrade?: booleanonOpenAIRealtimeWebSocketOptions.connectViaFetchUpgrade()performs the GET + upgrade headers and acceptsresponse.webSocket.connectedwithout relying on an'open'event.connectViaFetchUpgrade().Before/After
useWorkersFetchUpgrade: trueto connect under Cloudflare/workerd while preserving default behavior elsewhere.Example:
Tests
packages/agents-realtime/test/openaiRealtimeWebsocket.test.tsws:→http:), required headers, and connection transitions in fetch-upgrade mode.response.webSocketis missing.Docs
connectViaFetchUpgrade().Breaking Impact
Checklist