diff --git a/apps/api/src/stt/connection.ts b/apps/api/src/stt/connection.ts index 666b77367c..cdf216fa56 100644 --- a/apps/api/src/stt/connection.ts +++ b/apps/api/src/stt/connection.ts @@ -8,9 +8,13 @@ import { type WsPayload, } from "./utils"; +// WebSocket close code 1011: server encountered an unexpected condition const DEFAULT_CLOSE_CODE = 1011; -const UPSTREAM_ERROR_TIMEOUT = 1000; -const UPSTREAM_CONNECT_TIMEOUT = 5000; +// Grace period (ms) before closing connection after upstream error event +const UPSTREAM_ERROR_GRACE_MS = 1000; +// Maximum time (ms) to wait for upstream WebSocket to connect +const UPSTREAM_CONNECT_TIMEOUT_MS = 5000; +// Maximum bytes allowed in pending message queues before triggering backpressure const MAX_PENDING_QUEUE_BYTES = 5 * 1024 * 1024; // 5 MiB type QueuedPayload = { payload: WsPayload; size: number }; @@ -66,7 +70,7 @@ export class WsProxyConnection { if (!this.shuttingDown) { this.closeConnections(DEFAULT_CLOSE_CODE, "upstream_error"); } - }, UPSTREAM_ERROR_TIMEOUT); + }, UPSTREAM_ERROR_GRACE_MS); } private resolveUpstreamReadyWaiters() { @@ -181,9 +185,10 @@ export class WsProxyConnection { this.upstreamReadyPromise = null; this.upstreamReadyResolve = null; this.upstreamReadyReject = null; + this.hasTransformedFirst = false; } - private flushPendingMessages() { + private flushUpstreamQueue() { if ( !this.upstream || !this.upstreamReady || @@ -216,7 +221,7 @@ export class WsProxyConnection { } } - private flushDownstreamMessages() { + private flushDownstreamQueue() { if ( !this.clientSocket || this.clientSocket.readyState !== WebSocket.OPEN || @@ -269,16 +274,23 @@ export class WsProxyConnection { this.upstream.addEventListener("open", () => { this.upstreamReady = true; this.resolveUpstreamReadyWaiters(); - this.flushPendingMessages(); - this.flushDownstreamMessages(); + this.flushUpstreamQueue(); + this.flushDownstreamQueue(); }); this.upstream.addEventListener("message", async (event) => { - const payload = await normalizeWsData(event.data); - if (!payload) { - return; + try { + const payload = await normalizeWsData(event.data); + if (!payload) { + return; + } + this.forwardDownstreamPayload(payload); + } catch (error) { + Sentry.captureException(error, { + tags: { operation: "upstream_message_normalize" }, + }); + this.closeConnections(DEFAULT_CLOSE_CODE, "message_normalize_failed"); } - this.forwardDownstreamPayload(payload); }); this.upstream.addEventListener("close", (event) => { @@ -311,7 +323,7 @@ export class WsProxyConnection { }); } - async preconnectUpstream(timeoutMs = UPSTREAM_CONNECT_TIMEOUT) { + async preconnectUpstream(timeoutMs = UPSTREAM_CONNECT_TIMEOUT_MS) { this.ensureUpstreamSocket(); let timeoutHandle: ReturnType | null = null; @@ -342,11 +354,14 @@ export class WsProxyConnection { } initializeUpstream(clientWs: ServerWebSocket) { + // Set clientSocket BEFORE ensureUpstreamSocket to prevent race condition: + // If upstream becomes ready during ensureUpstreamSocket(), the open handler + // will call flushDownstreamQueue() which needs clientSocket to be set. this.clientSocket = clientWs; this.ensureUpstreamSocket(); if (this.upstreamReady) { - this.flushPendingMessages(); - this.flushDownstreamMessages(); + this.flushUpstreamQueue(); + this.flushDownstreamQueue(); } } diff --git a/apps/api/src/stt/deepgram.ts b/apps/api/src/stt/deepgram.ts index 0b7adcdd0e..69006ab751 100644 --- a/apps/api/src/stt/deepgram.ts +++ b/apps/api/src/stt/deepgram.ts @@ -18,10 +18,15 @@ export const buildDeepgramUrl = (incomingUrl: URL) => { }; export const createDeepgramProxy = (incomingUrl: URL): WsProxyConnection => { + const apiKey = env.DEEPGRAM_API_KEY; + if (!apiKey) { + throw new Error("DEEPGRAM_API_KEY not configured"); + } + const target = buildDeepgramUrl(incomingUrl); return new WsProxyConnection(target.toString(), { headers: { - Authorization: `Token ${env.DEEPGRAM_API_KEY}`, + Authorization: `Token ${apiKey}`, }, controlMessageTypes: CONTROL_MESSAGE_TYPES, }); diff --git a/apps/api/src/stt/index.ts b/apps/api/src/stt/index.ts index ae02030b1f..34b0c4ba8a 100644 --- a/apps/api/src/stt/index.ts +++ b/apps/api/src/stt/index.ts @@ -14,6 +14,16 @@ export const UPSTREAM_AUTH_HEADER = "x-owh-upstream-auth"; export type SttProvider = "deepgram" | "assemblyai" | "soniox"; +const VALID_PROVIDERS: readonly SttProvider[] = [ + "deepgram", + "assemblyai", + "soniox", +]; + +function isValidProvider(provider: string): provider is SttProvider { + return VALID_PROVIDERS.includes(provider as SttProvider); +} + export function createProxyFromRequest( incomingUrl: URL, reqHeaders: Headers, @@ -22,7 +32,19 @@ export function createProxyFromRequest( const rawAuth = reqHeaders.get(UPSTREAM_AUTH_HEADER); if (upstreamOverride) { - const url = new URL(upstreamOverride); + let url: URL; + try { + url = new URL(upstreamOverride); + } catch { + throw new Error(`Invalid upstream URL: ${upstreamOverride}`); + } + + if (url.protocol !== "wss:" && url.protocol !== "ws:") { + throw new Error( + `Invalid upstream URL protocol: ${url.protocol} (expected ws: or wss:)`, + ); + } + const headers = rawAuth && rawAuth.length > 0 ? { Authorization: rawAuth } : undefined; @@ -31,8 +53,19 @@ export function createProxyFromRequest( }); } - const provider = - (incomingUrl.searchParams.get("provider") as SttProvider) || "deepgram"; + const providerParam = incomingUrl.searchParams.get("provider"); + + if (providerParam && !isValidProvider(providerParam)) { + throw new Error( + `Unknown STT provider: ${providerParam} (valid: ${VALID_PROVIDERS.join(", ")})`, + ); + } + + // After validation above, providerParam is either null or a valid SttProvider + const provider: SttProvider = + providerParam && isValidProvider(providerParam) + ? providerParam + : "deepgram"; switch (provider) { case "assemblyai": @@ -40,7 +73,6 @@ export function createProxyFromRequest( case "soniox": return createSonioxProxy(); case "deepgram": - default: return createDeepgramProxy(incomingUrl); } } diff --git a/apps/api/src/stt/utils.ts b/apps/api/src/stt/utils.ts index 95e69fe428..505388a778 100644 --- a/apps/api/src/stt/utils.ts +++ b/apps/api/src/stt/utils.ts @@ -29,6 +29,9 @@ export const normalizeWsData = async ( return null; }; +// Clone binary data to ensure ownership - WebSocket message buffers may be +// reused or invalidated after the event handler returns, so we copy the data +// to prevent use-after-free issues when the payload is queued for later use. const cloneBinaryPayload = (input: ArrayBuffer | ArrayBufferView) => { const view = input instanceof ArrayBuffer