Skip to content

Commit 4b3c263

Browse files
committed
Add subchannel interface
1 parent 81ef5e3 commit 4b3c263

9 files changed

+109
-24
lines changed

packages/grpc-js/src/channel.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import { Filter } from './filter';
4949

5050
import { ConnectivityState } from './connectivity-state';
5151
import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
52+
import { Subchannel } from './subchannel';
5253

5354
/**
5455
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
@@ -451,7 +452,7 @@ export class ChannelImplementation implements Channel {
451452
if (subchannelState === ConnectivityState.READY) {
452453
try {
453454
const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
454-
pickResult.subchannel!.startCallStream(
455+
pickResult.subchannel?.getRealSubchannel().startCallStream(
455456
finalMetadata,
456457
callStream,
457458
[...dynamicFilters, ...pickExtraFilters]

packages/grpc-js/src/experimental.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,4 @@ export { Call as CallStream } from './call-stream';
3434
export { Filter, BaseFilter, FilterFactory } from './filter';
3535
export { FilterStackFactory } from './filter-stack';
3636
export { registerAdminService } from './admin';
37+
export { SubchannelInterface, BaseSubchannelWrapper, ConnectivityStateListener } from './subchannel-interface'

packages/grpc-js/src/load-balancer-child-handler.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ import {
2121
LoadBalancingConfig,
2222
createLoadBalancer,
2323
} from './load-balancer';
24-
import { Subchannel } from './subchannel';
2524
import { SubchannelAddress } from './subchannel-address';
2625
import { ChannelOptions } from './channel-options';
2726
import { ConnectivityState } from './connectivity-state';
2827
import { Picker } from './picker';
2928
import { ChannelRef, SubchannelRef } from './channelz';
29+
import { SubchannelInterface } from './subchannel-interface';
3030

3131
const TYPE_NAME = 'child_load_balancer_helper';
3232

@@ -40,7 +40,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
4040
createSubchannel(
4141
subchannelAddress: SubchannelAddress,
4242
subchannelArgs: ChannelOptions
43-
): Subchannel {
43+
): SubchannelInterface {
4444
return this.parent.channelControlHelper.createSubchannel(
4545
subchannelAddress,
4646
subchannelArgs

packages/grpc-js/src/load-balancer-pick-first.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ import {
3131
PickResultType,
3232
UnavailablePicker,
3333
} from './picker';
34-
import { Subchannel, ConnectivityStateListener } from './subchannel';
3534
import {
3635
SubchannelAddress,
3736
subchannelAddressToString,
3837
} from './subchannel-address';
3938
import * as logging from './logging';
4039
import { LogVerbosity } from './constants';
40+
import { SubchannelInterface, ConnectivityStateListener } from './subchannel-interface';
4141

4242
const TRACER_NAME = 'pick_first';
4343

@@ -77,7 +77,7 @@ export class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
7777
* picked subchannel.
7878
*/
7979
class PickFirstPicker implements Picker {
80-
constructor(private subchannel: Subchannel) {}
80+
constructor(private subchannel: SubchannelInterface) {}
8181

8282
pick(pickArgs: PickArgs): CompletePickResult {
8383
return {
@@ -107,7 +107,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
107107
* The list of subchannels this load balancer is currently attempting to
108108
* connect to.
109109
*/
110-
private subchannels: Subchannel[] = [];
110+
private subchannels: SubchannelInterface[] = [];
111111
/**
112112
* The current connectivity state of the load balancer.
113113
*/
@@ -124,7 +124,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
124124
* and only if the load balancer's current state is READY. In that case,
125125
* the subchannel's current state is also READY.
126126
*/
127-
private currentPick: Subchannel | null = null;
127+
private currentPick: SubchannelInterface | null = null;
128128
/**
129129
* Listener callback attached to each subchannel in the `subchannels` list
130130
* while establishing a connection.
@@ -157,7 +157,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
157157
[ConnectivityState.TRANSIENT_FAILURE]: 0,
158158
};
159159
this.subchannelStateListener = (
160-
subchannel: Subchannel,
160+
subchannel: SubchannelInterface,
161161
previousState: ConnectivityState,
162162
newState: ConnectivityState
163163
) => {
@@ -219,7 +219,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
219219
}
220220
};
221221
this.pickedSubchannelStateListener = (
222-
subchannel: Subchannel,
222+
subchannel: SubchannelInterface,
223223
previousState: ConnectivityState,
224224
newState: ConnectivityState
225225
) => {
@@ -310,7 +310,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
310310
}, CONNECTION_DELAY_INTERVAL_MS);
311311
}
312312

313-
private pickSubchannel(subchannel: Subchannel) {
313+
private pickSubchannel(subchannel: SubchannelInterface) {
314314
trace('Pick subchannel with address ' + subchannel.getAddress());
315315
if (this.currentPick !== null) {
316316
this.currentPick.unref();

packages/grpc-js/src/load-balancer-round-robin.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ import {
3030
PickResultType,
3131
UnavailablePicker,
3232
} from './picker';
33-
import { Subchannel, ConnectivityStateListener } from './subchannel';
3433
import {
3534
SubchannelAddress,
3635
subchannelAddressToString,
3736
} from './subchannel-address';
3837
import * as logging from './logging';
3938
import { LogVerbosity } from './constants';
39+
import { ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';
4040

4141
const TRACER_NAME = 'round_robin';
4242

@@ -67,7 +67,7 @@ class RoundRobinLoadBalancingConfig implements LoadBalancingConfig {
6767

6868
class RoundRobinPicker implements Picker {
6969
constructor(
70-
private readonly subchannelList: Subchannel[],
70+
private readonly subchannelList: SubchannelInterface[],
7171
private nextIndex = 0
7272
) {}
7373

@@ -88,7 +88,7 @@ class RoundRobinPicker implements Picker {
8888
* balancer implementation to preserve this part of the picker state if
8989
* possible when a subchannel connects or disconnects.
9090
*/
91-
peekNextSubchannel(): Subchannel {
91+
peekNextSubchannel(): SubchannelInterface {
9292
return this.subchannelList[this.nextIndex];
9393
}
9494
}
@@ -102,7 +102,7 @@ interface ConnectivityStateCounts {
102102
}
103103

104104
export class RoundRobinLoadBalancer implements LoadBalancer {
105-
private subchannels: Subchannel[] = [];
105+
private subchannels: SubchannelInterface[] = [];
106106

107107
private currentState: ConnectivityState = ConnectivityState.IDLE;
108108

@@ -121,7 +121,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
121121
[ConnectivityState.TRANSIENT_FAILURE]: 0,
122122
};
123123
this.subchannelStateListener = (
124-
subchannel: Subchannel,
124+
subchannel: SubchannelInterface,
125125
previousState: ConnectivityState,
126126
newState: ConnectivityState
127127
) => {

packages/grpc-js/src/load-balancer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { SubchannelAddress } from './subchannel-address';
2121
import { ConnectivityState } from './connectivity-state';
2222
import { Picker } from './picker';
2323
import { ChannelRef, SubchannelRef } from './channelz';
24+
import { SubchannelInterface } from './subchannel-interface';
2425

2526
/**
2627
* A collection of functions associated with a channel that a load balancer
@@ -35,7 +36,7 @@ export interface ChannelControlHelper {
3536
createSubchannel(
3637
subchannelAddress: SubchannelAddress,
3738
subchannelArgs: ChannelOptions
38-
): Subchannel;
39+
): SubchannelInterface;
3940
/**
4041
* Passes a new subchannel picker up to the channel. This is called if either
4142
* the connectivity state changes or if a different picker is needed for any

packages/grpc-js/src/picker.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { Metadata } from './metadata';
2121
import { Status } from './constants';
2222
import { LoadBalancer } from './load-balancer';
2323
import { FilterFactory, Filter } from './filter';
24+
import { SubchannelInterface } from './subchannel-interface';
2425

2526
export enum PickResultType {
2627
COMPLETE,
@@ -36,7 +37,7 @@ export interface PickResult {
3637
* `pickResultType` is COMPLETE. If null, indicates that the call should be
3738
* dropped.
3839
*/
39-
subchannel: Subchannel | null;
40+
subchannel: SubchannelInterface | null;
4041
/**
4142
* The status object to end the call with. Populated if and only if
4243
* `pickResultType` is TRANSIENT_FAILURE.
@@ -53,7 +54,7 @@ export interface PickResult {
5354

5455
export interface CompletePickResult extends PickResult {
5556
pickResultType: PickResultType.COMPLETE;
56-
subchannel: Subchannel | null;
57+
subchannel: SubchannelInterface | null;
5758
status: null;
5859
extraFilterFactories: FilterFactory<Filter>[];
5960
onCallStarted: (() => void) | null;
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2022 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import { SubchannelRef } from "./channelz";
19+
import { ConnectivityState } from "./connectivity-state";
20+
import { Subchannel } from "./subchannel";
21+
22+
export type ConnectivityStateListener = (
23+
subchannel: SubchannelInterface,
24+
previousState: ConnectivityState,
25+
newState: ConnectivityState
26+
) => void;
27+
28+
/**
29+
* This is an interface for load balancing policies to use to interact with
30+
* subchannels. This allows load balancing policies to wrap and unwrap
31+
* subchannels.
32+
*
33+
* Any load balancing policy that wraps subchannels must unwrap the subchannel
34+
* in the picker, so that other load balancing policies consistently have
35+
* access to their own wrapper objects.
36+
*/
37+
export interface SubchannelInterface {
38+
getConnectivityState(): ConnectivityState;
39+
addConnectivityStateListener(listener: ConnectivityStateListener): void;
40+
removeConnectivityStateListener(listener: ConnectivityStateListener): void;
41+
startConnecting(): void;
42+
getAddress(): string;
43+
ref(): void;
44+
unref(): void;
45+
getChannelzRef(): SubchannelRef;
46+
/**
47+
* If this is a wrapper, return the wrapped subchannel, otherwise return this
48+
*/
49+
getRealSubchannel(): Subchannel;
50+
}
51+
52+
export abstract class BaseSubchannelWrapper implements SubchannelInterface {
53+
constructor(private child: SubchannelInterface) {}
54+
55+
getConnectivityState(): ConnectivityState {
56+
return this.child.getConnectivityState();
57+
}
58+
addConnectivityStateListener(listener: ConnectivityStateListener): void {
59+
this.child.addConnectivityStateListener(listener);
60+
}
61+
removeConnectivityStateListener(listener: ConnectivityStateListener): void {
62+
this.child.removeConnectivityStateListener(listener);
63+
}
64+
startConnecting(): void {
65+
this.child.startConnecting();
66+
}
67+
getAddress(): string {
68+
return this.child.getAddress();
69+
}
70+
ref(): void {
71+
this.child.ref();
72+
}
73+
unref(): void {
74+
this.child.unref();
75+
}
76+
getChannelzRef(): SubchannelRef {
77+
return this.child.getChannelzRef();
78+
}
79+
getRealSubchannel(): Subchannel {
80+
return this.child.getRealSubchannel();
81+
}
82+
}

packages/grpc-js/src/subchannel.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import {
3737
subchannelAddressToString,
3838
} from './subchannel-address';
3939
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
40+
import { ConnectivityStateListener } from './subchannel-interface';
4041

4142
const clientVersion = require('../../package.json').version;
4243

@@ -54,12 +55,6 @@ const BACKOFF_JITTER = 0.2;
5455
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
5556
const KEEPALIVE_TIMEOUT_MS = 20000;
5657

57-
export type ConnectivityStateListener = (
58-
subchannel: Subchannel,
59-
previousState: ConnectivityState,
60-
newState: ConnectivityState
61-
) => void;
62-
6358
export interface SubchannelCallStatsTracker {
6459
addMessageSent(): void;
6560
addMessageReceived(): void;
@@ -949,4 +944,8 @@ export class Subchannel {
949944
getChannelzRef(): SubchannelRef {
950945
return this.channelzRef;
951946
}
947+
948+
getRealSubchannel(): this {
949+
return this;
950+
}
952951
}

0 commit comments

Comments
 (0)