Skip to content

Commit cf56623

Browse files
committed
Update code to use pump() patter.
1 parent ebd74cb commit cf56623

File tree

1 file changed

+89
-67
lines changed

1 file changed

+89
-67
lines changed

packages/functions/src/service.ts

Lines changed: 89 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -447,11 +447,6 @@ async function streamAtURL(
447447
};
448448
}
449449

450-
const reader = response.body!.getReader();
451-
const decoder = new TextDecoder();
452-
453-
const pendingLines: string[] = [];
454-
let buffer = '';
455450
let resultResolver: (value: unknown) => void;
456451
let resultRejecter: (reason: unknown) => void;
457452

@@ -461,90 +456,117 @@ async function streamAtURL(
461456
});
462457

463458
options?.signal?.addEventListener('abort', () => {
464-
void reader.cancel();
465459
const error = new FunctionsError(
466460
'cancelled',
467461
'Request was cancelled.'
468462
);
469463
resultRejecter(error);
470464
});
471465

472-
const stream = {
473-
[Symbol.asyncIterator]() {
466+
const processLine = (line: string, controller: ReadableStreamDefaultController): void => {
467+
// ignore all other lines (newline, comments, etc.)
468+
if (!line.startsWith('data: ')) {
469+
return;
470+
}
471+
try {
472+
// Skip 'data: ' (5 chars)
473+
const jsonData = JSON.parse(line.slice(6));
474+
if ('result' in jsonData) {
475+
resultResolver(decode(jsonData.result));
476+
return;
477+
}
478+
if ('message' in jsonData) {
479+
controller.enqueue(decode(jsonData.message));
480+
return;
481+
}
482+
if ('error' in jsonData) {
483+
const error = _errorForResponse(0, jsonData);
484+
controller.error(error);
485+
resultRejecter(error);
486+
return;
487+
}
488+
} catch (error) {
489+
if (error instanceof FunctionsError) {
490+
controller.error(error);
491+
resultRejecter(error);
492+
return;
493+
}
494+
// ignore other parsing errors
495+
}
496+
};
474497

475-
const processLine = (line: string | undefined): { done: boolean, value: unknown } | null => {
476-
// ignore all other lines (newline, comments, etc.)
477-
if (!line?.startsWith('data: ')) {
478-
return null;
498+
const reader = response.body!.getReader();
499+
const decoder = new TextDecoder();
500+
const rstream = new ReadableStream({
501+
start(controller) {
502+
let currentText = '';
503+
return pump();
504+
async function pump(): Promise<void> {
505+
if (options?.signal?.aborted) {
506+
const error = new FunctionsError('cancelled', 'Request was cancelled');
507+
controller.error(error);
508+
resultRejecter(error);
509+
return Promise.resolve();
479510
}
480511

481512
try {
482-
const jsonData = JSON.parse(line.slice(6));
483-
if ('result' in jsonData) {
484-
resultResolver(decode(jsonData.result));
485-
return { done: true, value: undefined };
486-
}
487-
if ('message' in jsonData) {
488-
return { done: false, value: decode(jsonData.message) };
489-
}
490-
if ('error' in jsonData) {
491-
const error = _errorForResponse(0, jsonData);
492-
resultRejecter(error);
493-
throw error;
494-
}
495-
return null; // Unrecognize keys. Skip this line.
496-
} catch (error) {
497-
if (error instanceof FunctionsError) {
498-
throw error;
513+
const { value, done } = await reader.read();
514+
if (done) {
515+
if (currentText.trim()) {
516+
processLine(currentText.trim(), controller);
517+
}
518+
controller.close();
519+
return;
499520
}
500-
// ignore other parsing error
501-
return null;
502-
}
503-
};
504-
return {
505-
async next() {
521+
506522
if (options?.signal?.aborted) {
507-
const error = new FunctionsError(
508-
'cancelled',
509-
'Request was cancelled.'
510-
);
523+
const error = new FunctionsError('cancelled', 'Request was cancelled');
524+
controller.error(error);
511525
resultRejecter(error);
512-
throw error;
513-
}
514-
515-
while (pendingLines.length > 0) {
516-
const result = processLine(pendingLines.shift());
517-
if (result) { return result; }
526+
await reader.cancel();
527+
return;
518528
}
519529

520-
while (true) {
521-
const { value, done } = await reader.read();
522-
523-
if (done) {
524-
if (buffer.trim()) {
525-
const result = processLine(buffer);
526-
if (result) { return result; }
527-
}
528-
return { done: true, value: undefined };
529-
}
530-
531-
buffer += decoder.decode(value, { stream: true });
532-
const lines = buffer.split('\n');
533-
buffer = lines.pop() || '';
534-
pendingLines.push(...lines.filter(line => line.trim()));
530+
currentText += decoder.decode(value, { stream: true });
531+
const lines = currentText.split("\n");
532+
currentText = lines.pop() || '';
535533

536-
if (pendingLines.length > 0) {
537-
const result = processLine(pendingLines.shift());
538-
if (result) { return result; }
534+
for (const line of lines) {
535+
if (line.trim()) {
536+
processLine(line.trim(), controller);
539537
}
540538
}
541-
}
542-
};
539+
return pump();
540+
} catch (error) {
541+
const functionsError = error instanceof FunctionsError
542+
? error
543+
: _errorForResponse(0, null);
544+
controller.error(functionsError);
545+
resultRejecter(functionsError);
546+
};
547+
}
548+
},
549+
cancel() {
550+
return reader.cancel();
543551
}
544-
};
552+
});
545553

546554
return {
547-
stream,
555+
stream: {
556+
[Symbol.asyncIterator]() {
557+
const rreader = rstream.getReader();
558+
return {
559+
async next() {
560+
const { value, done } = await rreader.read();
561+
return { value: value as unknown, done };
562+
},
563+
async return() {
564+
await reader.cancel();
565+
return { done: true, value: undefined };
566+
}
567+
};
568+
}
569+
},
548570
data: resultPromise,
549571
};
550572
}

0 commit comments

Comments
 (0)