Skip to content

Commit 7ccd6ce

Browse files
authored
fix timeout handling for fetch-transport read sessions (#68)
1 parent fb139b6 commit 7ccd6ce

File tree

2 files changed

+59
-46
lines changed

2 files changed

+59
-46
lines changed

.changeset/plenty-cats-cheat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@s2-dev/streamstore": patch
3+
---
4+
5+
Fix timeout logic for resumable read sessions with fetch transport

packages/streamstore/src/lib/stream/transport/fetch/index.ts

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -212,61 +212,69 @@ export class FetchReadSession<Format extends "string" | "bytes" = "string">
212212
const reader = eventStream.getReader();
213213
let done = false;
214214

215+
// Sentinel value to indicate timeout check needed (not actual timeout)
216+
const TIMEOUT_CHECK = Symbol("timeout-check");
217+
215218
super({
216219
pull: async (controller) => {
217220
if (done) {
218221
controller.close();
219222
return;
220223
}
221224

222-
// Check for ping timeout before reading
223-
const now = performance.now();
224-
const timeSinceLastPingMs = now - lastPingTimeMs;
225-
if (timeSinceLastPingMs > PING_TIMEOUT_MS) {
226-
const timeoutError = new S2Error({
227-
message: `No ping received for ${Math.floor(timeSinceLastPingMs / 1000)}s (timeout: ${PING_TIMEOUT_MS / 1000}s)`,
228-
status: 408, // Request Timeout
229-
code: "TIMEOUT",
230-
});
231-
debug("ping timeout detected, elapsed=%dms", timeSinceLastPingMs);
232-
controller.enqueue({ ok: false, error: timeoutError });
233-
done = true;
234-
controller.close();
235-
return;
236-
}
237-
238225
try {
239-
// Calculate remaining time until timeout
240-
const remainingTimeMs = PING_TIMEOUT_MS - timeSinceLastPingMs;
241-
242-
// Race reader.read() against timeout
243-
// This ensures we don't wait forever if server stops sending events
244-
const result = await Promise.race([
245-
reader.read(),
246-
new Promise<never>((_, reject) =>
247-
setTimeout(() => {
248-
const elapsed = performance.now() - lastPingTimeMs;
249-
reject(
250-
new S2Error({
251-
message: `No ping received for ${Math.floor(elapsed / 1000)}s (timeout: ${PING_TIMEOUT_MS / 1000}s)`,
252-
status: 408,
253-
code: "TIMEOUT",
254-
}),
255-
);
256-
}, remainingTimeMs),
257-
),
258-
]);
259-
260-
if (result.done) {
261-
done = true;
262-
// Check if stream ended due to error
263-
if (parserError) {
264-
controller.enqueue({ ok: false, error: parserError });
226+
// Loop to handle timeout checks - pings may arrive while waiting,
227+
// so we need to re-check actual elapsed time when timeout fires
228+
while (true) {
229+
// Check for ping timeout before reading
230+
const now = performance.now();
231+
const timeSinceLastPingMs = now - lastPingTimeMs;
232+
233+
if (timeSinceLastPingMs > PING_TIMEOUT_MS) {
234+
const timeoutError = new S2Error({
235+
message: `No ping received for ${Math.floor(timeSinceLastPingMs / 1000)}s (timeout: ${PING_TIMEOUT_MS / 1000}s)`,
236+
status: 408, // Request Timeout
237+
code: "TIMEOUT",
238+
});
239+
debug("ping timeout detected, elapsed=%dms", timeSinceLastPingMs);
240+
controller.enqueue({ ok: false, error: timeoutError });
241+
done = true;
242+
controller.close();
243+
return;
265244
}
266-
controller.close();
267-
} else {
268-
// Emit successful result
269-
controller.enqueue({ ok: true, value: result.value });
245+
246+
// Calculate remaining time until timeout
247+
const remainingTimeMs = PING_TIMEOUT_MS - timeSinceLastPingMs;
248+
249+
// Race reader.read() against timeout
250+
// Timeout resolves with sentinel instead of rejecting, so we can
251+
// re-check actual elapsed time (pings may have updated lastPingTimeMs)
252+
const result = await Promise.race([
253+
reader.read(),
254+
new Promise<typeof TIMEOUT_CHECK>((resolve) =>
255+
setTimeout(() => resolve(TIMEOUT_CHECK), remainingTimeMs),
256+
),
257+
]);
258+
259+
// If timeout fired, loop to check actual elapsed time
260+
// (pings may have arrived during the wait, updating lastPingTimeMs)
261+
if (result === TIMEOUT_CHECK) {
262+
continue;
263+
}
264+
265+
// Got actual data from reader
266+
if (result.done) {
267+
done = true;
268+
// Check if stream ended due to error
269+
if (parserError) {
270+
controller.enqueue({ ok: false, error: parserError });
271+
}
272+
controller.close();
273+
} else {
274+
// Emit successful result
275+
controller.enqueue({ ok: true, value: result.value });
276+
}
277+
return;
270278
}
271279
} catch (error) {
272280
// Convert unexpected errors to S2Error and emit as error result

0 commit comments

Comments
 (0)