Skip to content

Commit 035c260

Browse files
committed
grpc-js: Implement retries
1 parent c4c321d commit 035c260

File tree

8 files changed

+762
-13
lines changed

8 files changed

+762
-13
lines changed

packages/grpc-js/src/channel-options.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ export interface ChannelOptions {
4444
'grpc.default_compression_algorithm'?: CompressionAlgorithms;
4545
'grpc.enable_channelz'?: number;
4646
'grpc.dns_min_time_between_resolutions_ms'?: number;
47+
'grpc.enable_retries'?: number;
48+
'grpc.per_rpc_retry_buffer_size'?: number;
49+
/* This option is pattered like a core option, but the core does not have
50+
* this option. It is closely related to the option
51+
* grpc.per_rpc_retry_buffer_size, which is in the core. The core will likely
52+
* implement this functionality using the ResourceQuota mechanism, so there
53+
* will probably not be any collision or other inconsistency. */
54+
'grpc.retry_buffer_size'?: number;
4755
'grpc-node.max_session_memory'?: number;
4856
// eslint-disable-next-line @typescript-eslint/no-explicit-any
4957
[key: string]: any;
@@ -71,6 +79,9 @@ export const recognizedOptions = {
7179
'grpc.enable_http_proxy': true,
7280
'grpc.enable_channelz': true,
7381
'grpc.dns_min_time_between_resolutions_ms': true,
82+
'grpc.enable_retries': true,
83+
'grpc.per_rpc_retry_buffer_size': true,
84+
'grpc.retry_buffer_size': true,
7485
'grpc-node.max_session_memory': true,
7586
};
7687

packages/grpc-js/src/internal-channel.ts

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import { Deadline, getDeadlineTimeoutString } from './deadline';
5050
import { ResolvingCall } from './resolving-call';
5151
import { getNextCallNumber } from './call-number';
5252
import { restrictControlPlaneStatusCode } from './control-plane-status';
53+
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
5354

5455
/**
5556
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
@@ -78,6 +79,11 @@ interface ErrorConfigResult {
7879

7980
type GetConfigResult = NoneConfigResult | SuccessConfigResult | ErrorConfigResult;
8081

82+
const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
83+
84+
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
85+
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB
86+
8187
export class InternalChannel {
8288

8389
private resolvingLoadBalancer: ResolvingLoadBalancer;
@@ -111,6 +117,7 @@ export class InternalChannel {
111117
* than TRANSIENT_FAILURE.
112118
*/
113119
private currentResolutionError: StatusObject | null = null;
120+
private retryBufferTracker: MessageBufferTracker;
114121

115122
// Channelz info
116123
private readonly channelzEnabled: boolean = true;
@@ -179,6 +186,10 @@ export class InternalChannel {
179186
this.subchannelPool = getSubchannelPool(
180187
(options['grpc.use_local_subchannel_pool'] ?? 0) === 0
181188
);
189+
this.retryBufferTracker = new MessageBufferTracker(
190+
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
191+
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
192+
);
182193
const channelControlHelper: ChannelControlHelper = {
183194
createSubchannel: (
184195
subchannelAddress: SubchannelAddress,
@@ -226,7 +237,12 @@ export class InternalChannel {
226237
this.target,
227238
channelControlHelper,
228239
options,
229-
(configSelector) => {
240+
(serviceConfig, configSelector) => {
241+
if (serviceConfig.retryThrottling) {
242+
RETRY_THROTTLER_MAP.set(this.getTarget(), new RetryThrottler(serviceConfig.retryThrottling.maxTokens, serviceConfig.retryThrottling.tokenRatio, RETRY_THROTTLER_MAP.get(this.getTarget())));
243+
} else {
244+
RETRY_THROTTLER_MAP.delete(this.getTarget());
245+
}
230246
if (this.channelzEnabled) {
231247
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
232248
}
@@ -243,6 +259,7 @@ export class InternalChannel {
243259
}
244260
this.configSelectionQueue = [];
245261
});
262+
246263
},
247264
(status) => {
248265
if (this.channelzEnabled) {
@@ -405,6 +422,24 @@ export class InternalChannel {
405422
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
406423
}
407424

425+
createRetryingCall(
426+
callConfig: CallConfig,
427+
method: string,
428+
host: string,
429+
credentials: CallCredentials,
430+
deadline: Deadline
431+
): RetryingCall {
432+
const callNumber = getNextCallNumber();
433+
this.trace(
434+
'createRetryingCall [' +
435+
callNumber +
436+
'] method="' +
437+
method +
438+
'"'
439+
);
440+
return new RetryingCall(this, callConfig, method, host, credentials, deadline, callNumber, this.retryBufferTracker, RETRY_THROTTLER_MAP.get(this.getTarget()))
441+
}
442+
408443
createInnerCall(
409444
callConfig: CallConfig,
410445
method: string,
@@ -413,7 +448,11 @@ export class InternalChannel {
413448
deadline: Deadline
414449
): Call {
415450
// Create a RetryingCall if retries are enabled
416-
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
451+
if (this.options['grpc.enable_retries'] === 0) {
452+
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
453+
} else {
454+
return this.createRetryingCall(callConfig, method, host, credentials, deadline);
455+
}
417456
}
418457

419458
createResolvingCall(
@@ -439,7 +478,7 @@ export class InternalChannel {
439478
parentCall: parentCall,
440479
};
441480

442-
const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), getNextCallNumber());
481+
const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), callNumber);
443482

444483
if (this.channelzEnabled) {
445484
this.callTracker.addCallStarted();

packages/grpc-js/src/load-balancing-call.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { CallConfig } from "./resolver";
2929
import { splitHostPort } from "./uri-parser";
3030
import * as logging from './logging';
3131
import { restrictControlPlaneStatusCode } from "./control-plane-status";
32+
import * as http2 from 'http2';
3233

3334
const TRACER_NAME = 'load_balancing_call';
3435

@@ -38,6 +39,10 @@ export interface StatusObjectWithProgress extends StatusObject {
3839
progress: RpcProgress;
3940
}
4041

42+
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
43+
onReceiveStatus(status: StatusObjectWithProgress): void;
44+
}
45+
4146
export class LoadBalancingCall implements Call {
4247
private child: SubchannelCall | null = null;
4348
private readPending = false;
@@ -151,7 +156,11 @@ export class LoadBalancingCall implements Call {
151156
this.listener!.onReceiveMessage(message);
152157
},
153158
onReceiveStatus: status => {
154-
this.outputStatus(status, 'PROCESSED');
159+
if (status.code === http2.constants.NGHTTP2_REFUSED_STREAM) {
160+
this.outputStatus(status, 'REFUSED');
161+
} else {
162+
this.outputStatus(status, 'PROCESSED');
163+
}
155164
}
156165
});
157166
} catch (error) {
@@ -226,7 +235,7 @@ export class LoadBalancingCall implements Call {
226235
getPeer(): string {
227236
return this.child?.getPeer() ?? this.channel.getTarget();
228237
}
229-
start(metadata: Metadata, listener: InterceptingListener): void {
238+
start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
230239
this.trace('start called');
231240
this.listener = listener;
232241
this.metadata = metadata;

packages/grpc-js/src/resolving-load-balancer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ function getDefaultConfigSelector(
8383
}
8484

8585
export interface ResolutionCallback {
86-
(configSelector: ConfigSelector): void;
86+
(serviceConfig: ServiceConfig, configSelector: ConfigSelector): void;
8787
}
8888

8989
export interface ResolutionFailureCallback {
@@ -239,6 +239,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
239239
const finalServiceConfig =
240240
workingServiceConfig ?? this.defaultServiceConfig;
241241
this.onSuccessfulResolution(
242+
finalServiceConfig,
242243
configSelector ?? getDefaultConfigSelector(finalServiceConfig)
243244
);
244245
},

0 commit comments

Comments
 (0)