Skip to content

Commit 3f71020

Browse files
authored
Merge pull request #2211 from murgatroid99/merge_1.6.x
Merge the 1.6.x branch into master
2 parents 09f3dd9 + 3d60328 commit 3f71020

File tree

5 files changed

+47
-16
lines changed

5 files changed

+47
-16
lines changed

packages/grpc-js/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@grpc/grpc-js",
3-
"version": "1.6.9",
3+
"version": "1.6.11",
44
"description": "gRPC Library for Node - pure JS implementation",
55
"homepage": "https://grpc.io/",
66
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

packages/grpc-js/src/call-stream.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,11 @@ export class Http2CallStream implements Call {
329329
process.nextTick(() => {
330330
this.listener?.onReceiveStatus(filteredStatus);
331331
});
332+
/* Leave the http2 stream in flowing state to drain incoming messages, to
333+
* ensure that the stream closure completes. The call stream already does
334+
* not push more messages after the status is output, so the messages go
335+
* nowhere either way. */
336+
this.http2Stream?.resume();
332337
if (this.subchannel) {
333338
this.subchannel.callUnref();
334339
this.subchannel.removeDisconnectListener(this.disconnectListener);
@@ -483,7 +488,11 @@ export class Http2CallStream implements Call {
483488
}
484489
let details = '';
485490
if (typeof metadataMap['grpc-message'] === 'string') {
486-
details = decodeURI(metadataMap['grpc-message']);
491+
try {
492+
details = decodeURI(metadataMap['grpc-message']);
493+
} catch (e) {
494+
details = metadataMap['grpc-messages'] as string;
495+
}
487496
metadata.remove('grpc-message');
488497
this.trace(
489498
'received status details string "' + details + '" from server'
@@ -573,8 +582,15 @@ export class Http2CallStream implements Call {
573582
}
574583
}
575584
});
576-
stream.on('trailers', this.handleTrailers.bind(this));
585+
stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
586+
this.handleTrailers(headers);
587+
});
577588
stream.on('data', (data: Buffer) => {
589+
/* If the status has already been output, allow the http2 stream to
590+
* drain without processing the data. */
591+
if (this.statusOutput) {
592+
return;
593+
}
578594
this.trace('receive HTTP/2 data frame of length ' + data.length);
579595
const messages = this.decoder.write(data);
580596

@@ -686,9 +702,6 @@ export class Http2CallStream implements Call {
686702
}
687703
this.streamEndWatchers.forEach(watcher => watcher(false));
688704
});
689-
if (!this.pendingRead) {
690-
stream.pause();
691-
}
692705
if (this.pendingWrite) {
693706
if (!this.pendingWriteCallback) {
694707
throw new Error('Invalid state in write handling code');

packages/grpc-js/src/load-balancer-outlier-detection.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
467467
// Step 3
468468
for (const [address, mapEntry] of this.addressMap.entries()) {
469469
// Step 3.i
470-
if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) {
470+
if (this.getCurrentEjectionPercent() >= this.latestConfig.getMaxEjectionPercent()) {
471471
break;
472472
}
473473
// Step 3.ii
@@ -500,14 +500,22 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
500500
}
501501
trace('Running failure percentage check. threshold=' + failurePercentageConfig.threshold + ' request volume threshold=' + failurePercentageConfig.request_volume);
502502
// Step 1
503-
if (this.addressMap.size < failurePercentageConfig.minimum_hosts) {
503+
let addressesWithTargetVolume = 0;
504+
for (const mapEntry of this.addressMap.values()) {
505+
const successes = mapEntry.counter.getLastSuccesses();
506+
const failures = mapEntry.counter.getLastFailures();
507+
if (successes + failures >= failurePercentageConfig.request_volume) {
508+
addressesWithTargetVolume += 1;
509+
}
510+
}
511+
if (addressesWithTargetVolume < failurePercentageConfig.minimum_hosts) {
504512
return;
505513
}
506514

507515
// Step 2
508516
for (const [address, mapEntry] of this.addressMap.entries()) {
509517
// Step 2.i
510-
if (this.getCurrentEjectionPercent() > this.latestConfig.getMaxEjectionPercent()) {
518+
if (this.getCurrentEjectionPercent() >= this.latestConfig.getMaxEjectionPercent()) {
511519
break;
512520
}
513521
// Step 2.ii

packages/grpc-js/src/subchannel.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -361,12 +361,21 @@ export class Subchannel {
361361
this.handleDisconnect();
362362
}, this.keepaliveTimeoutMs);
363363
this.keepaliveTimeoutId.unref?.();
364-
this.session!.ping(
365-
(err: Error | null, duration: number, payload: Buffer) => {
366-
this.keepaliveTrace('Received ping response');
367-
clearTimeout(this.keepaliveTimeoutId);
368-
}
369-
);
364+
try {
365+
this.session!.ping(
366+
(err: Error | null, duration: number, payload: Buffer) => {
367+
this.keepaliveTrace('Received ping response');
368+
clearTimeout(this.keepaliveTimeoutId);
369+
}
370+
);
371+
} catch (e) {
372+
/* If we fail to send a ping, the connection is no longer functional, so
373+
* we should discard it. */
374+
this.transitionToState(
375+
[ConnectivityState.READY],
376+
ConnectivityState.TRANSIENT_FAILURE
377+
);
378+
}
370379
}
371380

372381
private startKeepalivePings() {

packages/grpc-js/tsconfig.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
"target": "es2017",
77
"module": "commonjs",
88
"resolveJsonModule": true,
9-
"incremental": true
9+
"incremental": true,
10+
"types": ["mocha"]
1011
},
1112
"include": [
1213
"src/**/*.ts",

0 commit comments

Comments
 (0)