diff --git a/CHANGELOG.md b/CHANGELOG.md index 605aabbf3..2446e5f49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,2 @@ - Add an authPolicy callback to CallableOptions for reusable auth middleware as well as helper auth policies (#1650) +- Multiple breaking changes to the not-yet-announced streaming feature for Callable Functions (#1652) diff --git a/spec/common/providers/https.spec.ts b/spec/common/providers/https.spec.ts index ef77a6fa6..86e1b8696 100644 --- a/spec/common/providers/https.spec.ts +++ b/spec/common/providers/https.spec.ts @@ -770,7 +770,7 @@ describe("onCallHandler", () => { cors: { origin: true, methods: "POST" }, }, (req, resp) => { - resp.write("hello"); + resp.sendChunk("hello"); return "world"; }, "gcfv2" @@ -840,10 +840,10 @@ describe("onCallHandler", () => { { cors: { origin: true, methods: "POST" }, }, - (req, resp) => { - resp.write("initial message"); - mockReq.emit("close"); - resp.write("should not be sent"); + async (req, resp) => { + await resp.sendChunk("initial message"); + await mockReq.emit("close"); + await resp.sendChunk("should not be sent"); return "done"; }, "gcfv2" @@ -908,7 +908,7 @@ describe("onCallHandler", () => { }, async (resp, res) => { await new Promise((resolve) => setTimeout(resolve, 3_000)); - res.write("hello"); + res.sendChunk("hello"); await new Promise((resolve) => setTimeout(resolve, 3_000)); return "done"; }, diff --git a/spec/helper.ts b/spec/helper.ts index 5c8ca0c76..04829be38 100644 --- a/spec/helper.ts +++ b/spec/helper.ts @@ -84,8 +84,11 @@ export function runHandler( } } - public write(writeBody: any) { + public write(writeBody: any, cb?: () => void) { this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody; + if (cb) { + setImmediate(cb); + } return true; } diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 24300cf9d..a6989f130 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -141,23 +141,30 @@ export interface CallableRequest { * The raw request handled by the callable. */ rawRequest: Request; + + /** + * Whether this is a streaming request. + * Code can be optimized by not trying to generate a stream of chunks to + * call response.sendChunk on if request.acceptsStreaming is false. + * It is always safe, however, to call response.sendChunk as this will + * noop if acceptsStreaming is false. + */ + acceptsStreaming: boolean; } /** - * CallableProxyResponse exposes subset of express.Response object - * to allow writing partial, streaming responses back to the client. + * CallableProxyResponse allows streaming response chunks and listening to signals + * triggered in events such as a disconnect. */ -export interface CallableProxyResponse { +export interface CallableResponse { /** * Writes a chunk of the response body to the client. This method can be called * multiple times to stream data progressively. + * Returns a promise of whether the data was written. This can be false, for example, + * if the request was not a streaming request. Rejects if there is a network error. */ - write: express.Response["write"]; - /** - * Indicates whether the client has requested and can handle streaming responses. - * This should be checked before attempting to stream data to avoid compatibility issues. - */ - acceptsStreaming: boolean; + sendChunk: (chunk: T) => Promise; + /** * An AbortSignal that is triggered when the client disconnects or the * request is terminated prematurely. @@ -586,13 +593,9 @@ async function checkTokens( auth: "INVALID", }; - await Promise.all([ - Promise.resolve().then(async () => { - verifications.auth = await checkAuthToken(req, ctx); - }), - Promise.resolve().then(async () => { - verifications.app = await checkAppCheckToken(req, ctx, options); - }), + [verifications.auth, verifications.app] = await Promise.all([ + checkAuthToken(req, ctx), + checkAppCheckToken(req, ctx, options), ]); const logPayload = { @@ -697,9 +700,9 @@ async function checkAppCheckToken( } type v1CallableHandler = (data: any, context: CallableContext) => any | Promise; -type v2CallableHandler = ( +type v2CallableHandler = ( request: CallableRequest, - response?: CallableProxyResponse + response?: CallableResponse ) => Res; /** @internal **/ @@ -718,9 +721,9 @@ export interface CallableOptions { } /** @internal */ -export function onCallHandler( +export function onCallHandler( options: CallableOptions, - handler: v1CallableHandler | v2CallableHandler, + handler: v1CallableHandler | v2CallableHandler, version: "gcfv1" | "gcfv2" ): (req: Request, res: express.Response) => Promise { const wrapped = wrapOnCallHandler(options, handler, version); @@ -739,9 +742,9 @@ function encodeSSE(data: unknown): string { } /** @internal */ -function wrapOnCallHandler( +function wrapOnCallHandler( options: CallableOptions, - handler: v1CallableHandler | v2CallableHandler, + handler: v1CallableHandler | v2CallableHandler, version: "gcfv1" | "gcfv2" ): (req: Request, res: express.Response) => Promise { return async (req: Request, res: express.Response): Promise => { @@ -855,27 +858,41 @@ function wrapOnCallHandler( const arg: CallableRequest = { ...context, data, + acceptsStreaming, }; - const responseProxy: CallableProxyResponse = { - write(chunk): boolean { + const responseProxy: CallableResponse = { + sendChunk(chunk: Stream): Promise { // if client doesn't accept sse-protocol, response.write() is no-op. if (!acceptsStreaming) { - return false; + return Promise.resolve(false); } // if connection is already closed, response.write() is no-op. if (abortController.signal.aborted) { - return false; + return Promise.resolve(false); } const formattedData = encodeSSE({ message: chunk }); - const wrote = res.write(formattedData); + let resolve: (wrote: boolean) => void; + let reject: (err: Error) => void; + const p = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + const wrote = res.write(formattedData, (error) => { + if (error) { + reject(error); + return; + } + resolve(wrote); + }); + // Reset heartbeat timer after successful write if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) { scheduleHeartbeat(); } - return wrote; + + return p; }, - acceptsStreaming, signal: abortController.signal, }; if (acceptsStreaming) { diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index 0a5b3e8c3..e59ce9704 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -33,7 +33,7 @@ import { isDebugFeatureEnabled } from "../../common/debug"; import { ResetValue } from "../../common/options"; import { CallableRequest, - CallableProxyResponse, + CallableResponse, FunctionsErrorCode, HttpsError, onCallHandler, @@ -258,12 +258,17 @@ export type HttpsFunction = (( /** * Creates a callable method for clients to call using a Firebase SDK. */ -export interface CallableFunction extends HttpsFunction { +export interface CallableFunction extends HttpsFunction { /** Executes the handler function with the provided data as input. Used for unit testing. * @param data - An input for the handler function. * @returns The output of the handler function. */ - run(data: CallableRequest): Return; + run(request: CallableRequest): Return; + + stream( + request: CallableRequest, + response: CallableResponse + ): { stream: AsyncIterator; output: Return }; } /** @@ -387,22 +392,22 @@ export function onRequest( * @param handler - A function that takes a {@link https.CallableRequest}. * @returns A function that you can export and deploy. */ -export function onCall>( +export function onCall, Stream = unknown>( opts: CallableOptions, - handler: (request: CallableRequest, response?: CallableProxyResponse) => Return -): CallableFunction ? Return : Promise>; + handler: (request: CallableRequest, response?: CallableResponse) => Return +): CallableFunction ? Return : Promise, Stream>; /** * Declares a callable method for clients to call using a Firebase SDK. * @param handler - A function that takes a {@link https.CallableRequest}. * @returns A function that you can export and deploy. */ -export function onCall>( - handler: (request: CallableRequest, response?: CallableProxyResponse) => Return +export function onCall, Stream = unknown>( + handler: (request: CallableRequest, response?: CallableResponse) => Return ): CallableFunction ? Return : Promise>; -export function onCall>( +export function onCall, Stream = unknown>( optsOrHandler: CallableOptions | ((request: CallableRequest) => Return), - handler?: (request: CallableRequest, response?: CallableProxyResponse) => Return + handler?: (request: CallableRequest, response?: CallableResponse) => Return ): CallableFunction ? Return : Promise> { let opts: CallableOptions; if (arguments.length === 1) { @@ -421,7 +426,7 @@ export function onCall>( } // fix the length of handler to make the call to handler consistent - const fixedLen = (req: CallableRequest, resp?: CallableProxyResponse) => handler(req, resp); + const fixedLen = (req: CallableRequest, resp?: CallableResponse) => handler(req, resp); let func: any = onCallHandler( { cors: { origin, methods: "POST" }, @@ -474,6 +479,17 @@ export function onCall>( callableTrigger: {}, }; + // TODO: in the next major version, do auth/appcheck in these helper methods too. func.run = withInit(handler); + func.stream = () => { + return { + stream: { + next(): Promise> { + return Promise.reject("Coming soon"); + }, + }, + output: Promise.reject("Coming soon"), + }; + }; return func; }