Skip to content

Commit 40c2f61

Browse files
committed
Fix server decompression sequencing, add tracing
1 parent 575c200 commit 40c2f61

File tree

1 file changed

+23
-1
lines changed

1 file changed

+23
-1
lines changed

packages/grpc-js/src/server-call.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,9 +737,24 @@ export class Http2ServerCallStream<
737737
) {
738738
const decoder = new StreamDecoder();
739739

740+
let readsDone = false;
741+
742+
let pendingMessageProcessing = false;
743+
744+
let pushedEnd = false;
745+
746+
const maybePushEnd = () => {
747+
if (!pushedEnd && readsDone && !pendingMessageProcessing) {
748+
pushedEnd = true;
749+
this.pushOrBufferMessage(readable, null);
750+
}
751+
}
752+
740753
this.stream.on('data', async (data: Buffer) => {
741754
const messages = decoder.write(data);
742755

756+
pendingMessageProcessing = true;
757+
this.stream.pause();
743758
for (const message of messages) {
744759
if (
745760
this.maxReceiveMessageSize !== -1 &&
@@ -763,10 +778,14 @@ export class Http2ServerCallStream<
763778

764779
this.pushOrBufferMessage(readable, decompressedMessage);
765780
}
781+
pendingMessageProcessing = false;
782+
this.stream.resume();
783+
maybePushEnd();
766784
});
767785

768786
this.stream.once('end', () => {
769-
this.pushOrBufferMessage(readable, null);
787+
readsDone = true;
788+
maybePushEnd();
770789
});
771790
}
772791

@@ -810,6 +829,7 @@ export class Http2ServerCallStream<
810829
messageBytes: Buffer | null
811830
) {
812831
if (messageBytes === null) {
832+
trace('Received end of stream');
813833
if (this.canPush) {
814834
readable.push(null);
815835
} else {
@@ -819,6 +839,8 @@ export class Http2ServerCallStream<
819839
return;
820840
}
821841

842+
trace('Received message of length ' + messageBytes.length);
843+
822844
this.isPushPending = true;
823845

824846
try {

0 commit comments

Comments
 (0)