Skip to content

Commit 626cb3c

Browse files
refactor(api): make WebSocket proxy generic over STT providers (#2080)
* refactor(api): make WebSocket proxy generic over STT providers
  - Rename DeepgramProxyConnection to WsProxyConnection
  - Add WsProxyOptions for configurable headers and control message types
  - Add x-owh-upstream-url and x-owh-upstream-auth headers for Rust client to specify upstream
  - Add createProxyFromRequest factory that reads headers to determine upstream
  - Keep createDeepgramProxy and buildDeepgramUrl for backwards compatibility
  - Update listen.ts to use the new generic createProxyFromRequest
  Co-Authored-By: yujonglee <[email protected]>
* fix(api): wrap createProxyFromRequest in try block for URL parsing errors
  - Move createProxyFromRequest call inside try block to catch URL parsing errors
  - Change error string from 'upstream_unavailable' to 'upstream_connect_failed'
  Co-Authored-By: yujonglee <[email protected]>
* rename
* split files
---------
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent b52a71d commit 626cb3c

File tree

8 files changed

+238
-101
lines changed

8 files changed

+238
-101
lines changed

apps/api/src/env.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ export const env = createEnv({
1414
STRIPE_WEBHOOK_SECRET: z.string().min(1),
1515
OPENROUTER_API_KEY: z.string().min(1),
1616
DEEPGRAM_API_KEY: z.string().min(1),
17+
ASSEMBLYAI_API_KEY: z.string().min(1).optional(),
18+
SONIOX_API_KEY: z.string().min(1).optional(),
1719
},
1820
runtimeEnv: Bun.env,
1921
emptyStringAsUndefined: true,

apps/api/src/listen.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,24 @@ import type { Handler } from "hono";
22
import { upgradeWebSocket } from "hono/bun";
33

44
import {
5-
buildDeepgramUrl,
6-
DeepgramProxyConnection,
5+
createProxyFromRequest,
76
normalizeWsData,
8-
} from "./deepgram";
7+
WsProxyConnection,
8+
} from "./stt";
99

1010
export const listenSocketHandler: Handler = async (c, next) => {
1111
const clientUrl = new URL(c.req.url, "http://localhost");
12-
const deepgramUrl = buildDeepgramUrl(clientUrl).toString();
1312

14-
const connection = new DeepgramProxyConnection(deepgramUrl);
13+
let connection: WsProxyConnection;
1514
try {
15+
connection = createProxyFromRequest(clientUrl, c.req.raw.headers);
1616
await connection.preconnectUpstream();
1717
} catch (error) {
1818
console.error("Failed to establish upstream connection", error);
1919
const detail =
20-
error instanceof Error ? error.message : "upstream_unavailable";
20+
error instanceof Error ? error.message : "upstream_connect_failed";
2121
const status = detail === "upstream_connect_timeout" ? 504 : 502;
22-
return c.json({ error: "upstream_unavailable", detail }, status);
22+
return c.json({ error: "upstream_connect_failed", detail }, status);
2323
}
2424

2525
const handler = upgradeWebSocket(() => {

apps/api/src/stt/assemblyai.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { env } from "../env";
2+
import { WsProxyConnection } from "./connection";
3+
4+
const CONTROL_MESSAGE_TYPES = new Set(["Terminate"]);
5+
6+
/**
 * Builds the upstream AssemblyAI streaming URL for a client request.
 *
 * Starts from the fixed endpoint `wss://streaming.assemblyai.com/v3/ws` and
 * copies the query parameters of `incomingUrl` onto it, except `provider`.
 *
 * @param incomingUrl - URL of the incoming client request; only its query
 *   string is read.
 * @returns A new URL object; `incomingUrl` is not modified.
 */
export const buildAssemblyAIUrl = (incomingUrl: URL) => {
7+
const target = new URL("wss://streaming.assemblyai.com/v3/ws");
8+
9+
// `set` replaces any earlier value, so a key repeated in the incoming query
// ends up with only its last value on the target.
incomingUrl.searchParams.forEach((value, key) => {
10+
// `provider` is the routing parameter read by createProxyFromRequest; it is
// not forwarded upstream.
if (key !== "provider") {
11+
target.searchParams.set(key, value);
12+
}
13+
});
14+
15+
return target;
16+
};
17+
18+
/**
 * Creates a proxy connection targeting AssemblyAI for the given client URL.
 *
 * The connection is only constructed here; nothing in this function opens it.
 *
 * @param incomingUrl - URL of the incoming client request, forwarded to
 *   buildAssemblyAIUrl to derive the upstream URL.
 * @returns A WsProxyConnection configured with the AssemblyAI API key header
 *   and this module's control message types (`"Terminate"`).
 * @throws Error with message "ASSEMBLYAI_API_KEY not configured" when the key
 *   is missing from the environment (it is declared optional in env.ts).
 */
export const createAssemblyAIProxy = (incomingUrl: URL): WsProxyConnection => {
19+
const apiKey = env.ASSEMBLYAI_API_KEY;
20+
if (!apiKey) {
21+
throw new Error("ASSEMBLYAI_API_KEY not configured");
22+
}
23+
24+
const target = buildAssemblyAIUrl(incomingUrl);
25+
return new WsProxyConnection(target.toString(), {
26+
headers: {
27+
// The key is sent as the bare header value, with no scheme prefix
// (contrast with the `Token ...` prefix used for Deepgram).
authorization: apiKey,
28+
},
29+
controlMessageTypes: CONTROL_MESSAGE_TYPES,
30+
});
31+
};
Lines changed: 34 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
11
import type { ServerWebSocket, WebSocketOptions } from "bun";
22

3-
import { env } from "./env";
3+
import {
4+
getPayloadSize,
5+
normalizeWsData,
6+
payloadIsControlMessage,
7+
type WsPayload,
8+
} from "./utils";
49

510
const DEFAULT_CLOSE_CODE = 1011;
611
const UPSTREAM_ERROR_TIMEOUT = 1000;
712
const UPSTREAM_CONNECT_TIMEOUT = 5000;
813
const MAX_PENDING_QUEUE_BYTES = 5 * 1024 * 1024; // 5 MiB
9-
const TEXT_ENCODER = new TextEncoder();
1014

1115
type QueuedPayload = { payload: WsPayload; size: number };
1216

13-
// https://developers.deepgram.com/docs/lower-level-websockets
14-
export class DeepgramProxyConnection {
17+
/**
 * Optional per-connection settings accepted by the WsProxyConnection
 * constructor. Both fields may be omitted.
 */
export type WsProxyOptions = {
18+
// Headers passed to the upstream WebSocket constructor. When this is absent
// or has no keys, the constructor is called with an empty options object.
headers?: Record<string, string>;
19+
// Set handed to payloadIsControlMessage for each outgoing payload; falls back
// to an empty set when omitted. Presumably matched against the JSON `type`
// field of text frames, as the pre-refactor helper did — the new helper lives
// in ./utils and is not shown here; confirm.
controlMessageTypes?: ReadonlySet<string>;
20+
};
21+
22+
const DEFAULT_CONTROL_MESSAGE_TYPES = new Set<string>();
23+
24+
export class WsProxyConnection {
1525
private upstream?: InstanceType<typeof WebSocket>;
1626
private upstreamReady = false;
1727
private shuttingDown = false;
@@ -25,7 +35,17 @@ export class DeepgramProxyConnection {
2535
private upstreamReadyResolve: (() => void) | null = null;
2636
private upstreamReadyReject: ((error: Error) => void) | null = null;
2737

28-
constructor(private deepgramUrl: string) {}
38+
private readonly headers?: Record<string, string>;
39+
private readonly controlMessageTypes: ReadonlySet<string>;
40+
41+
constructor(
42+
private upstreamUrl: string,
43+
options: WsProxyOptions = {},
44+
) {
45+
this.headers = options.headers;
46+
this.controlMessageTypes =
47+
options.controlMessageTypes ?? DEFAULT_CONTROL_MESSAGE_TYPES;
48+
}
2949

3050
private clearErrorTimeout() {
3151
if (this.upstreamErrorTimeout) {
@@ -80,18 +100,17 @@ export class DeepgramProxyConnection {
80100
return;
81101
}
82102

83-
const wsOptions: WebSocketOptions = {
84-
headers: {
85-
Authorization: `Token ${env.DEEPGRAM_API_KEY}`,
86-
},
87-
};
103+
const wsOptions: WebSocketOptions =
104+
this.headers && Object.keys(this.headers).length > 0
105+
? { headers: this.headers }
106+
: {};
88107

89108
this.upstream = new (globalThis.WebSocket as {
90109
new (
91110
url: string | URL,
92111
options?: WebSocketOptions,
93112
): InstanceType<typeof WebSocket>;
94-
})(this.deepgramUrl, wsOptions);
113+
})(this.upstreamUrl, wsOptions);
95114

96115
this.upstream.binaryType = "arraybuffer";
97116
this.setupUpstreamHandlers();
@@ -324,7 +343,10 @@ export class DeepgramProxyConnection {
324343
return;
325344
}
326345

327-
const isControlPayload = payloadIsControlMessage(payload);
346+
const isControlPayload = payloadIsControlMessage(
347+
payload,
348+
this.controlMessageTypes,
349+
);
328350

329351
if (!this.upstreamReady) {
330352
this.enqueuePendingPayload(payload, isControlPayload);
@@ -360,85 +382,3 @@ export class DeepgramProxyConnection {
360382
this.pendingBytes += size;
361383
}
362384
}
363-
364-
export const buildDeepgramUrl = (incomingUrl: URL) => {
365-
const target = new URL("wss://api.deepgram.com/v1/listen");
366-
367-
incomingUrl.searchParams.forEach((value, key) => {
368-
target.searchParams.set(key, value);
369-
});
370-
target.searchParams.set("model", "nova-3-general");
371-
target.searchParams.set("mip_opt_out", "false");
372-
373-
return target;
374-
};
375-
376-
export type WsPayload = string | Uint8Array;
377-
378-
// https://bun.com/docs/runtime/http/websockets
379-
export const normalizeWsData = async (
380-
data: unknown,
381-
): Promise<WsPayload | null> => {
382-
if (typeof data === "string") {
383-
return data;
384-
}
385-
386-
if (data instanceof Uint8Array) {
387-
return cloneBinaryPayload(data);
388-
}
389-
390-
if (data instanceof ArrayBuffer) {
391-
return new Uint8Array(data);
392-
}
393-
394-
if (ArrayBuffer.isView(data)) {
395-
return cloneBinaryPayload(data);
396-
}
397-
398-
if (typeof Blob !== "undefined" && data instanceof Blob) {
399-
const arrayBuffer = await data.arrayBuffer();
400-
return new Uint8Array(arrayBuffer);
401-
}
402-
403-
return null;
404-
};
405-
406-
const cloneBinaryPayload = (input: ArrayBuffer | ArrayBufferView) => {
407-
const view =
408-
input instanceof ArrayBuffer
409-
? new Uint8Array(input)
410-
: new Uint8Array(input.buffer, input.byteOffset, input.byteLength);
411-
const copy = new Uint8Array(view.byteLength);
412-
copy.set(view);
413-
return copy;
414-
};
415-
416-
const getPayloadSize = (payload: WsPayload) => {
417-
if (typeof payload === "string") {
418-
return TEXT_ENCODER.encode(payload).byteLength;
419-
}
420-
return payload.byteLength;
421-
};
422-
423-
const CONTROL_MESSAGE_TYPES = new Set(["KeepAlive", "CloseStream", "Finalize"]);
424-
425-
const payloadIsControlMessage = (payload: WsPayload) => {
426-
if (typeof payload !== "string") {
427-
return false;
428-
}
429-
430-
try {
431-
const parsed = JSON.parse(payload);
432-
if (
433-
typeof parsed === "object" &&
434-
parsed !== null &&
435-
CONTROL_MESSAGE_TYPES.has(parsed.type)
436-
) {
437-
return true;
438-
}
439-
} catch {
440-
// ignore parse errors
441-
}
442-
443-
return false;
444-
};

apps/api/src/stt/deepgram.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { env } from "../env";
2+
import { WsProxyConnection } from "./connection";
3+
4+
const CONTROL_MESSAGE_TYPES = new Set(["KeepAlive", "CloseStream", "Finalize"]);
5+
6+
/**
 * Builds the upstream Deepgram listen URL for a client request.
 *
 * Starts from `wss://api.deepgram.com/v1/listen`, copies the query parameters
 * of `incomingUrl` except `provider`, then forces `model` and `mip_opt_out`.
 *
 * @param incomingUrl - URL of the incoming client request; only its query
 *   string is read.
 * @returns A new URL object; `incomingUrl` is not modified.
 */
export const buildDeepgramUrl = (incomingUrl: URL) => {
7+
const target = new URL("wss://api.deepgram.com/v1/listen");
8+
9+
incomingUrl.searchParams.forEach((value, key) => {
10+
// `provider` is the routing parameter read by createProxyFromRequest; it is
// not forwarded upstream.
if (key !== "provider") {
11+
target.searchParams.set(key, value);
12+
}
13+
});
14+
// These two are set after the copy, so they override any `model` or
// `mip_opt_out` value the client supplied.
target.searchParams.set("model", "nova-3-general");
15+
target.searchParams.set("mip_opt_out", "false");
16+
17+
return target;
18+
};
19+
20+
/**
 * Creates a proxy connection targeting Deepgram for the given client URL.
 *
 * Unlike the AssemblyAI and Soniox factories there is no presence check on the
 * key: DEEPGRAM_API_KEY is declared as a required string in env.ts.
 *
 * @param incomingUrl - URL of the incoming client request, forwarded to
 *   buildDeepgramUrl to derive the upstream URL.
 * @returns A WsProxyConnection configured with a `Token <key>` Authorization
 *   header and this module's control message types (`"KeepAlive"`,
 *   `"CloseStream"`, `"Finalize"`).
 */
export const createDeepgramProxy = (incomingUrl: URL): WsProxyConnection => {
21+
const target = buildDeepgramUrl(incomingUrl);
22+
return new WsProxyConnection(target.toString(), {
23+
headers: {
24+
Authorization: `Token ${env.DEEPGRAM_API_KEY}`,
25+
},
26+
controlMessageTypes: CONTROL_MESSAGE_TYPES,
27+
});
28+
};

apps/api/src/stt/index.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { createAssemblyAIProxy } from "./assemblyai";
2+
import { WsProxyConnection } from "./connection";
3+
import { createDeepgramProxy } from "./deepgram";
4+
import { createSonioxProxy } from "./soniox";
5+
6+
export { WsProxyConnection, type WsProxyOptions } from "./connection";
7+
export { normalizeWsData, type WsPayload } from "./utils";
8+
export { buildDeepgramUrl, createDeepgramProxy } from "./deepgram";
9+
export { buildAssemblyAIUrl, createAssemblyAIProxy } from "./assemblyai";
10+
export { buildSonioxUrl, createSonioxProxy } from "./soniox";
11+
12+
export const UPSTREAM_URL_HEADER = "x-owh-upstream-url";
13+
export const UPSTREAM_AUTH_HEADER = "x-owh-upstream-auth";
14+
15+
export type SttProvider = "deepgram" | "assemblyai" | "soniox";
16+
17+
/**
 * Picks the upstream for an incoming request and returns an unopened proxy
 * connection for it.
 *
 * Resolution order:
 * 1. If the `x-owh-upstream-url` request header is present and non-empty, that
 *    URL is used as the upstream and the `x-owh-upstream-auth` header (if
 *    non-empty) is forwarded verbatim as `Authorization`. No control message
 *    types are passed on this path, so the connection uses its empty default.
 * 2. Otherwise the `provider` query parameter selects a built-in provider;
 *    a missing, empty, or unrecognised value selects Deepgram.
 *
 * @param incomingUrl - URL of the incoming client request.
 * @param reqHeaders - Headers of the incoming client request.
 * @returns A WsProxyConnection for the selected upstream.
 * @throws TypeError from `new URL` when the override header is not a valid
 *   absolute URL; Error from the provider factories when the required API key
 *   is not configured. The caller in listen.ts invokes this inside its
 *   try block.
 */
export function createProxyFromRequest(
18+
incomingUrl: URL,
19+
reqHeaders: Headers,
20+
): WsProxyConnection {
21+
const upstreamOverride = reqHeaders.get(UPSTREAM_URL_HEADER);
22+
const rawAuth = reqHeaders.get(UPSTREAM_AUTH_HEADER);
23+
24+
// NOTE(review): the override URL comes straight from a request header and
// this function applies no scheme or host allow-list before connecting to it.
// Confirm that only trusted clients can reach this endpoint, or that
// validation happens elsewhere.
if (upstreamOverride) {
25+
// Parsing validates syntax only; it also normalises the string that is
// handed to the connection.
const url = new URL(upstreamOverride);
26+
const headers =
27+
rawAuth && rawAuth.length > 0 ? { Authorization: rawAuth } : undefined;
28+
29+
return new WsProxyConnection(url.toString(), {
30+
headers,
31+
});
32+
}
33+
34+
// The `as SttProvider` cast is unchecked: any string can arrive here at
// runtime. `||` (rather than `??`) also maps an empty `provider=` to
// "deepgram".
const provider =
35+
(incomingUrl.searchParams.get("provider") as SttProvider) || "deepgram";
36+
37+
switch (provider) {
38+
case "assemblyai":
39+
return createAssemblyAIProxy(incomingUrl);
40+
case "soniox":
41+
// The Soniox factory takes no URL, so client query parameters are not
// forwarded on this path.
return createSonioxProxy();
42+
case "deepgram":
43+
// Unknown provider strings fall through to Deepgram rather than erroring.
default:
44+
return createDeepgramProxy(incomingUrl);
45+
}
46+
}

apps/api/src/stt/soniox.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { env } from "../env";
2+
import { WsProxyConnection } from "./connection";
3+
4+
const CONTROL_MESSAGE_TYPES = new Set(["keepalive", "finalize"]);
5+
6+
/**
 * Returns the fixed Soniox real-time transcription WebSocket endpoint.
 *
 * Takes no input and adds no query parameters; a fresh URL object is created
 * on every call.
 */
export const buildSonioxUrl = () => {
7+
return new URL("wss://stt-rt.soniox.com/transcribe-websocket");
8+
};
9+
10+
/**
 * Creates a proxy connection targeting Soniox.
 *
 * The connection is only constructed here; nothing in this function opens it.
 *
 * @returns A WsProxyConnection for the fixed Soniox endpoint, configured with
 *   this module's control message types (`"keepalive"`, `"finalize"`) and no
 *   headers.
 * @throws Error with message "SONIOX_API_KEY not configured" when the key is
 *   missing from the environment (it is declared optional in env.ts).
 */
export const createSonioxProxy = (): WsProxyConnection => {
11+
// NOTE(review): the key is only checked for presence. It is not placed in a
// header, the URL, or any option passed to the connection below. Presumably
// authentication is supplied some other way (e.g. in a message sent over the
// socket) — confirm, otherwise the upstream will see unauthenticated traffic.
if (!env.SONIOX_API_KEY) {
12+
throw new Error("SONIOX_API_KEY not configured");
13+
}
14+
15+
const target = buildSonioxUrl();
16+
return new WsProxyConnection(target.toString(), {
17+
controlMessageTypes: CONTROL_MESSAGE_TYPES,
18+
});
19+
};

0 commit comments

Comments
 (0)