Skip to content

Commit 9aea4fb

Browse files
committed
refactor file-stt restate pipeline
1 parent 36334c4 commit 9aea4fb

File tree

6 files changed

+88
-249
lines changed

6 files changed

+88
-249
lines changed

apps/restate/src/audioPipeline.ts

Lines changed: 0 additions & 158 deletions
This file was deleted.

apps/restate/src/deepgram.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { CallbackUrl, createClient } from "@deepgram/sdk";
2+
import { z } from "zod";
3+
4+
/**
 * Sub-schema reused by both payload shapes below: an optional list of
 * alternatives, each of which must carry a `transcript` string.
 */
const transcriptAlternatives = z
  .array(z.object({ transcript: z.string() }))
  .optional();

/**
 * Zod schema for the body Deepgram posts to the callback URL.
 *
 * Two optional shapes are accepted, and every level is optional so a payload
 * missing either (or both) still parses:
 *  - `results.channels[].alternatives[].transcript`
 *  - `channel.alternatives[].transcript`
 */
export const DeepgramCallback = z.object({
  results: z
    .object({
      channels: z
        .array(z.object({ alternatives: transcriptAlternatives }))
        .optional(),
    })
    .optional(),
  channel: z.object({ alternatives: transcriptAlternatives }).optional(),
});

/** Static type of a payload that has been validated by {@link DeepgramCallback}. */
export type DeepgramCallbackType = z.infer<typeof DeepgramCallback>;
26+
27+
/**
 * Pulls the first transcript string out of a parsed Deepgram callback payload.
 *
 * Lookup order:
 *  1. `results.channels[0].alternatives[0].transcript`
 *  2. `channel.alternatives[0].transcript`
 *  3. `""` when neither path yields a value
 *
 * Only a missing (null/undefined) value falls through to the next step; an
 * empty-string transcript found on an earlier path is returned as-is.
 *
 * @param payload - Callback body already validated against the schema.
 * @returns The first transcript found, or an empty string.
 */
export function extractTranscript(payload: DeepgramCallbackType): string {
  const fromResults =
    payload.results?.channels?.[0]?.alternatives?.[0]?.transcript;
  if (fromResults != null) {
    return fromResults;
  }

  const fromChannel = payload.channel?.alternatives?.[0]?.transcript;
  if (fromChannel != null) {
    return fromChannel;
  }

  return "";
}
34+
35+
/**
 * Submits a pre-recorded audio URL to Deepgram for transcription, asking
 * Deepgram to deliver the result to `callbackUrl` instead of in the response.
 *
 * The request uses the `nova-3` model with `smart_format` enabled.
 *
 * @param audioUrl - URL of the audio, passed to Deepgram as the source.
 * @param callbackUrl - URL wrapped in `CallbackUrl` and handed to the SDK.
 * @param apiKey - Deepgram API key used to create the client.
 * @returns The `request_id` reported by Deepgram for the submitted job.
 * @throws Error prefixed with `Deepgram:` when the SDK reports an error or
 *   the response carries no `request_id`.
 */
export async function transcribeWithCallback(
  audioUrl: string,
  callbackUrl: string,
  apiKey: string,
): Promise<string> {
  const deepgram = createClient(apiKey);

  const response = await deepgram.listen.prerecorded.transcribeUrlCallback(
    { url: audioUrl },
    new CallbackUrl(callbackUrl),
    { model: "nova-3", smart_format: true },
  );

  // The SDK reports failures through the `error` field of its return value.
  if (response.error) {
    throw new Error(`Deepgram: ${response.error.message}`);
  }

  const requestId = response.result?.request_id;
  if (!requestId) {
    throw new Error("Deepgram: missing request_id");
  }

  return requestId;
}

apps/restate/src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import * as restate from "@restatedev/restate-sdk-cloudflare-workers/fetch";
22

3-
import { audioPipeline } from "./audioPipeline";
43
import { type Env, envSchema } from "./env";
54
import { rateLimiter } from "./services/rate-limit";
65
import { sttFile } from "./services/stt-file";
@@ -9,7 +8,7 @@ export default {
98
fetch(request: Request, _env: Env, _ctx: ExecutionContext) {
109
const env = envSchema.parse(_env);
1110
return restate.createEndpointHandler({
12-
services: [audioPipeline, rateLimiter, sttFile],
11+
services: [rateLimiter, sttFile],
1312
...(env.RESTATE_IDENTITY_KEY
1413
? { identityKeys: [env.RESTATE_IDENTITY_KEY] }
1514
: {}),

apps/restate/src/services/rate-limit.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,5 @@
11
import * as restate from "@restatedev/restate-sdk-cloudflare-workers/fetch";
22

3-
interface RateLimitState {
4-
windowStartMs: number;
5-
count: number;
6-
}
7-
8-
interface RateLimitConfig {
9-
windowMs: number;
10-
maxInWindow: number;
11-
}
12-
133
export const rateLimiter = restate.object({
144
name: "RateLimiter",
155
handlers: {
@@ -28,7 +18,7 @@ export const rateLimiter = restate.object({
2818

2919
if (current.count >= config.maxInWindow) {
3020
ctx.set("state", current);
31-
throw new restate.TerminalError("Rate limit exceeded", {
21+
throw new restate.TerminalError("rate_limit_exceeded", {
3222
errorCode: 429,
3323
});
3424
}
@@ -47,3 +37,13 @@ export type RateLimiter = typeof rateLimiter;
4737
export function limiter(ctx: restate.Context, key: string) {
4838
return ctx.objectClient<RateLimiter>(rateLimiter, key);
4939
}
40+
41+
interface RateLimitState {
42+
windowStartMs: number;
43+
count: number;
44+
}
45+
46+
interface RateLimitConfig {
47+
windowMs: number;
48+
maxInWindow: number;
49+
}

apps/restate/src/services/stt-file.ts

Lines changed: 11 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
import { CallbackUrl, createClient } from "@deepgram/sdk";
21
import * as restate from "@restatedev/restate-sdk-cloudflare-workers/fetch";
32
import { serde } from "@restatedev/restate-sdk-zod";
43
import { z } from "zod";
54

5+
import {
6+
DeepgramCallback,
7+
DeepgramCallbackType,
8+
extractTranscript,
9+
transcribeWithCallback,
10+
} from "../deepgram";
611
import { type Env } from "../env";
712
import { createSignedUrl, deleteFile } from "../supabase";
813
import { limiter } from "./rate-limit";
@@ -18,63 +23,12 @@ const SttStatus = z.enum(["QUEUED", "TRANSCRIBING", "DONE", "ERROR"]);
1823

1924
export type SttStatusType = z.infer<typeof SttStatus>;
2025

21-
const DeepgramCallback = z.object({
22-
results: z
23-
.object({
24-
channels: z
25-
.array(
26-
z.object({
27-
alternatives: z
28-
.array(z.object({ transcript: z.string() }))
29-
.optional(),
30-
}),
31-
)
32-
.optional(),
33-
})
34-
.optional(),
35-
channel: z
36-
.object({
37-
alternatives: z.array(z.object({ transcript: z.string() })).optional(),
38-
})
39-
.optional(),
40-
});
41-
42-
export type DeepgramCallbackType = z.infer<typeof DeepgramCallback>;
43-
44-
interface SttResult {
45-
status: SttStatusType;
46-
transcript?: string;
47-
error?: string;
48-
}
49-
50-
async function transcribe(
51-
audioUrl: string,
52-
callbackUrl: string,
53-
apiKey: string,
54-
): Promise<string> {
55-
const client = createClient(apiKey);
56-
const { result, error } =
57-
await client.listen.prerecorded.transcribeUrlCallback(
58-
{ url: audioUrl },
59-
new CallbackUrl(callbackUrl),
60-
{ model: "nova-3", smart_format: true },
61-
);
62-
63-
if (error) throw new Error(`Deepgram: ${error.message}`);
64-
if (!result?.request_id) throw new Error("Deepgram: missing request_id");
65-
66-
return result.request_id;
67-
}
68-
6926
export const sttFile = restate.workflow({
7027
name: "SttFile",
7128
handlers: {
7229
run: restate.handlers.workflow.workflow(
7330
{ input: serde.zod(SttFileInput) },
74-
async (
75-
ctx: restate.WorkflowContext,
76-
input: SttFileInputType,
77-
): Promise<SttResult> => {
31+
async (ctx: restate.WorkflowContext, input: SttFileInputType) => {
7832
ctx.set("status", "QUEUED" as SttStatusType);
7933
ctx.set("fileId", input.fileId);
8034

@@ -95,15 +49,15 @@ export const sttFile = restate.workflow({
9549
const callbackUrl = `${env.RESTATE_INGRESS_URL.replace(/\/+$/, "")}/SttFile/${encodeURIComponent(ctx.key)}/onTranscript`;
9650

9751
const requestId = await ctx.run("transcribe", () =>
98-
transcribe(audioUrl, callbackUrl, env.DEEPGRAM_API_KEY),
52+
transcribeWithCallback(audioUrl, callbackUrl, env.DEEPGRAM_API_KEY),
9953
);
10054
ctx.set("deepgramRequestId", requestId);
10155

10256
const transcript = await ctx.promise<string>("transcript");
10357
ctx.set("transcript", transcript);
10458
ctx.set("status", "DONE" as SttStatusType);
10559

106-
return { status: "DONE", transcript };
60+
return { status: "DONE" as const, transcript };
10761
} catch (err) {
10862
const error = err instanceof Error ? err.message : "Unknown error";
10963
ctx.set("status", "ERROR" as SttStatusType);
@@ -126,18 +80,13 @@ export const sttFile = restate.workflow({
12680
const existing = await ctx.get<string>("transcript");
12781
if (existing !== undefined) return;
12882

129-
const transcript =
130-
payload.results?.channels?.[0]?.alternatives?.[0]?.transcript ??
131-
payload.channel?.alternatives?.[0]?.transcript ??
132-
"";
133-
134-
ctx.promise<string>("transcript").resolve(transcript);
83+
ctx.promise<string>("transcript").resolve(extractTranscript(payload));
13584
},
13685
),
13786

13887
getStatus: restate.handlers.workflow.shared(
13988
{},
140-
async (ctx: restate.WorkflowSharedContext): Promise<SttResult> => {
89+
async (ctx: restate.WorkflowSharedContext) => {
14190
const status = (await ctx.get<SttStatusType>("status")) ?? "QUEUED";
14291
const transcript = await ctx.get<string>("transcript");
14392
const error = await ctx.get<string>("error");

0 commit comments

Comments
 (0)