Skip to content

Commit fb98794

Browse files
committed
grpc-js-xds: Complete federation implementation
1 parent 596d5f1 commit fb98794

28 files changed

+1423
-2354
lines changed

packages/grpc-js-xds/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@
4545
"dependencies": {
4646
"@grpc/proto-loader": "^0.6.0",
4747
"google-auth-library": "^7.0.2",
48-
"re2-wasm": "^1.0.1"
48+
"re2-wasm": "^1.0.1",
49+
"vscode-uri": "^3.0.7"
4950
},
5051
"peerDependencies": {
5152
"@grpc/grpc-js": "~1.8.0"

packages/grpc-js-xds/src/csds.ts

Lines changed: 38 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,18 @@
1515
*
1616
*/
1717

18-
import { Node } from "./generated/envoy/config/core/v3/Node";
1918
import { ClientConfig, _envoy_service_status_v3_ClientConfig_GenericXdsConfig as GenericXdsConfig } from "./generated/envoy/service/status/v3/ClientConfig";
2019
import { ClientStatusDiscoveryServiceHandlers } from "./generated/envoy/service/status/v3/ClientStatusDiscoveryService";
2120
import { ClientStatusRequest__Output } from "./generated/envoy/service/status/v3/ClientStatusRequest";
2221
import { ClientStatusResponse } from "./generated/envoy/service/status/v3/ClientStatusResponse";
2322
import { Timestamp } from "./generated/google/protobuf/Timestamp";
24-
import { AdsTypeUrl, CDS_TYPE_URL, EDS_TYPE_URL, LDS_TYPE_URL, RDS_TYPE_URL } from "./resources";
25-
import { HandleResponseResult } from "./xds-stream-state/xds-stream-state";
23+
import { xdsResourceNameToString } from "./resources";
2624
import { sendUnaryData, ServerDuplexStream, ServerUnaryCall, status, experimental, loadPackageDefinition, logVerbosity } from '@grpc/grpc-js';
2725
import { loadSync } from "@grpc/proto-loader";
2826
import { ProtoGrpcType as CsdsProtoGrpcType } from "./generated/csds";
2927

3028
import registerAdminService = experimental.registerAdminService;
29+
import { XdsClient } from "./xds-client";
3130

3231
const TRACER_NAME = 'csds';
3332

@@ -47,115 +46,46 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null {
4746
}
4847
}
4948

50-
let clientNode: Node | null = null;
49+
const registeredClients: XdsClient[] = [];
5150

52-
const configStatus = {
53-
[EDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
54-
[CDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
55-
[RDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
56-
[LDS_TYPE_URL]: new Map<string, GenericXdsConfig>()
57-
};
58-
59-
/**
60-
* This function only accepts a v3 Node message, because we are only supporting
61-
* v3 CSDS and it only handles v3 Nodes. If the client is actually using v2 xDS
62-
* APIs, it should just provide the equivalent v3 Node message.
63-
* @param node The Node message for the client that is requesting resources
64-
*/
65-
export function setCsdsClientNode(node: Node) {
66-
clientNode = node;
67-
}
68-
69-
/**
70-
* Update the config status maps from the list of names of requested resources
71-
* for a specific type URL. These lists are the source of truth for determining
72-
* what resources will be listed in the CSDS response. Any resource that is not
73-
* in this list will never actually be applied anywhere.
74-
* @param typeUrl The resource type URL
75-
* @param names The list of resource names that are being requested
76-
*/
77-
export function updateCsdsRequestedNameList(typeUrl: AdsTypeUrl, names: string[]) {
78-
trace('Update type URL ' + typeUrl + ' with names [' + names + ']');
79-
const currentTime = dateToProtoTimestamp(new Date());
80-
const configMap = configStatus[typeUrl];
81-
for (const name of names) {
82-
if (!configMap.has(name)) {
83-
configMap.set(name, {
84-
type_url: typeUrl,
85-
name: name,
86-
last_updated: currentTime,
87-
client_status: 'REQUESTED'
88-
});
89-
}
90-
}
91-
for (const name of configMap.keys()) {
92-
if (!names.includes(name)) {
93-
configMap.delete(name);
94-
}
95-
}
51+
export function registerXdsClientWithCsds(client: XdsClient) {
52+
registeredClients.push(client);
9653
}
9754

98-
/**
99-
* Update the config status maps from the result of parsing a single ADS
100-
* response. All resources that validated are considered "ACKED", and all
101-
* resources that failed validation are considered "NACKED".
102-
* @param typeUrl The type URL of resources in this response
103-
* @param versionInfo The version info field from this response
104-
* @param updates The lists of resources that passed and failed validation
105-
*/
106-
export function updateCsdsResourceResponse(typeUrl: AdsTypeUrl, versionInfo: string, updates: HandleResponseResult) {
107-
const currentTime = dateToProtoTimestamp(new Date());
108-
const configMap = configStatus[typeUrl];
109-
for (const {name, raw} of updates.accepted) {
110-
const mapEntry = configMap.get(name);
111-
if (mapEntry) {
112-
trace('Updated ' + typeUrl + ' resource ' + name + ' to state ACKED');
113-
mapEntry.client_status = 'ACKED';
114-
mapEntry.version_info = versionInfo;
115-
mapEntry.xds_config = raw;
116-
mapEntry.error_state = null;
117-
mapEntry.last_updated = currentTime;
118-
}
119-
}
120-
for (const {name, error, raw} of updates.rejected) {
121-
const mapEntry = configMap.get(name);
122-
if (mapEntry) {
123-
trace('Updated ' + typeUrl + ' resource ' + name + ' to state NACKED');
124-
mapEntry.client_status = 'NACKED';
125-
mapEntry.error_state = {
126-
failed_configuration: raw,
127-
last_update_attempt: currentTime,
128-
details: error,
129-
version_info: versionInfo
130-
};
131-
}
132-
}
133-
for (const name of updates.missing) {
134-
const mapEntry = configMap.get(name);
135-
if (mapEntry) {
136-
trace('Updated ' + typeUrl + ' resource ' + name + ' to state DOES_NOT_EXIST');
137-
mapEntry.client_status = 'DOES_NOT_EXIST';
138-
mapEntry.version_info = versionInfo;
139-
mapEntry.xds_config = null;
140-
mapEntry.error_state = null;
141-
mapEntry.last_updated = currentTime;
55+
function getCurrentConfigList(): ClientConfig[] {
56+
const result: ClientConfig[] = [];
57+
for (const client of registeredClients) {
58+
if (!client.adsNode) {
59+
continue;
14260
}
143-
}
144-
}
145-
146-
function getCurrentConfig(): ClientConfig {
147-
const genericConfigList: GenericXdsConfig[] = [];
148-
for (const configMap of Object.values(configStatus)) {
149-
for (const configValue of configMap.values()) {
150-
genericConfigList.push(configValue);
61+
const genericConfigList: GenericXdsConfig[] = [];
62+
for (const [authority, authorityState] of client.authorityStateMap) {
63+
for (const [type, typeMap] of authorityState.resourceMap) {
64+
for (const [key, resourceState] of typeMap) {
65+
const typeUrl = type.getFullTypeUrl();
66+
const meta = resourceState.meta;
67+
genericConfigList.push({
68+
name: xdsResourceNameToString({authority, key}, typeUrl),
69+
client_status: meta.clientStatus,
70+
version_info: meta.version,
71+
xds_config: meta.clientStatus === 'ACKED' ? meta.rawResource : undefined,
72+
last_updated: meta.updateTime ? dateToProtoTimestamp(meta.updateTime) : undefined,
73+
error_state: meta.clientStatus === 'NACKED' ? {
74+
details: meta.failedDetails,
75+
failed_configuration: meta.rawResource,
76+
last_update_attempt: meta.failedUpdateTime ? dateToProtoTimestamp(meta.failedUpdateTime) : undefined,
77+
version_info: meta.failedVersion
78+
} : undefined
79+
});
80+
}
81+
}
15182
}
83+
result.push({
84+
node: client.adsNode,
85+
generic_xds_configs: genericConfigList
86+
});
15287
}
153-
const config = {
154-
node: clientNode,
155-
generic_xds_configs: genericConfigList
156-
};
157-
trace('Sending current config ' + JSON.stringify(config, undefined, 2));
158-
return config;
88+
return result;
15989
}
16090

16191
const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
@@ -169,7 +99,7 @@ const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
16999
return;
170100
}
171101
callback(null, {
172-
config: [getCurrentConfig()]
102+
config: getCurrentConfigList()
173103
});
174104
},
175105
StreamClientStatus(call: ServerDuplexStream<ClientStatusRequest__Output, ClientStatusResponse>) {
@@ -182,7 +112,7 @@ const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
182112
return;
183113
}
184114
call.write({
185-
config: [getCurrentConfig()]
115+
config: getCurrentConfigList()
186116
});
187117
});
188118
call.on('end', () => {

packages/grpc-js-xds/src/load-balancer-cds.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717

1818
import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
19-
import { getSingletonXdsClient, XdsClient } from './xds-client';
19+
import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client';
2020
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
2121
import SubchannelAddress = experimental.SubchannelAddress;
2222
import UnavailablePicker = experimental.UnavailablePicker;
@@ -29,13 +29,12 @@ import OutlierDetectionLoadBalancingConfig = experimental.OutlierDetectionLoadBa
2929
import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig;
3030
import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig;
3131
import QueuePicker = experimental.QueuePicker;
32-
import { Watcher } from './xds-stream-state/xds-stream-state';
3332
import { OutlierDetection__Output } from './generated/envoy/config/cluster/v3/OutlierDetection';
3433
import { Duration__Output } from './generated/google/protobuf/Duration';
3534
import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
3635
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler, XdsClusterResolverLoadBalancingConfig } from './load-balancer-xds-cluster-resolver';
3736
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources';
38-
import { CdsUpdate, OutlierDetectionUpdate } from './xds-stream-state/cds-state';
37+
import { CdsUpdate, ClusterResourceType, OutlierDetectionUpdate } from './xds-resource-type/cluster-resource-type';
3938

4039
const TRACER_NAME = 'cds_balancer';
4140

@@ -195,8 +194,8 @@ export class CdsLoadBalancer implements LoadBalancer {
195194
return;
196195
}
197196
trace('Adding watcher for cluster ' + cluster);
198-
const watcher: Watcher<CdsUpdate> = {
199-
onValidUpdate: (update) => {
197+
const watcher: Watcher<CdsUpdate> = new Watcher<CdsUpdate>({
198+
onResourceChanged: (update) => {
200199
this.clusterTree[cluster].latestUpdate = update;
201200
if (update.type === 'AGGREGATE') {
202201
const children = update.aggregateChildren
@@ -227,15 +226,17 @@ export class CdsLoadBalancer implements LoadBalancer {
227226
}
228227
},
229228
onResourceDoesNotExist: () => {
229+
trace('Received onResourceDoesNotExist update for cluster ' + cluster);
230230
if (cluster in this.clusterTree) {
231231
this.clusterTree[cluster].latestUpdate = undefined;
232232
this.clusterTree[cluster].children = [];
233233
}
234234
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `CDS resource ${cluster} does not exist`, metadata: new Metadata()}));
235235
this.childBalancer.destroy();
236236
},
237-
onTransientError: (statusObj) => {
237+
onError: (statusObj) => {
238238
if (!this.updatedChild) {
239+
trace('Transitioning to transient failure due to onError update for cluster' + cluster);
239240
this.channelControlHelper.updateState(
240241
connectivityState.TRANSIENT_FAILURE,
241242
new UnavailablePicker({
@@ -246,19 +247,23 @@ export class CdsLoadBalancer implements LoadBalancer {
246247
);
247248
}
248249
}
249-
};
250+
});
250251
this.clusterTree[cluster] = {
251252
watcher: watcher,
252253
children: []
253254
};
254-
this.xdsClient?.addClusterWatcher(cluster, watcher);
255+
if (this.xdsClient) {
256+
ClusterResourceType.startWatch(this.xdsClient, cluster, watcher);
257+
}
255258
}
256259

257260
private removeCluster(cluster: string) {
258261
if (!(cluster in this.clusterTree)) {
259262
return;
260263
}
261-
this.xdsClient?.removeClusterWatcher(cluster, this.clusterTree[cluster].watcher);
264+
if (this.xdsClient) {
265+
ClusterResourceType.cancelWatch(this.xdsClient, cluster, this.clusterTree[cluster].watcher);
266+
}
262267
delete this.clusterTree[cluster];
263268
}
264269

packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@ import { ClusterLoadAssignment__Output } from "./generated/envoy/config/endpoint
2323
import { LrsLoadBalancingConfig } from "./load-balancer-lrs";
2424
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from "./load-balancer-priority";
2525
import { WeightedTarget, WeightedTargetLoadBalancingConfig } from "./load-balancer-weighted-target";
26-
import { getSingletonXdsClient, XdsClient } from "./xds-client";
26+
import { getSingletonXdsClient, Watcher, XdsClient } from "./xds-client";
2727
import { DropCategory, XdsClusterImplLoadBalancingConfig } from "./load-balancer-xds-cluster-impl";
28-
import { Watcher } from "./xds-stream-state/xds-stream-state";
2928

3029
import LoadBalancingConfig = experimental.LoadBalancingConfig;
3130
import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig;
@@ -38,6 +37,7 @@ import ChannelControlHelper = experimental.ChannelControlHelper;
3837
import OutlierDetectionLoadBalancingConfig = experimental.OutlierDetectionLoadBalancingConfig;
3938
import subchannelAddressToString = experimental.subchannelAddressToString;
4039
import { serverConfigEqual, validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap";
40+
import { EndpointResourceType } from "./xds-resource-type/endpoint-resource-type";
4141

4242
const TRACER_NAME = 'xds_cluster_resolver';
4343

@@ -377,24 +377,26 @@ export class XdsClusterResolver implements LoadBalancer {
377377
};
378378
if (mechanism.type === 'EDS') {
379379
const edsServiceName = mechanism.eds_service_name ?? mechanism.cluster;
380-
const watcher: Watcher<ClusterLoadAssignment__Output> = {
381-
onValidUpdate: update => {
380+
const watcher: Watcher<ClusterLoadAssignment__Output> = new Watcher<ClusterLoadAssignment__Output>({
381+
onResourceChanged: update => {
382382
mechanismEntry.latestUpdate = getEdsPriorities(update);
383383
this.maybeUpdateChild();
384384
},
385385
onResourceDoesNotExist: () => {
386386
trace('Resource does not exist: ' + edsServiceName);
387387
mechanismEntry.latestUpdate = [{localities: [], dropCategories: []}];
388388
},
389-
onTransientError: error => {
389+
onError: error => {
390390
if (!mechanismEntry.latestUpdate) {
391391
trace('xDS request failed with error ' + error);
392392
mechanismEntry.latestUpdate = [{localities: [], dropCategories: []}];
393393
}
394394
}
395-
};
395+
});
396396
mechanismEntry.watcher = watcher;
397-
this.xdsClient?.addEndpointWatcher(edsServiceName, watcher);
397+
if (this.xdsClient) {
398+
EndpointResourceType.startWatch(this.xdsClient, edsServiceName, watcher);
399+
}
398400
} else {
399401
const resolver = createResolver({scheme: 'dns', path: mechanism.dns_hostname!}, {
400402
onSuccessfulResolution: addressList => {
@@ -434,7 +436,9 @@ export class XdsClusterResolver implements LoadBalancer {
434436
for (const mechanismEntry of this.discoveryMechanismList) {
435437
if (mechanismEntry.watcher) {
436438
const edsServiceName = mechanismEntry.discoveryMechanism.eds_service_name ?? mechanismEntry.discoveryMechanism.cluster;
437-
this.xdsClient?.removeEndpointWatcher(edsServiceName, mechanismEntry.watcher);
439+
if (this.xdsClient) {
440+
EndpointResourceType.cancelWatch(this.xdsClient, edsServiceName, mechanismEntry.watcher);
441+
}
438442
}
439443
mechanismEntry.resolver?.destroy();
440444
}

0 commit comments

Comments
 (0)