Skip to content

Commit 9e0df98

Browse files
committed
Only send heartbeat when data hasn't been written in the given interval.
1 parent fe8a030 commit 9e0df98

File tree

3 files changed

+65
-10
lines changed

3 files changed

+65
-10
lines changed

spec/common/providers/https.spec.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,37 @@ describe("onCallHandler", () => {
864864
const handlerPromise = runHandler(fn, mockReq as any);
865865
await clock.tickAsync(11_000);
866866
const resp = await handlerPromise;
867-
expect(resp.body).to.include(': ping\n: ping\ndata: {"result":"done"}');
867+
const data = [": ping", ": ping", `data: {"result":"done"}`];
868+
expect(resp.body).to.equal([...data, ""].join("\n"));
869+
});
870+
871+
it("doesn't send heartbeat messages if user writes data", async () => {
872+
const mockReq = mockRequest(
873+
{ message: "test heartbeat" },
874+
"application/json",
875+
{},
876+
{ accept: "text/event-stream" }
877+
);
878+
879+
const fn = https.onCallHandler(
880+
{
881+
cors: { origin: true, methods: "POST" },
882+
heartbeatSeconds: 5,
883+
},
884+
async (resp, res) => {
885+
await new Promise((resolve) => setTimeout(resolve, 3_000));
886+
res.write("hello");
887+
await new Promise((resolve) => setTimeout(resolve, 3_000));
888+
return "done";
889+
},
890+
"gcfv2"
891+
);
892+
893+
const handlerPromise = runHandler(fn, mockReq as any);
894+
await clock.tickAsync(10_000);
895+
const resp = await handlerPromise;
896+
const data = [`data: {"message":"hello"}`, `data: {"result":"done"}`];
897+
expect(resp.body).to.equal([...data, ""].join("\n"));
868898
});
869899

870900
it("respects null heartbeatSeconds option", async () => {
@@ -890,7 +920,7 @@ describe("onCallHandler", () => {
890920
const handlerPromise = runHandler(fn, mockReq as any);
891921
await clock.tickAsync(31_000);
892922
const resp = await handlerPromise;
893-
expect(resp.body).to.include('data: {"result":"done"}');
923+
expect(resp.body).to.equal('data: {"result":"done"}\n');
894924
});
895925
});
896926
});

spec/helper.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ export function runHandler(
8686

8787
public write(writeBody: any) {
8888
this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody;
89+
return true;
8990
}
9091

9192
public end() {

src/common/providers/https.ts

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -747,15 +747,27 @@ function wrapOnCallHandler<Req = any, Res = any>(
747747
const abortController = new AbortController();
748748
let heartbeatInterval: NodeJS.Timeout | null = null;
749749

750-
const clearHeartbeatInterval = () => {
750+
const clearScheduledHeartbeat = () => {
751751
if (heartbeatInterval) {
752-
clearInterval(heartbeatInterval);
752+
clearTimeout(heartbeatInterval);
753753
heartbeatInterval = null;
754754
}
755755
};
756756

757+
const scheduleHeartbeat = (heartbeatSeconds: number) => {
758+
clearScheduledHeartbeat(); // Clear any existing timeout
759+
if (!abortController.signal.aborted) {
760+
heartbeatInterval = setTimeout(() => {
761+
if (!abortController.signal.aborted) {
762+
res.write(": ping\n");
763+
scheduleHeartbeat(heartbeatSeconds);
764+
}
765+
}, heartbeatSeconds * 1000);
766+
}
767+
};
768+
757769
req.on("close", () => {
758-
clearHeartbeatInterval();
770+
clearScheduledHeartbeat();
759771
abortController.abort();
760772
});
761773

@@ -828,6 +840,12 @@ function wrapOnCallHandler<Req = any, Res = any>(
828840
...context,
829841
data,
830842
};
843+
844+
const heartbeatSeconds =
845+
options.heartbeatSeconds === undefined
846+
? DEFAULT_HEARTBEAT_SECONDS
847+
: options.heartbeatSeconds;
848+
831849
const responseProxy: CallableProxyResponse = {
832850
write(chunk): boolean {
833851
// if client doesn't accept sse-protocol, response.write() is no-op.
@@ -839,23 +857,29 @@ function wrapOnCallHandler<Req = any, Res = any>(
839857
return false;
840858
}
841859
const formattedData = encodeSSE({ message: chunk });
842-
return res.write(formattedData);
860+
const wrote = res.write(formattedData);
861+
//
862+
// Reset heartbeat timer after successful write
863+
if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) {
864+
scheduleHeartbeat(heartbeatSeconds);
865+
}
866+
return wrote;
843867
},
844868
acceptsStreaming,
845869
signal: abortController.signal,
846870
};
847871
if (acceptsStreaming) {
848872
// SSE always responds with 200
849873
res.status(200);
850-
const heartbeatSeconds = options.heartbeatSeconds ?? DEFAULT_HEARTBEAT_SECONDS;
874+
851875
if (heartbeatSeconds !== null && heartbeatSeconds > 0) {
852-
heartbeatInterval = setInterval(() => res.write(": ping\n"), heartbeatSeconds * 1000);
876+
scheduleHeartbeat(heartbeatSeconds);
853877
}
854878
}
855879
// For some reason the type system isn't picking up that the handler
856880
// is a one argument function.
857881
result = await (handler as any)(arg, responseProxy);
858-
clearHeartbeatInterval();
882+
clearScheduledHeartbeat();
859883
}
860884

861885
if (!abortController.signal.aborted) {
@@ -894,7 +918,7 @@ function wrapOnCallHandler<Req = any, Res = any>(
894918
res.end();
895919
}
896920
} finally {
897-
clearHeartbeatInterval();
921+
clearScheduledHeartbeat();
898922
}
899923
};
900924
}

0 commit comments

Comments
 (0)