Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions apps/api/src/stt/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ import {
type WsPayload,
} from "./utils";

// WebSocket close code 1011: server encountered an unexpected condition
const DEFAULT_CLOSE_CODE = 1011;
const UPSTREAM_ERROR_TIMEOUT = 1000;
const UPSTREAM_CONNECT_TIMEOUT = 5000;
// Grace period (ms) before closing connection after upstream error event
const UPSTREAM_ERROR_GRACE_MS = 1000;
// Maximum time (ms) to wait for upstream WebSocket to connect
const UPSTREAM_CONNECT_TIMEOUT_MS = 5000;
// Maximum bytes allowed in pending message queues before triggering backpressure
const MAX_PENDING_QUEUE_BYTES = 5 * 1024 * 1024; // 5 MiB

type QueuedPayload = { payload: WsPayload; size: number };
Expand Down Expand Up @@ -66,7 +70,7 @@ export class WsProxyConnection {
if (!this.shuttingDown) {
this.closeConnections(DEFAULT_CLOSE_CODE, "upstream_error");
}
}, UPSTREAM_ERROR_TIMEOUT);
}, UPSTREAM_ERROR_GRACE_MS);
}

private resolveUpstreamReadyWaiters() {
Expand Down Expand Up @@ -181,9 +185,10 @@ export class WsProxyConnection {
this.upstreamReadyPromise = null;
this.upstreamReadyResolve = null;
this.upstreamReadyReject = null;
this.hasTransformedFirst = false;
}

private flushPendingMessages() {
private flushUpstreamQueue() {
if (
!this.upstream ||
!this.upstreamReady ||
Expand Down Expand Up @@ -216,7 +221,7 @@ export class WsProxyConnection {
}
}

private flushDownstreamMessages() {
private flushDownstreamQueue() {
if (
!this.clientSocket ||
this.clientSocket.readyState !== WebSocket.OPEN ||
Expand Down Expand Up @@ -269,16 +274,23 @@ export class WsProxyConnection {
this.upstream.addEventListener("open", () => {
this.upstreamReady = true;
this.resolveUpstreamReadyWaiters();
this.flushPendingMessages();
this.flushDownstreamMessages();
this.flushUpstreamQueue();
this.flushDownstreamQueue();
});

this.upstream.addEventListener("message", async (event) => {
const payload = await normalizeWsData(event.data);
if (!payload) {
return;
try {
const payload = await normalizeWsData(event.data);
if (!payload) {
return;
}
this.forwardDownstreamPayload(payload);
} catch (error) {
Sentry.captureException(error, {
tags: { operation: "upstream_message_normalize" },
});
this.closeConnections(DEFAULT_CLOSE_CODE, "message_normalize_failed");
}
this.forwardDownstreamPayload(payload);
});

this.upstream.addEventListener("close", (event) => {
Expand Down Expand Up @@ -311,7 +323,7 @@ export class WsProxyConnection {
});
}

async preconnectUpstream(timeoutMs = UPSTREAM_CONNECT_TIMEOUT) {
async preconnectUpstream(timeoutMs = UPSTREAM_CONNECT_TIMEOUT_MS) {
this.ensureUpstreamSocket();
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;

Expand Down Expand Up @@ -342,11 +354,14 @@ export class WsProxyConnection {
}

initializeUpstream(clientWs: ServerWebSocket<unknown>) {
// Set clientSocket BEFORE ensureUpstreamSocket to prevent race condition:
// If upstream becomes ready during ensureUpstreamSocket(), the open handler
// will call flushDownstreamQueue() which needs clientSocket to be set.
this.clientSocket = clientWs;
this.ensureUpstreamSocket();
if (this.upstreamReady) {
this.flushPendingMessages();
this.flushDownstreamMessages();
this.flushUpstreamQueue();
this.flushDownstreamQueue();
}
}

Expand Down
7 changes: 6 additions & 1 deletion apps/api/src/stt/deepgram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ export const buildDeepgramUrl = (incomingUrl: URL) => {
};

export const createDeepgramProxy = (incomingUrl: URL): WsProxyConnection => {
const apiKey = env.DEEPGRAM_API_KEY;
if (!apiKey) {
throw new Error("DEEPGRAM_API_KEY not configured");
}

const target = buildDeepgramUrl(incomingUrl);
return new WsProxyConnection(target.toString(), {
headers: {
Authorization: `Token ${env.DEEPGRAM_API_KEY}`,
Authorization: `Token ${apiKey}`,
},
controlMessageTypes: CONTROL_MESSAGE_TYPES,
});
Expand Down
40 changes: 36 additions & 4 deletions apps/api/src/stt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ export const UPSTREAM_AUTH_HEADER = "x-owh-upstream-auth";

export type SttProvider = "deepgram" | "assemblyai" | "soniox";

const VALID_PROVIDERS: readonly SttProvider[] = [
"deepgram",
"assemblyai",
"soniox",
];

function isValidProvider(provider: string): provider is SttProvider {
return VALID_PROVIDERS.includes(provider as SttProvider);
}

export function createProxyFromRequest(
incomingUrl: URL,
reqHeaders: Headers,
Expand All @@ -22,7 +32,19 @@ export function createProxyFromRequest(
const rawAuth = reqHeaders.get(UPSTREAM_AUTH_HEADER);

if (upstreamOverride) {
const url = new URL(upstreamOverride);
let url: URL;
try {
url = new URL(upstreamOverride);
} catch {
throw new Error(`Invalid upstream URL: ${upstreamOverride}`);
}

if (url.protocol !== "wss:" && url.protocol !== "ws:") {
throw new Error(
`Invalid upstream URL protocol: ${url.protocol} (expected ws: or wss:)`,
);
}

const headers =
rawAuth && rawAuth.length > 0 ? { Authorization: rawAuth } : undefined;

Expand All @@ -31,16 +53,26 @@ export function createProxyFromRequest(
});
}

const provider =
(incomingUrl.searchParams.get("provider") as SttProvider) || "deepgram";
const providerParam = incomingUrl.searchParams.get("provider");

if (providerParam && !isValidProvider(providerParam)) {
throw new Error(
`Unknown STT provider: ${providerParam} (valid: ${VALID_PROVIDERS.join(", ")})`,
);
}

// After validation above, providerParam is either null or a valid SttProvider
const provider: SttProvider =
providerParam && isValidProvider(providerParam)
? providerParam
: "deepgram";

switch (provider) {
case "assemblyai":
return createAssemblyAIProxy(incomingUrl);
case "soniox":
return createSonioxProxy();
case "deepgram":
default:
return createDeepgramProxy(incomingUrl);
}
}
3 changes: 3 additions & 0 deletions apps/api/src/stt/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export const normalizeWsData = async (
return null;
};

// Clone binary data to ensure ownership - WebSocket message buffers may be
// reused or invalidated after the event handler returns, so we copy the data
// to prevent use-after-free issues when the payload is queued for later use.
const cloneBinaryPayload = (input: ArrayBuffer | ArrayBufferView) => {
const view =
input instanceof ArrayBuffer
Expand Down