Skip to content

Commit 03c45f5

Browse files
authored
Merge pull request #758 from Portkey-AI/feat/openai-realtime-api
Feat: openai and azure-openai realtime api support
2 parents 34e63eb + 1694e7d commit 03c45f5

File tree

15 files changed

+560
-30
lines changed

15 files changed

+560
-30
lines changed

package-lock.json

Lines changed: 41 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@
4242
"dependencies": {
4343
"@aws-crypto/sha256-js": "^5.2.0",
4444
"@hono/node-server": "^1.3.3",
45+
"@hono/node-ws": "^1.0.4",
4546
"@portkey-ai/mustache": "^2.1.2",
4647
"@smithy/signature-v4": "^2.1.1",
4748
"@types/mustache": "^4.2.5",
4849
"async-retry": "^1.3.3",
49-
"hono": "^3.12.0",
50+
"hono": "^4.6.10",
51+
"ws": "^8.18.0",
5052
"zod": "^3.22.4"
5153
},
5254
"devDependencies": {
@@ -57,6 +59,7 @@
5759
"@types/async-retry": "^1.4.5",
5860
"@types/jest": "^29.5.12",
5961
"@types/node": "20.8.3",
62+
"@types/ws": "^8.5.12",
6063
"husky": "^9.1.4",
6164
"jest": "^29.7.0",
6265
"prettier": "3.2.5",

src/handlers/handlerUtils.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ export async function tryPostProxy(
262262
providerOptions: providerOption,
263263
fn,
264264
gatewayRequestBody: params,
265+
gatewayRequestURL: c.req.url,
265266
});
266267

267268
const url = endpoint
@@ -517,6 +518,7 @@ export async function tryPost(
517518
providerOptions: providerOption,
518519
fn,
519520
gatewayRequestBody: params,
521+
gatewayRequestURL: c.req.url,
520522
});
521523
const url = `${baseUrl}${endpoint}`;
522524

@@ -1004,6 +1006,7 @@ export function updateResponseHeaders(
10041006
// Delete content-length header to avoid conflicts with hono compress middleware
10051007
// workerd environment handles this authomatically
10061008
response.headers.delete('content-length');
1009+
response.headers.delete('transfer-encoding');
10071010
}
10081011

10091012
export function constructConfigFromRequestHeaders(
@@ -1021,6 +1024,9 @@ export function constructConfigFromRequestHeaders(
10211024
requestHeaders[`x-${POWERED_BY}-azure-entra-client-secret`],
10221025
azureEntraTenantId: requestHeaders[`x-${POWERED_BY}-azure-entra-tenant-id`],
10231026
azureModelName: requestHeaders[`x-${POWERED_BY}-azure-model-name`],
1027+
openaiBeta:
1028+
requestHeaders[`x-${POWERED_BY}-openai-beta`] ||
1029+
requestHeaders[`openai-beta`],
10241030
};
10251031

10261032
const stabilityAiConfig = {
@@ -1058,6 +1064,9 @@ export function constructConfigFromRequestHeaders(
10581064
const openAiConfig = {
10591065
openaiOrganization: requestHeaders[`x-${POWERED_BY}-openai-organization`],
10601066
openaiProject: requestHeaders[`x-${POWERED_BY}-openai-project`],
1067+
openaiBeta:
1068+
requestHeaders[`x-${POWERED_BY}-openai-beta`] ||
1069+
requestHeaders[`openai-beta`],
10611070
};
10621071

10631072
const huggingfaceConfig = {

src/handlers/realtimeHandler.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import { Context } from 'hono';
2+
import { constructConfigFromRequestHeaders } from './handlerUtils';
3+
import { ProviderAPIConfig } from '../providers/types';
4+
import Providers from '../providers';
5+
import { Options } from '../types/requestBody';
6+
import {
7+
addListeners,
8+
getOptionsForOutgoingConnection,
9+
getURLForOutgoingConnection,
10+
} from './websocketUtils';
11+
import { RealtimeLlmEventParser } from '../services/realtimeLlmEventParser';
12+
13+
const getOutgoingWebSocket = async (url: string, options: RequestInit) => {
14+
let outgoingWebSocket: WebSocket | null = null;
15+
try {
16+
let response = await fetch(url, options);
17+
outgoingWebSocket = response.webSocket;
18+
} catch (error) {
19+
console.log(error);
20+
}
21+
22+
if (!outgoingWebSocket) {
23+
throw new Error('WebSocket connection failed');
24+
}
25+
26+
outgoingWebSocket.accept();
27+
return outgoingWebSocket;
28+
};
29+
30+
export async function realTimeHandler(c: Context): Promise<Response> {
31+
try {
32+
const requestHeaders = Object.fromEntries(c.req.raw.headers);
33+
34+
const providerOptions = constructConfigFromRequestHeaders(
35+
requestHeaders
36+
) as Options;
37+
const provider = providerOptions.provider ?? '';
38+
const apiConfig: ProviderAPIConfig = Providers[provider].api;
39+
const url = getURLForOutgoingConnection(
40+
apiConfig,
41+
providerOptions,
42+
c.req.url
43+
);
44+
const options = await getOptionsForOutgoingConnection(
45+
apiConfig,
46+
providerOptions,
47+
url,
48+
c
49+
);
50+
51+
const sessionOptions = {
52+
id: crypto.randomUUID(),
53+
providerOptions: {
54+
...providerOptions,
55+
requestURL: url,
56+
rubeusURL: 'realtime',
57+
},
58+
requestHeaders,
59+
requestParams: {},
60+
};
61+
62+
const webSocketPair = new WebSocketPair();
63+
const client = webSocketPair[0];
64+
const server = webSocketPair[1];
65+
66+
server.accept();
67+
68+
let outgoingWebSocket: WebSocket = await getOutgoingWebSocket(url, options);
69+
const eventParser = new RealtimeLlmEventParser();
70+
addListeners(outgoingWebSocket, eventParser, server, c, sessionOptions);
71+
72+
return new Response(null, {
73+
status: 101,
74+
webSocket: client,
75+
});
76+
} catch (err: any) {
77+
console.log('realtimeHandler error', err.message);
78+
return new Response(
79+
JSON.stringify({
80+
status: 'failure',
81+
message: 'Something went wrong',
82+
}),
83+
{
84+
status: 500,
85+
headers: {
86+
'content-type': 'application/json',
87+
},
88+
}
89+
);
90+
}
91+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import { Context } from 'hono';
2+
import { constructConfigFromRequestHeaders } from './handlerUtils';
3+
import WebSocket from 'ws';
4+
import { ProviderAPIConfig } from '../providers/types';
5+
import Providers from '../providers';
6+
import { Options } from '../types/requestBody';
7+
import { RealtimeLlmEventParser } from '../services/realtimeLlmEventParser';
8+
import { WSContext, WSEvents } from 'hono/ws';
9+
10+
export async function realTimeHandlerNode(
11+
c: Context
12+
): Promise<WSEvents<unknown>> {
13+
try {
14+
let incomingWebsocket: WSContext<unknown> | null = null;
15+
const requestHeaders = Object.fromEntries(c.req.raw.headers);
16+
const camelCaseConfig = constructConfigFromRequestHeaders(requestHeaders);
17+
18+
const provider = camelCaseConfig?.provider ?? '';
19+
const apiConfig: ProviderAPIConfig = Providers[provider].api;
20+
const providerOptions = camelCaseConfig as Options;
21+
const baseUrl = apiConfig.getBaseURL({ providerOptions });
22+
const endpoint = apiConfig.getEndpoint({
23+
providerOptions,
24+
fn: 'realtime',
25+
gatewayRequestBody: {},
26+
gatewayRequestURL: c.req.url,
27+
});
28+
let url = `${baseUrl}${endpoint}`;
29+
url = url.replace('https://', 'wss://');
30+
const headers = await apiConfig.headers({
31+
c,
32+
providerOptions,
33+
fn: 'realtime',
34+
transformedRequestUrl: url,
35+
transformedRequestBody: {},
36+
});
37+
38+
const sessionOptions = {
39+
id: crypto.randomUUID(),
40+
providerOptions: {
41+
...providerOptions,
42+
requestURL: url,
43+
rubeusURL: 'realtime',
44+
},
45+
requestHeaders,
46+
requestParams: {},
47+
};
48+
49+
const outgoingWebSocket = new WebSocket(url, {
50+
headers,
51+
});
52+
const eventParser = new RealtimeLlmEventParser();
53+
54+
outgoingWebSocket.addEventListener('message', (event) => {
55+
incomingWebsocket?.send(event.data as string);
56+
try {
57+
const parsedData = JSON.parse(event.data as string);
58+
eventParser.handleEvent(c, parsedData, sessionOptions);
59+
} catch (err: any) {
60+
console.error(`eventParser.handleEvent error: ${err.message}`);
61+
}
62+
});
63+
64+
outgoingWebSocket.addEventListener('close', (event) => {
65+
incomingWebsocket?.close(event.code, event.reason);
66+
});
67+
68+
outgoingWebSocket.addEventListener('error', (event) => {
69+
console.error(`outgoingWebSocket error: ${event.message}`);
70+
incomingWebsocket?.close();
71+
});
72+
73+
return {
74+
onOpen(evt, ws) {
75+
incomingWebsocket = ws;
76+
},
77+
onMessage(event) {
78+
outgoingWebSocket?.send(event.data as string);
79+
},
80+
onError(evt) {
81+
console.error(`incomingWebsocket error: ${evt.type}`);
82+
outgoingWebSocket?.close();
83+
},
84+
onClose() {
85+
outgoingWebSocket?.close();
86+
},
87+
};
88+
} catch (err) {
89+
c.set('websocketError', true);
90+
return {
91+
onOpen() {},
92+
onMessage() {},
93+
onError() {},
94+
onClose() {},
95+
};
96+
}
97+
}

0 commit comments

Comments
 (0)