Skip to content

Commit eb2ea32

Browse files
committed
Implement heartbeat and abort signal.
1 parent cd88163 commit eb2ea32

File tree

2 files changed

+70
-5
lines changed

2 files changed

+70
-5
lines changed

src/common/providers/https.ts

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ const JWT_REGEX = /^[a-zA-Z0-9\-_=]+?\.[a-zA-Z0-9\-_=]+?\.([a-zA-Z0-9\-_=]+)?$/;
4040
export const CALLABLE_AUTH_HEADER = "x-callable-context-auth";
4141
/** @internal */
4242
export const ORIGINAL_AUTH_HEADER = "x-original-auth";
43+
/** @internal */
44+
export const DEFAULT_HEARTBEAT_SECONDS = 30;
4345

4446
/** An express request with the wire format representation of the request body. */
4547
export interface Request extends express.Request {
@@ -146,8 +148,21 @@ export interface CallableRequest<T = any> {
146148
* to allow writing partial, streaming responses back to the client.
147149
*/
148150
export interface CallableProxyResponse {
151+
/**
152+
* Writes a chunk of the response body to the client. This method can be called
153+
* multiple times to stream data progressively.
154+
*/
149155
write: express.Response["write"];
156+
/**
157+
* Indicates whether the client has requested and can handle streaming responses.
158+
* This should be checked before attempting to stream data to avoid compatibility issues.
159+
*/
150160
acceptsStreaming: boolean;
161+
/**
162+
* An AbortSignal that is triggered when the client disconnects or the
163+
* request is terminated prematurely.
164+
*/
165+
signal: AbortSignal;
151166
}
152167

153168
/**
@@ -692,6 +707,13 @@ export interface CallableOptions {
692707
cors: cors.CorsOptions;
693708
enforceAppCheck?: boolean;
694709
consumeAppCheckToken?: boolean;
710+
/**
711+
* Time in seconds between sending heartbeat messages to keep the connection
712+
* alive. Set to `null` to disable heartbeats.
713+
*
714+
* Defaults to 30 seconds.
715+
*/
716+
heartbeatSeconds?: number | null
695717
}
696718

697719
/** @internal */
@@ -722,6 +744,22 @@ function wrapOnCallHandler<Req = any, Res = any>(
722744
version: "gcfv1" | "gcfv2"
723745
): (req: Request, res: express.Response) => Promise<void> {
724746
return async (req: Request, res: express.Response): Promise<void> => {
747+
const abortController = new AbortController();
748+
let heartbeatInterval: NodeJS.Timeout | null = null;
749+
750+
const cleanup = () => {
751+
if (heartbeatInterval) {
752+
clearInterval(heartbeatInterval);
753+
heartbeatInterval = null;
754+
}
755+
req.removeAllListeners('close');
756+
};
757+
758+
req.on('close', () => {
759+
cleanup()
760+
abortController.abort();
761+
});
762+
725763
try {
726764
if (!isValidRequest(req)) {
727765
logger.error("Invalid request, unable to process.");
@@ -791,24 +829,41 @@ function wrapOnCallHandler<Req = any, Res = any>(
791829
...context,
792830
data,
793831
};
794-
// TODO: set up optional heartbeat
795832
const responseProxy: CallableProxyResponse = {
796833
write(chunk): boolean {
797-
if (acceptsStreaming) {
798-
const formattedData = encodeSSE({ message: chunk });
799-
return res.write(formattedData);
800-
}
801834
// if client doesn't accept sse-protocol, response.write() is no-op.
835+
if (!acceptsStreaming) {
836+
return false
837+
}
838+
// if connection is already closed, response.write() is no-op.
839+
if (abortController.signal.aborted) {
840+
return false
841+
}
842+
const formattedData = encodeSSE({ message: chunk });
843+
return res.write(formattedData);
802844
},
803845
acceptsStreaming,
846+
signal: abortController.signal
804847
};
805848
if (acceptsStreaming) {
806849
// SSE always responds with 200
807850
res.status(200);
851+
const heartbeatSeconds = options.heartbeatSeconds ?? DEFAULT_HEARTBEAT_SECONDS;
852+
if (heartbeatSeconds !== null && heartbeatSeconds > 0) {
853+
heartbeatInterval = setInterval(
854+
() => res.write(": ping\n"),
855+
heartbeatSeconds * 1000
856+
);
857+
}
808858
}
809859
// For some reason the type system isn't picking up that the handler
810860
// is a one argument function.
811861
result = await (handler as any)(arg, responseProxy);
862+
863+
if (heartbeatInterval) {
864+
clearInterval(heartbeatInterval);
865+
heartbeatInterval = null;
866+
}
812867
}
813868

814869
// Encode the result as JSON to preserve types like Dates.
@@ -837,6 +892,8 @@ function wrapOnCallHandler<Req = any, Res = any>(
837892
} else {
838893
res.status(status).send(body);
839894
}
895+
} finally {
896+
cleanup()
840897
}
841898
};
842899
}

src/v2/providers/https.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ export interface CallableOptions extends HttpsOptions {
198198
* further decisions, such as requiring additional security checks or rejecting the request.
199199
*/
200200
consumeAppCheckToken?: boolean;
201+
202+
/**
203+
* Time in seconds between sending heartbeat messages to keep the connection
204+
* alive. Set to `null` to disable heartbeats.
205+
*
206+
* Defaults to 30 seconds.
207+
*/
208+
heartbeatSeconds?: number | null
201209
}
202210

203211
/**

0 commit comments

Comments
 (0)