Skip to content

Commit dcaccb6

Browse files
committed
Add concurrency and timeout controls to audio extraction
1 parent 7c95208 commit dcaccb6

File tree

5 files changed

+303
-23
lines changed

5 files changed

+303
-23
lines changed

apps/media-server/src/__tests__/index.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ describe("GET /", () => {
1010
expect(data).toEqual({
1111
name: "@cap/media-server",
1212
version: "1.0.0",
13-
endpoints: ["/health", "/audio/check", "/audio/extract"],
13+
endpoints: ["/health", "/audio/status", "/audio/check", "/audio/extract"],
1414
});
1515
});
1616
});

apps/media-server/src/app.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ app.get("/", (c) => {
1414
return c.json({
1515
name: "@cap/media-server",
1616
version: "1.0.0",
17-
endpoints: ["/health", "/audio/check", "/audio/extract"],
17+
endpoints: ["/health", "/audio/status", "/audio/check", "/audio/extract"],
1818
});
1919
});
2020

apps/media-server/src/lib/ffmpeg.ts

Lines changed: 160 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { spawn } from "bun";
1+
import { type Subprocess, spawn } from "bun";
22

33
export interface AudioExtractionOptions {
44
format?: "mp3";
@@ -12,23 +12,91 @@ const DEFAULT_OPTIONS: Required<AudioExtractionOptions> = {
1212
bitrate: "128k",
1313
};
1414

15+
const CHECK_TIMEOUT_MS = 30_000;
16+
const EXTRACT_TIMEOUT_MS = 120_000;
17+
const MAX_AUDIO_SIZE_BYTES = 100 * 1024 * 1024;
18+
19+
let activeProcesses = 0;
20+
const MAX_CONCURRENT_PROCESSES = 6;
21+
22+
export function getActiveProcessCount(): number {
23+
return activeProcesses;
24+
}
25+
26+
export function canAcceptNewProcess(): boolean {
27+
return activeProcesses < MAX_CONCURRENT_PROCESSES;
28+
}
29+
30+
function killProcess(proc: Subprocess): void {
31+
try {
32+
proc.kill();
33+
} catch {}
34+
}
35+
36+
async function withTimeout<T>(
37+
promise: Promise<T>,
38+
timeoutMs: number,
39+
cleanup?: () => void,
40+
): Promise<T> {
41+
let timeoutId: ReturnType<typeof setTimeout> | undefined;
42+
const timeoutPromise = new Promise<never>((_, reject) => {
43+
timeoutId = setTimeout(() => {
44+
cleanup?.();
45+
reject(new Error(`Operation timed out after ${timeoutMs}ms`));
46+
}, timeoutMs);
47+
});
48+
49+
try {
50+
const result = await Promise.race([promise, timeoutPromise]);
51+
if (timeoutId) clearTimeout(timeoutId);
52+
return result;
53+
} catch (err) {
54+
if (timeoutId) clearTimeout(timeoutId);
55+
throw err;
56+
}
57+
}
58+
1559
export async function checkHasAudioTrack(videoUrl: string): Promise<boolean> {
60+
if (!canAcceptNewProcess()) {
61+
throw new Error("Server is busy, please try again later");
62+
}
63+
64+
activeProcesses++;
65+
1666
const proc = spawn({
1767
cmd: ["ffmpeg", "-i", videoUrl, "-hide_banner"],
1868
stdout: "pipe",
1969
stderr: "pipe",
2070
});
2171

22-
const stderrText = await new Response(proc.stderr).text();
23-
await proc.exited;
72+
try {
73+
const result = await withTimeout(
74+
(async () => {
75+
const stderrText = await new Response(proc.stderr).text();
76+
await proc.exited;
77+
return /Stream #\d+:\d+.*Audio:/.test(stderrText);
78+
})(),
79+
CHECK_TIMEOUT_MS,
80+
() => killProcess(proc),
81+
);
2482

25-
return /Stream #\d+:\d+.*Audio:/.test(stderrText);
83+
return result;
84+
} finally {
85+
activeProcesses--;
86+
killProcess(proc);
87+
}
2688
}
2789

2890
export async function extractAudio(
2991
videoUrl: string,
3092
options: AudioExtractionOptions = {},
3193
): Promise<Uint8Array> {
94+
if (!canAcceptNewProcess()) {
95+
throw new Error("Server is busy, please try again later");
96+
}
97+
98+
activeProcesses++;
99+
32100
const opts = { ...DEFAULT_OPTIONS, ...options };
33101

34102
const ffmpegArgs = [
@@ -51,23 +119,53 @@ export async function extractAudio(
51119
stderr: "pipe",
52120
});
53121

54-
const [stdout, stderrText, exitCode] = await Promise.all([
55-
new Response(proc.stdout).arrayBuffer(),
56-
new Response(proc.stderr).text(),
57-
proc.exited,
58-
]);
122+
try {
123+
const result = await withTimeout(
124+
(async () => {
125+
const [stdout, stderrText, exitCode] = await Promise.all([
126+
new Response(proc.stdout).arrayBuffer(),
127+
new Response(proc.stderr).text(),
128+
proc.exited,
129+
]);
130+
131+
if (exitCode !== 0) {
132+
throw new Error(`FFmpeg exited with code ${exitCode}: ${stderrText}`);
133+
}
134+
135+
if (stdout.byteLength > MAX_AUDIO_SIZE_BYTES) {
136+
throw new Error(
137+
`Audio too large: ${stdout.byteLength} bytes exceeds ${MAX_AUDIO_SIZE_BYTES} byte limit`,
138+
);
139+
}
140+
141+
return new Uint8Array(stdout);
142+
})(),
143+
EXTRACT_TIMEOUT_MS,
144+
() => killProcess(proc),
145+
);
59146

60-
if (exitCode !== 0) {
61-
throw new Error(`FFmpeg exited with code ${exitCode}: ${stderrText}`);
147+
return result;
148+
} finally {
149+
activeProcesses--;
150+
killProcess(proc);
62151
}
152+
}
63153

64-
return new Uint8Array(stdout);
154+
export interface StreamingExtractResult {
155+
stream: ReadableStream<Uint8Array>;
156+
cleanup: () => void;
65157
}
66158

67-
export async function extractAudioStream(
159+
export function extractAudioStream(
68160
videoUrl: string,
69161
options: AudioExtractionOptions = {},
70-
): Promise<ReadableStream<Uint8Array>> {
162+
): StreamingExtractResult {
163+
if (!canAcceptNewProcess()) {
164+
throw new Error("Server is busy, please try again later");
165+
}
166+
167+
activeProcesses++;
168+
71169
const opts = { ...DEFAULT_OPTIONS, ...options };
72170

73171
const ffmpegArgs = [
@@ -90,5 +188,52 @@ export async function extractAudioStream(
90188
stderr: "pipe",
91189
});
92190

93-
return proc.stdout as ReadableStream<Uint8Array>;
191+
let timeoutId: ReturnType<typeof setTimeout> | undefined;
192+
let cleaned = false;
193+
194+
const cleanup = () => {
195+
if (cleaned) return;
196+
cleaned = true;
197+
if (timeoutId) clearTimeout(timeoutId);
198+
activeProcesses--;
199+
killProcess(proc);
200+
};
201+
202+
timeoutId = setTimeout(() => {
203+
console.error("[ffmpeg] Stream extraction timed out");
204+
cleanup();
205+
}, EXTRACT_TIMEOUT_MS);
206+
207+
proc.exited.then((code) => {
208+
if (code !== 0) {
209+
console.error(`[ffmpeg] Stream extraction exited with code ${code}`);
210+
}
211+
cleanup();
212+
});
213+
214+
const originalStream = proc.stdout as ReadableStream<Uint8Array>;
215+
216+
const wrappedStream = new ReadableStream<Uint8Array>({
217+
async start(controller) {
218+
const reader = originalStream.getReader();
219+
try {
220+
while (true) {
221+
const { done, value } = await reader.read();
222+
if (done) break;
223+
controller.enqueue(value);
224+
}
225+
controller.close();
226+
} catch (err) {
227+
controller.error(err);
228+
} finally {
229+
reader.releaseLock();
230+
cleanup();
231+
}
232+
},
233+
cancel() {
234+
cleanup();
235+
},
236+
});
237+
238+
return { stream: wrappedStream, cleanup };
94239
}

apps/media-server/src/routes/audio.ts

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,39 @@
11
import { Hono } from "hono";
22
import { z } from "zod";
3-
import { checkHasAudioTrack, extractAudio } from "../lib/ffmpeg";
3+
import {
4+
canAcceptNewProcess,
5+
checkHasAudioTrack,
6+
extractAudio,
7+
extractAudioStream,
8+
getActiveProcessCount,
9+
} from "../lib/ffmpeg";
410

511
const audio = new Hono();
612

713
const videoUrlSchema = z.object({
814
videoUrl: z.string().url(),
915
});
1016

17+
const extractSchema = z.object({
18+
videoUrl: z.string().url(),
19+
stream: z.boolean().optional().default(false),
20+
});
21+
22+
function isBusyError(err: unknown): boolean {
23+
return err instanceof Error && err.message.includes("Server is busy");
24+
}
25+
26+
function isTimeoutError(err: unknown): boolean {
27+
return err instanceof Error && err.message.includes("timed out");
28+
}
29+
30+
audio.get("/status", (c) => {
31+
return c.json({
32+
activeProcesses: getActiveProcessCount(),
33+
canAcceptNewProcess: canAcceptNewProcess(),
34+
});
35+
});
36+
1137
audio.post("/check", async (c) => {
1238
const body = await c.req.json();
1339
const result = videoUrlSchema.safeParse(body);
@@ -28,6 +54,29 @@ audio.post("/check", async (c) => {
2854
return c.json({ hasAudio });
2955
} catch (err) {
3056
console.error("[audio/check] Error:", err);
57+
58+
if (isBusyError(err)) {
59+
return c.json(
60+
{
61+
error: "Server is busy",
62+
code: "SERVER_BUSY",
63+
details: "Too many concurrent requests, please retry later",
64+
},
65+
503,
66+
);
67+
}
68+
69+
if (isTimeoutError(err)) {
70+
return c.json(
71+
{
72+
error: "Request timed out",
73+
code: "TIMEOUT",
74+
details: err instanceof Error ? err.message : String(err),
75+
},
76+
504,
77+
);
78+
}
79+
3180
return c.json(
3281
{
3382
error: "Failed to check audio track",
@@ -41,7 +90,7 @@ audio.post("/check", async (c) => {
4190

4291
audio.post("/extract", async (c) => {
4392
const body = await c.req.json();
44-
const result = videoUrlSchema.safeParse(body);
93+
const result = extractSchema.safeParse(body);
4594

4695
if (!result.success) {
4796
return c.json(
@@ -54,16 +103,28 @@ audio.post("/extract", async (c) => {
54103
);
55104
}
56105

106+
const { videoUrl, stream: useStreaming } = result.data;
107+
57108
try {
58-
const hasAudio = await checkHasAudioTrack(result.data.videoUrl);
109+
const hasAudio = await checkHasAudioTrack(videoUrl);
59110
if (!hasAudio) {
60111
return c.json(
61112
{ error: "Video has no audio track", code: "NO_AUDIO_TRACK" },
62113
422,
63114
);
64115
}
65116

66-
const audioData = await extractAudio(result.data.videoUrl);
117+
if (useStreaming) {
118+
const { stream } = extractAudioStream(videoUrl);
119+
return new Response(stream, {
120+
headers: {
121+
"Content-Type": "audio/mpeg",
122+
"Transfer-Encoding": "chunked",
123+
},
124+
});
125+
}
126+
127+
const audioData = await extractAudio(videoUrl);
67128

68129
return new Response(Buffer.from(audioData), {
69130
headers: {
@@ -73,6 +134,29 @@ audio.post("/extract", async (c) => {
73134
});
74135
} catch (err) {
75136
console.error("[audio/extract] Error:", err);
137+
138+
if (isBusyError(err)) {
139+
return c.json(
140+
{
141+
error: "Server is busy",
142+
code: "SERVER_BUSY",
143+
details: "Too many concurrent requests, please retry later",
144+
},
145+
503,
146+
);
147+
}
148+
149+
if (isTimeoutError(err)) {
150+
return c.json(
151+
{
152+
error: "Request timed out",
153+
code: "TIMEOUT",
154+
details: err instanceof Error ? err.message : String(err),
155+
},
156+
504,
157+
);
158+
}
159+
76160
return c.json(
77161
{
78162
error: "Failed to extract audio",

0 commit comments

Comments
 (0)