Skip to content

Commit d28863c

Browse files
committed
Support AbortSignal
1 parent 21db516 commit d28863c

File tree

3 files changed

+70
-12
lines changed

3 files changed

+70
-12
lines changed

packages/functions/src/callable.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ describe('Firebase Functions > Stream', () => {
321321

322322
it('successfully streams data and resolves final result', async () => {
323323
const functions = createTestService(app, region);
324-
const mockFetch = sinon.stub(functions, 'fetchImpl' as any);
324+
const mockFetch = sinon.stub(globalThis, 'fetch' as any);
325325

326326
const mockResponse = new ReadableStream({
327327
start(controller) {
@@ -355,7 +355,7 @@ describe('Firebase Functions > Stream', () => {
355355

356356
it('handles network errors', async () => {
357357
const functions = createTestService(app, region);
358-
const mockFetch = sinon.stub(functions, 'fetchImpl' as any);
358+
const mockFetch = sinon.stub(globalThis, 'fetch' as any);
359359

360360
mockFetch.rejects(new Error('Network error'));
361361

@@ -380,7 +380,7 @@ describe('Firebase Functions > Stream', () => {
380380

381381
it('handles server-side errors', async () => {
382382
const functions = createTestService(app, region);
383-
const mockFetch = sinon.stub(functions, 'fetchImpl' as any);
383+
const mockFetch = sinon.stub(globalThis, 'fetch' as any);
384384

385385
const mockResponse = new ReadableStream({
386386
start(controller) {
@@ -429,7 +429,7 @@ describe('Firebase Functions > Stream', () => {
429429
);
430430

431431
const functions = createTestService(app, region, authProvider);
432-
const mockFetch = sinon.stub(functions, 'fetchImpl' as any);
432+
const mockFetch = sinon.stub(globalThis, 'fetch' as any);
433433

434434
const mockResponse = new ReadableStream({
435435
start(controller) {

packages/functions/src/public-types.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,21 @@ export interface HttpsCallableOptions {
6464
limitedUseAppCheckTokens?: boolean;
6565
}
6666

67+
68+
/**
69+
* An interface for metadata about how stream call should be executed.
70+
* @public
71+
*/
72+
export interface HttpsCallableStreamOptions {
73+
/**
74+
* An AbortSignal that can be used to cancel the streaming response. When the signal is aborted,
75+
* both the underlying connection and stream will be terminated.
76+
*/
77+
signal?: AbortSignal
78+
}
79+
80+
81+
6782
/**
6883
* A `Functions` instance.
6984
* @public

packages/functions/src/service.ts

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ import {
2020
HttpsCallable,
2121
HttpsCallableResult,
2222
HttpsCallableStreamResult,
23-
HttpsCallableOptions
23+
HttpsCallableOptions,
24+
HttpsCallableStreamOptions
2425
} from './public-types';
2526
import { _errorForResponse, FunctionsError } from './error';
2627
import { ContextProvider } from './context';
@@ -186,8 +187,8 @@ export function httpsCallable<RequestData, ResponseData, StreamData = unknown>(
186187
return call(functionsInstance, name, data, options || {});
187188
};
188189

189-
callable.stream = (data?: RequestData | null) => {
190-
return stream(functionsInstance, name, data);
190+
callable.stream = (data?: RequestData | null, options?: HttpsCallableStreamOptions) => {
191+
return stream(functionsInstance, name, data, options);
191192
};
192193

193194
return callable as HttpsCallable<RequestData, ResponseData, StreamData>;
@@ -207,8 +208,8 @@ export function httpsCallableFromURL<RequestData, ResponseData, StreamData = unk
207208
return callAtURL(functionsInstance, url, data, options || {});
208209
};
209210

210-
callable.stream = (data?: RequestData | null) => {
211-
return streamAtURL(functionsInstance, url, options || {});
211+
callable.stream = (data?: RequestData | null, options?: HttpsCallableStreamOptions) => {
212+
return streamAtURL(functionsInstance, url, options);
212213
};
213214
return callable as HttpsCallable<RequestData, ResponseData, StreamData>;
214215
}
@@ -354,25 +355,29 @@ async function callAtURL(
354355
* Calls a callable function asynchronously and returns a streaming result.
355356
* @param name The name of the callable trigger.
356357
* @param data The data to pass as params to the function.
358+
* @param options Streaming request options.
357359
*/
358360
function stream(
359361
functionsInstance: FunctionsService,
360362
name: string,
361363
data: unknown,
364+
options?: HttpsCallableStreamOptions
362365
): Promise<HttpsCallableStreamResult> {
363366
const url = functionsInstance._url(name);
364-
return streamAtURL(functionsInstance, url, data);
367+
return streamAtURL(functionsInstance, url, options);
365368
}
366369

367370
/**
368371
* Calls a callable function asynchronously and return a streaming result.
369372
* @param url The url of the callable trigger.
370373
* @param data The data to pass as params to the function.
374+
* @param options Streaming request options.
371375
*/
372376
async function streamAtURL(
373377
functionsInstance: FunctionsService,
374378
url: string,
375379
data: unknown,
380+
options?: HttpsCallableStreamOptions
376381
): Promise<HttpsCallableStreamResult> {
377382
// Encode any special types, such as dates, in the input data.
378383
data = encode(data);
@@ -396,12 +401,31 @@ async function streamAtURL(
396401

397402
let response: Response;
398403
try {
399-
response = await functionsInstance.fetchImpl(url, {
404+
response = await fetch(url, {
400405
method: 'POST',
401406
body: JSON.stringify(body),
402-
headers
407+
headers,
408+
signal: options?.signal
403409
});
404410
} catch (e) {
411+
if (e instanceof Error && e.name === 'AbortError') {
412+
const error = new FunctionsError(
413+
'cancelled',
414+
'Request was cancelled.'
415+
);
416+
return {
417+
data: Promise.reject(error),
418+
stream: {
419+
[Symbol.asyncIterator]() {
420+
return {
421+
next() {
422+
return Promise.reject(error);
423+
}
424+
};
425+
}
426+
}
427+
};
428+
}
405429
// This could be an unhandled error on the backend, or it could be a
406430
// network error. There's no way to know, since an unhandled error on the
407431
// backend will fail to set the proper CORS header, and thus will be
@@ -434,10 +458,29 @@ async function streamAtURL(
434458
resultRejecter = reject;
435459
});
436460

461+
// Set up abort handler for the stream
462+
options?.signal?.addEventListener('abort', () => {
463+
reader.cancel();
464+
const error = new FunctionsError(
465+
'cancelled',
466+
'Request was cancelled.'
467+
);
468+
resultRejecter(error);
469+
});
470+
437471
const stream = {
438472
[Symbol.asyncIterator]() {
439473
return {
440474
async next() {
475+
if (options?.signal?.aborted) {
476+
const error = new FunctionsError(
477+
'cancelled',
478+
'Request was cancelled.'
479+
);
480+
resultRejecter(error)
481+
throw error;
482+
}
483+
441484
while (true) {
442485
const { value, done } = await reader.read();
443486
if (done) return { done: true, value: undefined };

0 commit comments

Comments
 (0)