Skip to content

Commit fb2ea3a

Browse files
authored
Merge pull request #1986 from murgatroid99/grpc-js_interceptor_metadata_message_order
grpc-js: Preserve order of metadata, messages, and call end with async interceptors
2 parents 0a40a79 + 86f3ffd commit fb2ea3a

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

packages/grpc-js/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@grpc/grpc-js",
3-
"version": "1.4.4",
3+
"version": "1.4.5",
44
"description": "gRPC Library for Node - pure JS implementation",
55
"homepage": "https://grpc.io/",
66
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

packages/grpc-js/src/call-stream.ts

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,37 @@ export function isInterceptingListener(
149149
}
150150

151151
export class InterceptingListenerImpl implements InterceptingListener {
152+
private processingMetadata = false;
153+
private hasPendingMessage = false;
154+
private pendingMessage: any;
152155
private processingMessage = false;
153156
private pendingStatus: StatusObject | null = null;
154157
constructor(
155158
private listener: FullListener,
156159
private nextListener: InterceptingListener
157160
) {}
158161

162+
private processPendingMessage() {
163+
if (this.hasPendingMessage) {
164+
this.nextListener.onReceiveMessage(this.pendingMessage);
165+
this.pendingMessage = null;
166+
this.hasPendingMessage = false;
167+
}
168+
}
169+
170+
private processPendingStatus() {
171+
if (this.pendingStatus) {
172+
this.nextListener.onReceiveStatus(this.pendingStatus);
173+
}
174+
}
175+
159176
onReceiveMetadata(metadata: Metadata): void {
177+
this.processingMetadata = true;
160178
this.listener.onReceiveMetadata(metadata, (metadata) => {
179+
this.processingMetadata = false;
161180
this.nextListener.onReceiveMetadata(metadata);
181+
this.processPendingMessage();
182+
this.processPendingStatus();
162183
});
163184
}
164185
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -168,15 +189,18 @@ export class InterceptingListenerImpl implements InterceptingListener {
168189
this.processingMessage = true;
169190
this.listener.onReceiveMessage(message, (msg) => {
170191
this.processingMessage = false;
171-
this.nextListener.onReceiveMessage(msg);
172-
if (this.pendingStatus) {
173-
this.nextListener.onReceiveStatus(this.pendingStatus);
192+
if (this.processingMetadata) {
193+
this.pendingMessage = msg;
194+
this.hasPendingMessage = true;
195+
} else {
196+
this.nextListener.onReceiveMessage(msg);
197+
this.processPendingStatus();
174198
}
175199
});
176200
}
177201
onReceiveStatus(status: StatusObject): void {
178202
this.listener.onReceiveStatus(status, (processedStatus) => {
179-
if (this.processingMessage) {
203+
if (this.processingMetadata || this.processingMessage) {
180204
this.pendingStatus = processedStatus;
181205
} else {
182206
this.nextListener.onReceiveStatus(processedStatus);
@@ -283,7 +307,7 @@ export class Http2CallStream implements Call {
283307

284308
private outputStatus() {
285309
/* Precondition: this.finalStatus !== null */
286-
if (!this.statusOutput) {
310+
if (this.listener && !this.statusOutput) {
287311
this.statusOutput = true;
288312
const filteredStatus = this.filterStack.receiveTrailers(
289313
this.finalStatus!
@@ -692,6 +716,7 @@ export class Http2CallStream implements Call {
692716
this.trace('Sending metadata');
693717
this.listener = listener;
694718
this.channel._startCallStream(this, metadata);
719+
this.maybeOutputStatus();
695720
}
696721

697722
private destroyHttp2Stream() {

packages/grpc-js/src/client-interceptors.ts

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,18 @@ export class InterceptingCall implements InterceptingCallInterface {
208208
*/
209209
private requester: FullRequester;
210210
/**
211-
* Indicates that a message has been passed to the listener's onReceiveMessage
212-
* method it has not been passed to the corresponding next callback
211+
* Indicates that metadata has been passed to the requester's start
212+
* method but it has not been passed to the corresponding next callback
213+
*/
214+
private processingMetadata = false;
215+
/**
216+
* Message context for a pending message that is waiting for
217+
*/
218+
private pendingMessageContext: MessageContext | null = null;
219+
private pendingMessage: any;
220+
/**
221+
* Indicates that a message has been passed to the requester's sendMessage
222+
* method but it has not been passed to the corresponding next callback
213223
*/
214224
private processingMessage = false;
215225
/**
@@ -242,6 +252,21 @@ export class InterceptingCall implements InterceptingCallInterface {
242252
getPeer() {
243253
return this.nextCall.getPeer();
244254
}
255+
256+
private processPendingMessage() {
257+
if (this.pendingMessageContext) {
258+
this.nextCall.sendMessageWithContext(this.pendingMessageContext, this.pendingMessage);
259+
this.pendingMessageContext = null;
260+
this.pendingMessage = null;
261+
}
262+
}
263+
264+
private processPendingHalfClose() {
265+
if (this.pendingHalfClose) {
266+
this.nextCall.halfClose();
267+
}
268+
}
269+
245270
start(
246271
metadata: Metadata,
247272
interceptingListener?: Partial<InterceptingListener>
@@ -257,7 +282,9 @@ export class InterceptingCall implements InterceptingCallInterface {
257282
interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
258283
((status) => {}),
259284
};
285+
this.processingMetadata = true;
260286
this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
287+
this.processingMetadata = false;
261288
let finalInterceptingListener: InterceptingListener;
262289
if (isInterceptingListener(listener)) {
263290
finalInterceptingListener = listener;
@@ -276,16 +303,21 @@ export class InterceptingCall implements InterceptingCallInterface {
276303
);
277304
}
278305
this.nextCall.start(md, finalInterceptingListener);
306+
this.processPendingMessage();
307+
this.processPendingHalfClose();
279308
});
280309
}
281310
// eslint-disable-next-line @typescript-eslint/no-explicit-any
282311
sendMessageWithContext(context: MessageContext, message: any): void {
283312
this.processingMessage = true;
284313
this.requester.sendMessage(message, (finalMessage) => {
285314
this.processingMessage = false;
286-
this.nextCall.sendMessageWithContext(context, finalMessage);
287-
if (this.pendingHalfClose) {
288-
this.nextCall.halfClose();
315+
if (this.processingMetadata) {
316+
this.pendingMessageContext = context;
317+
this.pendingMessage = message;
318+
} else {
319+
this.nextCall.sendMessageWithContext(context, finalMessage);
320+
this.processPendingHalfClose();
289321
}
290322
});
291323
}
@@ -298,7 +330,7 @@ export class InterceptingCall implements InterceptingCallInterface {
298330
}
299331
halfClose(): void {
300332
this.requester.halfClose(() => {
301-
if (this.processingMessage) {
333+
if (this.processingMetadata || this.processingMessage) {
302334
this.pendingHalfClose = true;
303335
} else {
304336
this.nextCall.halfClose();

0 commit comments

Comments
 (0)