@@ -141,23 +141,30 @@ export interface CallableRequest<T = any> {
141141 * The raw request handled by the callable.
142142 */
143143 rawRequest : Request ;
144+
145+ /**
146+ * Whether this is a streaming request.
147+ * Code can be optimized by not trying to generate a stream of chunks to
148+ * call response.sendChunk on if request.acceptsStreaming is false.
149+ * It is always safe, however, to call response.sendChunk as this will
150+ * noop if acceptsStreaming is false.
151+ */
152+ acceptsStreaming : boolean ;
144153}
145154
146155/**
147- * CallableProxyResponse exposes subset of express.Response object
148- * to allow writing partial, streaming responses back to the client .
156+ * CallableProxyResponse allows streaming response chunks and listening to signals
157+ * triggered in events such as a disconnect .
149158 */
150- export interface CallableProxyResponse {
159+ export interface CallableResponse < T = string > {
151160 /**
152161 * Writes a chunk of the response body to the client. This method can be called
153162 * multiple times to stream data progressively.
163+ * Returns a promise of whether the data was written. This can be false, for example,
164+ * if the request was not a streaming request. Rejects if there is a network error.
154165 */
155- write : express . Response [ "write" ] ;
156- /**
157- * Indicates whether the client has requested and can handle streaming responses.
158- * This should be checked before attempting to stream data to avoid compatibility issues.
159- */
160- acceptsStreaming : boolean ;
166+ sendChunk : ( chunk : T ) => Promise < boolean > ;
167+
161168 /**
162169 * An AbortSignal that is triggered when the client disconnects or the
163170 * request is terminated prematurely.
@@ -586,13 +593,9 @@ async function checkTokens(
586593 auth : "INVALID" ,
587594 } ;
588595
589- await Promise . all ( [
590- Promise . resolve ( ) . then ( async ( ) => {
591- verifications . auth = await checkAuthToken ( req , ctx ) ;
592- } ) ,
593- Promise . resolve ( ) . then ( async ( ) => {
594- verifications . app = await checkAppCheckToken ( req , ctx , options ) ;
595- } ) ,
596+ [ verifications . auth , verifications . app ] = await Promise . all ( [
597+ checkAuthToken ( req , ctx ) ,
598+ checkAppCheckToken ( req , ctx , options ) ,
596599 ] ) ;
597600
598601 const logPayload = {
@@ -697,9 +700,9 @@ async function checkAppCheckToken(
697700}
698701
699702type v1CallableHandler = ( data : any , context : CallableContext ) => any | Promise < any > ;
700- type v2CallableHandler < Req , Res > = (
703+ type v2CallableHandler < Req , Res , Stream > = (
701704 request : CallableRequest < Req > ,
702- response ?: CallableProxyResponse
705+ response ?: CallableResponse < Stream >
703706) => Res ;
704707
705708/** @internal **/
@@ -717,9 +720,9 @@ export interface CallableOptions {
717720}
718721
719722/** @internal */
720- export function onCallHandler < Req = any , Res = any > (
723+ export function onCallHandler < Req = any , Res = any , Stream = string > (
721724 options : CallableOptions ,
722- handler : v1CallableHandler | v2CallableHandler < Req , Res > ,
725+ handler : v1CallableHandler | v2CallableHandler < Req , Res , Stream > ,
723726 version : "gcfv1" | "gcfv2"
724727) : ( req : Request , res : express . Response ) => Promise < void > {
725728 const wrapped = wrapOnCallHandler ( options , handler , version ) ;
@@ -738,9 +741,9 @@ function encodeSSE(data: unknown): string {
738741}
739742
740743/** @internal */
741- function wrapOnCallHandler < Req = any , Res = any > (
744+ function wrapOnCallHandler < Req = any , Res = any , Stream = string > (
742745 options : CallableOptions ,
743- handler : v1CallableHandler | v2CallableHandler < Req , Res > ,
746+ handler : v1CallableHandler | v2CallableHandler < Req , Res , Stream > ,
744747 version : "gcfv1" | "gcfv2"
745748) : ( req : Request , res : express . Response ) => Promise < void > {
746749 return async ( req : Request , res : express . Response ) : Promise < void > => {
@@ -848,27 +851,41 @@ function wrapOnCallHandler<Req = any, Res = any>(
848851 const arg : CallableRequest < Req > = {
849852 ...context ,
850853 data,
854+ acceptsStreaming,
851855 } ;
852856
853- const responseProxy : CallableProxyResponse = {
854- write ( chunk ) : boolean {
857+ const responseProxy : CallableResponse < Stream > = {
858+ sendChunk ( chunk : Stream ) : Promise < boolean > {
855859 // if client doesn't accept sse-protocol, response.write() is no-op.
856860 if ( ! acceptsStreaming ) {
857- return false ;
861+ return Promise . resolve ( false ) ;
858862 }
859863 // if connection is already closed, response.write() is no-op.
860864 if ( abortController . signal . aborted ) {
861- return false ;
865+ return Promise . resolve ( false ) ;
862866 }
863867 const formattedData = encodeSSE ( { message : chunk } ) ;
864- const wrote = res . write ( formattedData ) ;
868+ let resolve : ( wrote : boolean ) => void ;
869+ let reject : ( err : Error ) => void ;
870+ const p = new Promise < boolean > ( ( res , rej ) => {
871+ resolve = res ;
872+ reject = rej ;
873+ } ) ;
874+ const wrote = res . write ( formattedData , ( error ) => {
875+ if ( error ) {
876+ reject ( error ) ;
877+ return ;
878+ }
879+ resolve ( wrote ) ;
880+ } ) ;
881+
865882 // Reset heartbeat timer after successful write
866883 if ( wrote && heartbeatInterval !== null && heartbeatSeconds > 0 ) {
867884 scheduleHeartbeat ( ) ;
868885 }
869- return wrote ;
886+
887+ return p ;
870888 } ,
871- acceptsStreaming,
872889 signal : abortController . signal ,
873890 } ;
874891 if ( acceptsStreaming ) {
0 commit comments