Skip to content

Commit 0dbcbcc

Browse files
authored
Merge pull request #2008 from grpc/@grpc/[email protected]
Upmerge more changes from @grpc/grpc [email protected]
2 parents 6847664 + a65fdce commit 0dbcbcc

File tree

6 files changed

+83
-37
lines changed

6 files changed

+83
-37
lines changed

packages/grpc-js/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@grpc/grpc-js",
3-
"version": "1.4.4",
3+
"version": "1.4.6",
44
"description": "gRPC Library for Node - pure JS implementation",
55
"homepage": "https://grpc.io/",
66
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

packages/grpc-js/src/call-stream.ts

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,37 @@ export function isInterceptingListener(
149149
}
150150

151151
export class InterceptingListenerImpl implements InterceptingListener {
152+
private processingMetadata = false;
153+
private hasPendingMessage = false;
154+
private pendingMessage: any;
152155
private processingMessage = false;
153156
private pendingStatus: StatusObject | null = null;
154157
constructor(
155158
private listener: FullListener,
156159
private nextListener: InterceptingListener
157160
) {}
158161

162+
private processPendingMessage() {
163+
if (this.hasPendingMessage) {
164+
this.nextListener.onReceiveMessage(this.pendingMessage);
165+
this.pendingMessage = null;
166+
this.hasPendingMessage = false;
167+
}
168+
}
169+
170+
private processPendingStatus() {
171+
if (this.pendingStatus) {
172+
this.nextListener.onReceiveStatus(this.pendingStatus);
173+
}
174+
}
175+
159176
onReceiveMetadata(metadata: Metadata): void {
177+
this.processingMetadata = true;
160178
this.listener.onReceiveMetadata(metadata, (metadata) => {
179+
this.processingMetadata = false;
161180
this.nextListener.onReceiveMetadata(metadata);
181+
this.processPendingMessage();
182+
this.processPendingStatus();
162183
});
163184
}
164185
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -168,15 +189,18 @@ export class InterceptingListenerImpl implements InterceptingListener {
168189
this.processingMessage = true;
169190
this.listener.onReceiveMessage(message, (msg) => {
170191
this.processingMessage = false;
171-
this.nextListener.onReceiveMessage(msg);
172-
if (this.pendingStatus) {
173-
this.nextListener.onReceiveStatus(this.pendingStatus);
192+
if (this.processingMetadata) {
193+
this.pendingMessage = msg;
194+
this.hasPendingMessage = true;
195+
} else {
196+
this.nextListener.onReceiveMessage(msg);
197+
this.processPendingStatus();
174198
}
175199
});
176200
}
177201
onReceiveStatus(status: StatusObject): void {
178202
this.listener.onReceiveStatus(status, (processedStatus) => {
179-
if (this.processingMessage) {
203+
if (this.processingMetadata || this.processingMessage) {
180204
this.pendingStatus = processedStatus;
181205
} else {
182206
this.nextListener.onReceiveStatus(processedStatus);
@@ -283,7 +307,7 @@ export class Http2CallStream implements Call {
283307

284308
private outputStatus() {
285309
/* Precondition: this.finalStatus !== null */
286-
if (!this.statusOutput) {
310+
if (this.listener && !this.statusOutput) {
287311
this.statusOutput = true;
288312
const filteredStatus = this.filterStack.receiveTrailers(
289313
this.finalStatus!
@@ -692,6 +716,7 @@ export class Http2CallStream implements Call {
692716
this.trace('Sending metadata');
693717
this.listener = listener;
694718
this.channel._startCallStream(this, metadata);
719+
this.maybeOutputStatus();
695720
}
696721

697722
private destroyHttp2Stream() {

packages/grpc-js/src/client-interceptors.ts

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,18 @@ export class InterceptingCall implements InterceptingCallInterface {
208208
*/
209209
private requester: FullRequester;
210210
/**
211-
* Indicates that a message has been passed to the listener's onReceiveMessage
212-
* method it has not been passed to the corresponding next callback
211+
* Indicates that metadata has been passed to the requester's start
212+
* method but it has not been passed to the corresponding next callback
213+
*/
214+
private processingMetadata = false;
215+
/**
216+
* Message context for a pending message that is waiting for
217+
*/
218+
private pendingMessageContext: MessageContext | null = null;
219+
private pendingMessage: any;
220+
/**
221+
* Indicates that a message has been passed to the requester's sendMessage
222+
* method but it has not been passed to the corresponding next callback
213223
*/
214224
private processingMessage = false;
215225
/**
@@ -242,6 +252,21 @@ export class InterceptingCall implements InterceptingCallInterface {
242252
getPeer() {
243253
return this.nextCall.getPeer();
244254
}
255+
256+
private processPendingMessage() {
257+
if (this.pendingMessageContext) {
258+
this.nextCall.sendMessageWithContext(this.pendingMessageContext, this.pendingMessage);
259+
this.pendingMessageContext = null;
260+
this.pendingMessage = null;
261+
}
262+
}
263+
264+
private processPendingHalfClose() {
265+
if (this.pendingHalfClose) {
266+
this.nextCall.halfClose();
267+
}
268+
}
269+
245270
start(
246271
metadata: Metadata,
247272
interceptingListener?: Partial<InterceptingListener>
@@ -257,7 +282,9 @@ export class InterceptingCall implements InterceptingCallInterface {
257282
interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
258283
((status) => {}),
259284
};
285+
this.processingMetadata = true;
260286
this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
287+
this.processingMetadata = false;
261288
let finalInterceptingListener: InterceptingListener;
262289
if (isInterceptingListener(listener)) {
263290
finalInterceptingListener = listener;
@@ -276,16 +303,21 @@ export class InterceptingCall implements InterceptingCallInterface {
276303
);
277304
}
278305
this.nextCall.start(md, finalInterceptingListener);
306+
this.processPendingMessage();
307+
this.processPendingHalfClose();
279308
});
280309
}
281310
// eslint-disable-next-line @typescript-eslint/no-explicit-any
282311
sendMessageWithContext(context: MessageContext, message: any): void {
283312
this.processingMessage = true;
284313
this.requester.sendMessage(message, (finalMessage) => {
285314
this.processingMessage = false;
286-
this.nextCall.sendMessageWithContext(context, finalMessage);
287-
if (this.pendingHalfClose) {
288-
this.nextCall.halfClose();
315+
if (this.processingMetadata) {
316+
this.pendingMessageContext = context;
317+
this.pendingMessage = message;
318+
} else {
319+
this.nextCall.sendMessageWithContext(context, finalMessage);
320+
this.processPendingHalfClose();
289321
}
290322
});
291323
}
@@ -298,7 +330,7 @@ export class InterceptingCall implements InterceptingCallInterface {
298330
}
299331
halfClose(): void {
300332
this.requester.halfClose(() => {
301-
if (this.processingMessage) {
333+
if (this.processingMetadata || this.processingMessage) {
302334
this.pendingHalfClose = true;
303335
} else {
304336
this.nextCall.halfClose();

packages/grpc-js/src/object-stream.ts

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,30 +36,17 @@ export interface IntermediateObjectWritable<T> extends Writable {
3636
write(chunk: any & T, cb?: WriteCallback): boolean;
3737
write(chunk: any & T, encoding?: any, cb?: WriteCallback): boolean;
3838
setDefaultEncoding(encoding: string): this;
39-
end(): void;
40-
end(chunk: any & T, cb?: Function): void;
41-
end(chunk: any & T, encoding?: any, cb?: Function): void;
39+
end(): ReturnType<Writable['end']> extends Writable ? this : void;
40+
end(chunk: any & T, cb?: Function): ReturnType<Writable['end']> extends Writable ? this : void;
41+
end(chunk: any & T, encoding?: any, cb?: Function): ReturnType<Writable['end']> extends Writable ? this : void;
4242
}
4343

4444
export interface ObjectWritable<T> extends IntermediateObjectWritable<T> {
4545
_write(chunk: T, encoding: string, callback: Function): void;
4646
write(chunk: T, cb?: Function): boolean;
4747
write(chunk: T, encoding?: any, cb?: Function): boolean;
4848
setDefaultEncoding(encoding: string): this;
49-
end(): void;
50-
end(chunk: T, cb?: Function): void;
51-
end(chunk: T, encoding?: any, cb?: Function): void;
49+
end(): ReturnType<Writable['end']> extends Writable ? this : void;
50+
end(chunk: T, cb?: Function): ReturnType<Writable['end']> extends Writable ? this : void;
51+
end(chunk: T, encoding?: any, cb?: Function): ReturnType<Writable['end']> extends Writable ? this : void;
5252
}
53-
54-
export type ObjectDuplex<T, U> = {
55-
read(size?: number): U;
56-
57-
_write(chunk: T, encoding: string, callback: Function): void;
58-
write(chunk: T, cb?: Function): boolean;
59-
write(chunk: T, encoding?: any, cb?: Function): boolean;
60-
end(): void;
61-
end(chunk: T, cb?: Function): void;
62-
end(chunk: T, encoding?: any, cb?: Function): void;
63-
} & Duplex &
64-
ObjectWritable<T> &
65-
ObjectReadable<U>;

packages/grpc-js/src/resolving-load-balancer.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { ConnectivityState } from './connectivity-state';
2626
import { ConfigSelector, createResolver, Resolver } from './resolver';
2727
import { ServiceError } from './call';
2828
import { Picker, UnavailablePicker, QueuePicker } from './picker';
29-
import { BackoffTimeout } from './backoff-timeout';
29+
import { BackoffOptions, BackoffTimeout } from './backoff-timeout';
3030
import { Status } from './constants';
3131
import { StatusObject } from './call-stream';
3232
import { Metadata } from './metadata';
@@ -248,15 +248,18 @@ export class ResolvingLoadBalancer implements LoadBalancer {
248248
},
249249
channelOptions
250250
);
251-
251+
const backoffOptions: BackoffOptions = {
252+
initialDelay: channelOptions['grpc.initial_reconnect_backoff_ms'],
253+
maxDelay: channelOptions['grpc.max_reconnect_backoff_ms'],
254+
};
252255
this.backoffTimeout = new BackoffTimeout(() => {
253256
if (this.continueResolving) {
254257
this.updateResolution();
255258
this.continueResolving = false;
256259
} else {
257260
this.updateState(this.latestChildState, this.latestChildPicker);
258261
}
259-
});
262+
}, backoffOptions);
260263
this.backoffTimeout.unref();
261264
}
262265

packages/grpc-js/src/server-call.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
238238
this.trailingMetadata = metadata;
239239
}
240240

241-
super.end();
241+
return super.end();
242242
}
243243
}
244244

@@ -285,7 +285,7 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType>
285285
this.trailingMetadata = metadata;
286286
}
287287

288-
super.end();
288+
return super.end();
289289
}
290290
}
291291

@@ -295,7 +295,6 @@ ServerDuplexStreamImpl.prototype._write =
295295
ServerWritableStreamImpl.prototype._write;
296296
ServerDuplexStreamImpl.prototype._final =
297297
ServerWritableStreamImpl.prototype._final;
298-
ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end;
299298

300299
// Unary response callback signature.
301300
export type sendUnaryData<ResponseType> = (

0 commit comments

Comments
 (0)