Skip to content

Commit 5a5e424

Browse files
committed
grpc-js: Enable servers to send trailers-only responses
1 parent 035c260 commit 5a5e424

File tree

7 files changed

+666
-33
lines changed

7 files changed

+666
-33
lines changed

packages/grpc-js/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
5959
- `grpc.default_compression_algorithm`
6060
- `grpc.enable_channelz`
6161
- `grpc.dns_min_time_between_resolutions_ms`
62+
- `grpc.enable_retries`
63+
- `grpc.per_rpc_retry_buffer_size`
64+
- `grpc.retry_buffer_size`
6265
- `grpc-node.max_session_memory`
6366
- `channelOverride`
6467
- `channelFactoryOverride`

packages/grpc-js/src/load-balancing-call.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,15 @@ export class LoadBalancingCall implements Call {
150150
try {
151151
this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
152152
onReceiveMetadata: metadata => {
153+
this.trace('Received metadata');
153154
this.listener!.onReceiveMetadata(metadata);
154155
},
155156
onReceiveMessage: message => {
157+
this.trace('Received message');
156158
this.listener!.onReceiveMessage(message);
157159
},
158160
onReceiveStatus: status => {
161+
this.trace('Received status');
159162
if (status.code === http2.constants.NGHTTP2_REFUSED_STREAM) {
160163
this.outputStatus(status, 'REFUSED');
161164
} else {

packages/grpc-js/src/retrying-call.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ export class RetryingCall implements Call {
145145
private initialMetadata: Metadata | null = null;
146146
private underlyingCalls: UnderlyingCall[] = [];
147147
private writeBuffer: WriteBufferEntry[] = [];
148+
/**
149+
* Tracks whether a read has been started, so that we know whether to start
150+
* reads on new child calls. This only matters for the first read, because
151+
* once a message comes in the child call becomes committed and there will
152+
* be no new child calls.
153+
*/
154+
private readStarted = false;
148155
private transparentRetryUsed: boolean = false;
149156
/**
150157
* Number of attempts so far
@@ -319,7 +326,7 @@ export class RetryingCall implements Call {
319326
this.reportStatus(status);
320327
break;
321328
case 'HEDGING':
322-
if (this.isStatusCodeInList(this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes, status.code)) {
329+
if (this.isStatusCodeInList(this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? [], status.code)) {
323330
this.retryThrottler?.addCallFailed();
324331
let delayMs: number;
325332
if (pushback === null) {
@@ -378,6 +385,7 @@ export class RetryingCall implements Call {
378385
if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
379386
return;
380387
}
388+
this.trace('state=' + this.state + ' handling status from child [' + this.underlyingCalls[callIndex].call.getCallNumber() + '] in state ' + this.underlyingCalls[callIndex].state);
381389
this.underlyingCalls[callIndex].state = 'COMPLETED';
382390
if (status.code === Status.OK) {
383391
this.retryThrottler?.addCallSucceeded();
@@ -465,6 +473,7 @@ export class RetryingCall implements Call {
465473
let receivedMetadata = false;
466474
child.start(initialMetadata, {
467475
onReceiveMetadata: metadata => {
476+
this.trace('Received metadata from child [' + child.getCallNumber() + ']');
468477
this.commitCall(index);
469478
receivedMetadata = true;
470479
if (previousAttempts > 0) {
@@ -475,19 +484,24 @@ export class RetryingCall implements Call {
475484
}
476485
},
477486
onReceiveMessage: message => {
487+
this.trace('Received message from child [' + child.getCallNumber() + ']');
478488
this.commitCall(index);
479489
if (this.underlyingCalls[index].state === 'ACTIVE') {
480490
this.listener!.onReceiveMessage(message);
481491
}
482492
},
483493
onReceiveStatus: status => {
494+
this.trace('Received status from child [' + child.getCallNumber() + ']');
484495
if (!receivedMetadata && previousAttempts > 0) {
485496
status.metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
486497
}
487-
this.commitCall(index);
488498
this.handleChildStatus(status, index);
489499
}
490-
})
500+
});
501+
this.sendNextChildMessage(index);
502+
if (this.readStarted) {
503+
child.startRead();
504+
}
491505
}
492506

493507
start(metadata: Metadata, listener: InterceptingListener): void {
@@ -559,6 +573,7 @@ export class RetryingCall implements Call {
559573
}
560574
startRead(): void {
561575
this.trace('startRead called');
576+
this.readStarted = true;
562577
for (const underlyingCall of this.underlyingCalls) {
563578
if (underlyingCall?.state === 'ACTIVE') {
564579
underlyingCall.call.startRead();

packages/grpc-js/src/server-call.ts

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -673,21 +673,31 @@ export class Http2ServerCallStream<
673673

674674
clearTimeout(this.deadlineTimer);
675675

676-
if (!this.wantTrailers) {
677-
this.wantTrailers = true;
678-
this.stream.once('wantTrailers', () => {
679-
const trailersToSend = Object.assign(
680-
{
681-
[GRPC_STATUS_HEADER]: statusObj.code,
682-
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string),
683-
},
684-
statusObj.metadata.toHttp2Headers()
685-
);
686-
687-
this.stream.sendTrailers(trailersToSend);
688-
});
689-
this.sendMetadata();
690-
this.stream.end();
676+
if (this.stream.headersSent) {
677+
if (!this.wantTrailers) {
678+
this.wantTrailers = true;
679+
this.stream.once('wantTrailers', () => {
680+
const trailersToSend = Object.assign(
681+
{
682+
[GRPC_STATUS_HEADER]: statusObj.code,
683+
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string),
684+
},
685+
statusObj.metadata.toHttp2Headers()
686+
);
687+
688+
this.stream.sendTrailers(trailersToSend);
689+
});
690+
this.stream.end();
691+
}
692+
} else {
693+
const trailersToSend = Object.assign(
694+
{
695+
[GRPC_STATUS_HEADER]: statusObj.code,
696+
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string),
697+
},
698+
statusObj.metadata.toHttp2Headers()
699+
);
700+
this.stream.respond(trailersToSend, {endStream: true});
691701
}
692702
}
693703

packages/grpc-js/src/service-config.ts

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export interface RetryPolicy {
5050
export interface HedgingPolicy {
5151
maxAttempts: number;
5252
hedgingDelay?: string;
53-
nonFatalStatusCodes: (Status | string)[];
53+
nonFatalStatusCodes?: (Status | string)[];
5454
}
5555

5656
export interface MethodConfig {
@@ -124,19 +124,23 @@ function validateRetryPolicy(obj: any): RetryPolicy {
124124
if (!('backoffMultiplier' in obj) || typeof obj.backoffMultiplier !== 'number' || obj.backoffMultiplier <= 0) {
125125
throw new Error('Invalid method config retry policy: backoffMultiplier must be a number greater than 0');
126126
}
127-
if (('retryableStatusCodes' in obj) && Array.isArray(obj.retryableStatusCodes)) {
128-
for (const value of obj.retryableStatusCodes) {
129-
if (typeof value === 'number') {
130-
if (!Object.values(Status).includes(value)) {
131-
throw new Error('Invlid method config retry policy: retryableStatusCodes value not in status code range');
132-
}
133-
} else if (typeof value === 'string') {
134-
if (!Object.values(Status).includes(value.toUpperCase())) {
135-
throw new Error('Invlid method config retry policy: retryableStatusCodes value not a status code name');
136-
}
137-
} else {
138-
throw new Error('Invlid method config retry policy: retryableStatusCodes value must be a string or number');
127+
if (!(('retryableStatusCodes' in obj) && Array.isArray(obj.retryableStatusCodes))) {
128+
throw new Error('Invalid method config retry policy: retryableStatusCodes is required');
129+
}
130+
if (obj.retryableStatusCodes.length === 0) {
131+
throw new Error('Invalid method config retry policy: retryableStatusCodes must be non-empty');
132+
}
133+
for (const value of obj.retryableStatusCodes) {
134+
if (typeof value === 'number') {
135+
if (!Object.values(Status).includes(value)) {
136+
throw new Error('Invlid method config retry policy: retryableStatusCodes value not in status code range');
139137
}
138+
} else if (typeof value === 'string') {
139+
if (!Object.values(Status).includes(value.toUpperCase())) {
140+
throw new Error('Invlid method config retry policy: retryableStatusCodes value not a status code name');
141+
}
142+
} else {
143+
throw new Error('Invlid method config retry policy: retryableStatusCodes value must be a string or number');
140144
}
141145
}
142146
return {
@@ -171,12 +175,14 @@ function validateHedgingPolicy(obj: any): HedgingPolicy {
171175
}
172176
}
173177
const result: HedgingPolicy = {
174-
maxAttempts: obj.maxAttempts,
175-
nonFatalStatusCodes: obj.nonFatalStatusCodes
178+
maxAttempts: obj.maxAttempts
176179
}
177180
if (obj.hedgingDelay) {
178181
result.hedgingDelay = obj.hedgingDelay;
179182
}
183+
if (obj.nonFatalStatusCodes) {
184+
result.nonFatalStatusCodes = obj.nonFatalStatusCodes;
185+
}
180186
return result;
181187
}
182188

@@ -291,6 +297,9 @@ export function validateServiceConfig(obj: any): ServiceConfig {
291297
}
292298
}
293299
}
300+
if ('retryThrottling' in obj) {
301+
result.retryThrottling = validateRetryThrottling(obj.retryThrottling);
302+
}
294303
// Validate method name uniqueness
295304
const seenMethodNames: MethodConfigName[] = [];
296305
for (const methodConfig of result.methodConfig) {

0 commit comments

Comments
 (0)