Skip to content

Commit 78e2d0e

Browse files
committed
Fix SSE causing app crashes when inner async loop takes too long
1 parent 12de1b0 commit 78e2d0e

File tree

3 files changed

+133
-2
lines changed

3 files changed

+133
-2
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { LoaderArgs } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { logger } from "~/services/logger.server";
4+
import { sse } from "~/utils/sse";
5+
6+
export async function loader({ request }: LoaderArgs) {
7+
const url = new URL(request.url);
8+
const searchParams = Object.fromEntries(url.searchParams.entries());
9+
10+
const options = z
11+
.object({
12+
minDelay: z.coerce.number().int(),
13+
maxDelay: z.coerce.number().int(),
14+
undefinedProbability: z.coerce.number().min(0).max(1).default(0.1),
15+
})
16+
.parse(searchParams);
17+
18+
logger.debug("Test SSE stream", { options });
19+
20+
let lastSignals = calculateChangeSignals(Date.now());
21+
22+
return sse({
23+
request,
24+
run: async (send, stop) => {
25+
const result = await dateForUpdates(options);
26+
27+
if (!result) {
28+
return stop();
29+
}
30+
31+
const newSignals = calculateChangeSignals(result);
32+
33+
if (lastSignals.ts !== newSignals.ts) {
34+
send({ data: JSON.stringify(newSignals) });
35+
}
36+
37+
lastSignals = newSignals;
38+
},
39+
});
40+
}
41+
42+
async function dateForUpdates(opts: {
43+
minDelay: number;
44+
maxDelay: number;
45+
undefinedProbability: number;
46+
}): Promise<number | undefined> {
47+
// Randomly await between minDelay and maxDelay
48+
await new Promise((resolve) => {
49+
setTimeout(resolve, Math.random() * (opts.maxDelay - opts.minDelay) + opts.minDelay);
50+
});
51+
52+
// There should be about a x% chance that this returns undefined
53+
if (Math.random() < opts.undefinedProbability) {
54+
logger.debug("Test SSE dataForUpdates returning undefined");
55+
56+
return undefined;
57+
}
58+
59+
// Randomly return true or false
60+
return Date.now();
61+
}
62+
63+
function calculateChangeSignals(ts: number) {
64+
return {
65+
ts,
66+
};
67+
}

apps/webapp/app/routes/tests.sse.tsx

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { useLoaderData } from "@remix-run/react";
2+
import { LoaderArgs } from "@remix-run/server-runtime";
3+
import { useEventSource } from "remix-utils";
4+
import { z } from "zod";
5+
6+
export async function loader({ request }: LoaderArgs) {
7+
const url = new URL(request.url);
8+
const params = Object.fromEntries(url.searchParams.entries());
9+
10+
const config = z
11+
.object({
12+
minDelay: z.coerce.number().int().min(0).max(10000).default(1000),
13+
maxDelay: z.coerce.number().int().min(0).max(10000).default(2000),
14+
undefinedProbability: z.coerce.number().min(0).max(1).default(0.1),
15+
})
16+
.parse(params);
17+
18+
return config;
19+
}
20+
21+
export default function SSETest() {
22+
const { minDelay, maxDelay, undefinedProbability } = useLoaderData<typeof loader>();
23+
24+
const events = useEventSource(
25+
`/tests/sse/stream?minDelay=${minDelay}&maxDelay=${maxDelay}&undefinedProbability=${undefinedProbability}`,
26+
{
27+
event: "message",
28+
}
29+
);
30+
31+
return (
32+
<div>
33+
<h2>SSE Test</h2>
34+
<p>{events ?? "No events"}</p>
35+
</div>
36+
);
37+
}

apps/webapp/app/utils/sse.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { eventStream } from "remix-utils";
2+
import { logger } from "~/services/logger.server";
23

34
type SseProps = {
45
request: Request;
@@ -30,12 +31,38 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
3031
};
3132

3233
return eventStream(request.signal, (send) => {
34+
const safeSend = (args: { event?: string; data: string }) => {
35+
try {
36+
send(args);
37+
} catch (error) {
38+
if (error instanceof Error) {
39+
if (error.name !== "TypeError") {
40+
logger.debug("Error sending SSE, aborting", {
41+
error: {
42+
name: error.name,
43+
message: error.message,
44+
stack: error.stack,
45+
},
46+
args,
47+
});
48+
}
49+
} else {
50+
logger.debug("Uknown error sending SSE, aborting", {
51+
error,
52+
args,
53+
});
54+
}
55+
56+
abort();
57+
}
58+
};
59+
3360
pinger = setInterval(() => {
34-
send({ event: "ping", data: new Date().toISOString() });
61+
safeSend({ event: "ping", data: new Date().toISOString() });
3562
}, pingInterval);
3663

3764
updater = setInterval(async () => {
38-
run(send, abort);
65+
run(safeSend, abort);
3966
}, updateInterval);
4067

4168
return abort;

0 commit comments

Comments
 (0)