Skip to content

Commit d7ef0fa

Browse files
committed
feat(api): implement concurrency control and request de-duplication for TTS audio processing
1 parent 70723f4 commit d7ef0fa

File tree

3 files changed

+236
-24
lines changed

3 files changed

+236
-24
lines changed

src/app/api/tts/route.ts

Lines changed: 166 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,87 @@ const ttsAudioCache = new LRUCache<string, AudioBufferValue>({
2323
ttl: TTS_CACHE_TTL_MS,
2424
});
2525

26+
// Concurrency controls and in-flight de-duplication
27+
const TTS_MAX_CONCURRENCY = Number(process.env.TTS_MAX_CONCURRENCY || 4);
28+
29+
class Semaphore {
30+
private permits: number;
31+
private queue: Array<() => void> = [];
32+
constructor(max: number) {
33+
this.permits = Math.max(1, max);
34+
}
35+
async acquire(): Promise<() => void> {
36+
if (this.permits > 0) {
37+
this.permits -= 1;
38+
return this.release.bind(this);
39+
}
40+
return new Promise<() => void>((resolve) => {
41+
this.queue.push(() => {
42+
this.permits -= 1;
43+
resolve(this.release.bind(this));
44+
});
45+
});
46+
}
47+
private release() {
48+
this.permits += 1;
49+
const next = this.queue.shift();
50+
if (next) next();
51+
}
52+
}
53+
54+
const ttsSemaphore = new Semaphore(TTS_MAX_CONCURRENCY);
55+
56+
type InflightEntry = {
57+
promise: Promise<ArrayBuffer>;
58+
controller: AbortController;
59+
consumers: number;
60+
};
61+
62+
const inflightRequests = new Map<string, InflightEntry>();
63+
64+
function sleep(ms: number) {
65+
return new Promise((res) => setTimeout(res, ms));
66+
}
67+
68+
async function fetchTTSBufferWithRetry(
69+
openai: OpenAI,
70+
createParams: ExtendedSpeechParams,
71+
signal: AbortSignal
72+
): Promise<ArrayBuffer> {
73+
let attempt = 0;
74+
const maxRetries = Number(process.env.TTS_MAX_RETRIES ?? 2);
75+
let delay = Number(process.env.TTS_RETRY_INITIAL_MS ?? 250);
76+
const maxDelay = Number(process.env.TTS_RETRY_MAX_MS ?? 2000);
77+
const backoff = Number(process.env.TTS_RETRY_BACKOFF ?? 2);
78+
79+
// Retry on 429 and 5xx only; never retry aborts
80+
for (;;) {
81+
try {
82+
const response = await openai.audio.speech.create(createParams as SpeechCreateParams, { signal });
83+
return await response.arrayBuffer();
84+
} catch (err: unknown) {
85+
if (signal?.aborted || (err instanceof Error && err.name === 'AbortError')) {
86+
throw err;
87+
}
88+
const status = (() => {
89+
if (typeof err === 'object' && err !== null) {
90+
const rec = err as Record<string, unknown>;
91+
if (typeof rec.status === 'number') return rec.status as number;
92+
if (typeof rec.statusCode === 'number') return rec.statusCode as number;
93+
}
94+
return 0;
95+
})();
96+
const retryable = status === 429 || status >= 500;
97+
if (!retryable || attempt >= maxRetries) {
98+
throw err;
99+
}
100+
await sleep(Math.min(delay, maxDelay));
101+
delay = Math.min(maxDelay, delay * backoff);
102+
attempt += 1;
103+
}
104+
}
105+
}
106+
26107
function makeCacheKey(input: {
27108
provider: string;
28109
model: string | null | undefined;
@@ -102,30 +183,108 @@ export async function POST(req: NextRequest) {
102183
instructions: createParams.instructions,
103184
});
104185

186+
const etag = `W/"${cacheKey}"`;
187+
const ifNoneMatch = req.headers.get('if-none-match');
188+
105189
const cachedBuffer = ttsAudioCache.get(cacheKey);
106190
if (cachedBuffer) {
191+
if (ifNoneMatch && (ifNoneMatch.includes(cacheKey) || ifNoneMatch.includes(etag))) {
192+
return new NextResponse(null, {
193+
status: 304,
194+
headers: {
195+
'ETag': etag,
196+
'Cache-Control': 'private, max-age=1800',
197+
'Vary': 'x-tts-provider, x-openai-key, x-openai-base-url'
198+
}
199+
});
200+
}
107201
console.log('TTS cache HIT for key:', cacheKey.slice(0, 8));
108202
return new NextResponse(cachedBuffer, {
109203
headers: {
110204
'Content-Type': contentType,
111205
'X-Cache': 'HIT',
206+
'ETag': etag,
207+
'Content-Length': String(cachedBuffer.byteLength),
208+
'Cache-Control': 'private, max-age=1800',
209+
'Vary': 'x-tts-provider, x-openai-key, x-openai-base-url'
112210
}
113211
});
114212
}
115213

116-
const response = await openai.audio.speech.create(createParams as SpeechCreateParams, { signal: req.signal });
214+
// De-duplicate identical in-flight requests and bound upstream concurrency
215+
const existing = inflightRequests.get(cacheKey);
216+
if (existing) {
217+
console.log('TTS in-flight JOIN for key:', cacheKey.slice(0, 8));
218+
existing.consumers += 1;
219+
220+
const onAbort = (_evt: Event) => {
221+
existing.consumers = Math.max(0, existing.consumers - 1);
222+
if (existing.consumers === 0) {
223+
existing.controller.abort();
224+
}
225+
};
226+
req.signal.addEventListener('abort', onAbort, { once: true });
227+
228+
try {
229+
const buffer = await existing.promise;
230+
return new NextResponse(buffer, {
231+
headers: {
232+
'Content-Type': contentType,
233+
'X-Cache': 'INFLIGHT',
234+
'ETag': etag,
235+
'Content-Length': String(buffer.byteLength),
236+
'Cache-Control': 'private, max-age=1800',
237+
'Vary': 'x-tts-provider, x-openai-key, x-openai-base-url'
238+
}
239+
});
240+
} finally {
241+
try { req.signal.removeEventListener('abort', onAbort); } catch {}
242+
}
243+
}
244+
245+
const controller = new AbortController();
246+
const entry: InflightEntry = {
247+
controller,
248+
consumers: 1,
249+
promise: (async () => {
250+
const release = await ttsSemaphore.acquire();
251+
try {
252+
const buffer = await fetchTTSBufferWithRetry(openai, createParams, controller.signal);
253+
// Save to cache
254+
ttsAudioCache.set(cacheKey, buffer);
255+
return buffer;
256+
} finally {
257+
release();
258+
inflightRequests.delete(cacheKey);
259+
}
260+
})()
261+
};
117262

118-
// Read the audio data as an ArrayBuffer and return it with appropriate headers
119-
// This will also be aborted if the client cancels
120-
const buffer = await response.arrayBuffer();
263+
inflightRequests.set(cacheKey, entry);
121264

122-
// Save to cache
123-
ttsAudioCache.set(cacheKey, buffer);
265+
const onAbort = (_evt: Event) => {
266+
entry.consumers = Math.max(0, entry.consumers - 1);
267+
if (entry.consumers === 0) {
268+
entry.controller.abort();
269+
}
270+
};
271+
req.signal.addEventListener('abort', onAbort, { once: true });
272+
273+
let buffer: ArrayBuffer;
274+
try {
275+
buffer = await entry.promise;
276+
} finally {
277+
try { req.signal.removeEventListener('abort', onAbort); } catch {}
278+
}
124279

125280
return new NextResponse(buffer, {
126281
headers: {
127282
'Content-Type': contentType,
128-
'X-Cache': 'MISS'
283+
'X-Cache': 'MISS',
284+
'ETag': etag,
285+
'Content-Length': String(buffer.byteLength),
286+
'Cache-Control': 'private, max-age=1800',
287+
'Vary': 'x-tts-provider, x-openai-key, x-openai-base-url'
129288
}
130289
});
131290
} catch (error) {

tests/folders.spec.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { test, expect } from '@playwright/test';
2-
import { setupTest, uploadFiles, ensureDocumentsListed, deleteAllLocalDocuments } from './helpers';
2+
import { setupTest, uploadFiles, ensureDocumentsListed, deleteAllLocalDocuments, waitForDocumentListHintPersist } from './helpers';
33

44
test.describe('Document folders and hint persistence', () => {
55
test.beforeEach(async ({ page }) => {
@@ -88,6 +88,9 @@ test.describe('Document folders and hint persistence', () => {
8888
// Hint should disappear
8989
await expect(hint).toHaveCount(0);
9090

91+
// Ensure the dismissal has been persisted to IndexedDB before reloading
92+
await waitForDocumentListHintPersist(page, false);
93+
9194
// Reload and ensure it remains dismissed
9295
await page.reload();
9396
await page.waitForLoadState('networkidle');

tests/helpers.ts

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,8 @@ export async function waitAndClickPlay(page: Page) {
4949
await expect(page.getByRole('button', { name: 'Play' })).toBeVisible();
5050
// Play the TTS by clicking the button
5151
await page.getByRole('button', { name: 'Play' }).click();
52-
53-
// Expect for buttons to be disabled
54-
await expect(page.locator('button[aria-label="Skip forward"][disabled]')).toBeVisible();
55-
await expect(page.locator('button[aria-label="Skip backward"][disabled]')).toBeVisible();
56-
57-
// Wait for the TTS to stop processing
58-
await Promise.all([
59-
page.waitForSelector('button[aria-label="Skip forward"]:not([disabled])', { timeout: 45000 }),
60-
page.waitForSelector('button[aria-label="Skip backward"]:not([disabled])', { timeout: 45000 }),
61-
]);
62-
63-
await page.waitForFunction(() => {
64-
return navigator.mediaSession?.playbackState === 'playing';
65-
});
52+
// Use resilient processing transition helper (tolerates fast completion)
53+
await expectProcessingTransition(page);
6654
}
6755

6856
/**
@@ -93,7 +81,7 @@ export async function pauseTTSAndVerify(page: Page) {
9381
export async function setupTest(page: Page) {
9482
// Navigate to the home page before each test
9583
await page.goto('/');
96-
//await page.waitForLoadState('networkidle');
84+
await page.waitForLoadState('networkidle');
9785

9886
// If running in CI, select the "Custom OpenAI-Like" model and "Deepinfra" provider
9987
if (process.env.CI) {
@@ -323,7 +311,37 @@ export async function changeNativeSpeedAndAssert(page: Page, newSpeed: number) {
323311

324312
// Expect navigator.mediaSession.playbackState to equal given state
325313
export async function expectMediaState(page: Page, state: 'playing' | 'paused') {
326-
await page.waitForFunction((s) => navigator.mediaSession?.playbackState === s, state, { timeout: 20000 });
314+
// WebKit (and sometimes other engines) may not reliably update navigator.mediaSession.playbackState.
315+
// Fallback heuristics:
316+
// 1. Prefer mediaSession if it matches desired state.
317+
// 2. Otherwise inspect any <audio> element: use paused flag and currentTime progression.
318+
// 3. Allow short grace period for first frame to advance.
319+
// 4. If neither detectable, keep polling until timeout.
320+
await page.waitForFunction((desired) => {
321+
try {
322+
const msState = (navigator.mediaSession && navigator.mediaSession.playbackState) || '';
323+
if (msState === desired) return true;
324+
325+
const audio: HTMLAudioElement | null = document.querySelector('audio');
326+
if (audio) {
327+
// Track advancement by storing last time on the element dataset
328+
const last = parseFloat(audio.dataset.lastTime || '0');
329+
const curr = audio.currentTime;
330+
audio.dataset.lastTime = String(curr);
331+
332+
if (desired === 'playing') {
333+
// Consider playing if not paused AND time has advanced at least a tiny amount
334+
if (!audio.paused && curr > 0 && curr > last) return true;
335+
} else {
336+
// paused target
337+
if (audio.paused) return true;
338+
}
339+
}
340+
return false;
341+
} catch {
342+
return false;
343+
}
344+
}, state, { timeout: 25000 });
327345
}
328346

329347
// Use Navigator to go to a specific page number (PDF)
@@ -352,4 +370,36 @@ export async function countRenderedTextLayers(page: Page): Promise<number> {
352370
// Force viewport resize to trigger resize hooks (e.g., EPUB)
353371
export async function triggerViewportResize(page: Page, width: number, height: number) {
354372
await page.setViewportSize({ width, height });
373+
}
374+
375+
// Wait for DocumentListState.showHint to persist in IndexedDB 'config' store
376+
export async function waitForDocumentListHintPersist(page: Page, expected: boolean) {
377+
await page.waitForFunction(async (exp) => {
378+
try {
379+
const openDb = () => new Promise<IDBDatabase>((resolve, reject) => {
380+
const req = indexedDB.open('openreader-db');
381+
req.onsuccess = () => resolve(req.result);
382+
req.onerror = () => reject(req.error);
383+
});
384+
const db = await openDb();
385+
const readConfig = () => new Promise<any>((resolve, reject) => {
386+
const tx = db.transaction(['config'], 'readonly');
387+
const store = tx.objectStore('config');
388+
const getReq = store.get('documentListState');
389+
getReq.onsuccess = () => resolve(getReq.result);
390+
getReq.onerror = () => reject(getReq.error);
391+
});
392+
const item = await readConfig();
393+
db.close();
394+
if (!item || typeof item.value !== 'string') return false;
395+
try {
396+
const parsed = JSON.parse(item.value);
397+
return parsed && parsed.showHint === exp;
398+
} catch {
399+
return false;
400+
}
401+
} catch {
402+
return false;
403+
}
404+
}, expected, { timeout: 5000 });
355405
}

0 commit comments

Comments
 (0)