Skip to content

Commit 974f810

Browse files
authored
Merge pull request #2463 from murgatroid99/grpc-js-xds_federation
grpc-js-xds: Implement federation support
2 parents 2ee8a68 + 967f903 commit 974f810

28 files changed

+2396
-1566
lines changed

packages/grpc-js-xds/gulpfile.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ const cleanAll = gulp.parallel(clean);
6161
const compile = checkTask(() => execNpmCommand('compile'));
6262

6363
const runTests = checkTask(() => {
64+
process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true';
6465
return gulp.src(`${outDir}/test/**/*.js`)
6566
.pipe(mocha({reporter: 'mocha-jenkins-reporter',
6667
require: ['ts-node/register']}));

packages/grpc-js-xds/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@
4545
"dependencies": {
4646
"@grpc/proto-loader": "^0.6.0",
4747
"google-auth-library": "^7.0.2",
48-
"re2-wasm": "^1.0.1"
48+
"re2-wasm": "^1.0.1",
49+
"vscode-uri": "^3.0.7"
4950
},
5051
"peerDependencies": {
5152
"@grpc/grpc-js": "~1.8.0"

packages/grpc-js-xds/src/csds.ts

Lines changed: 40 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,18 @@
1515
*
1616
*/
1717

18-
import { Node } from "./generated/envoy/config/core/v3/Node";
1918
import { ClientConfig, _envoy_service_status_v3_ClientConfig_GenericXdsConfig as GenericXdsConfig } from "./generated/envoy/service/status/v3/ClientConfig";
2019
import { ClientStatusDiscoveryServiceHandlers } from "./generated/envoy/service/status/v3/ClientStatusDiscoveryService";
2120
import { ClientStatusRequest__Output } from "./generated/envoy/service/status/v3/ClientStatusRequest";
2221
import { ClientStatusResponse } from "./generated/envoy/service/status/v3/ClientStatusResponse";
2322
import { Timestamp } from "./generated/google/protobuf/Timestamp";
24-
import { AdsTypeUrl, CDS_TYPE_URL, EDS_TYPE_URL, LDS_TYPE_URL, RDS_TYPE_URL } from "./resources";
25-
import { HandleResponseResult } from "./xds-stream-state/xds-stream-state";
23+
import { xdsResourceNameToString } from "./resources";
2624
import { sendUnaryData, ServerDuplexStream, ServerUnaryCall, status, experimental, loadPackageDefinition, logVerbosity } from '@grpc/grpc-js';
2725
import { loadSync } from "@grpc/proto-loader";
2826
import { ProtoGrpcType as CsdsProtoGrpcType } from "./generated/csds";
2927

3028
import registerAdminService = experimental.registerAdminService;
29+
import { XdsClient } from "./xds-client";
3130

3231
const TRACER_NAME = 'csds';
3332

@@ -47,115 +46,47 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null {
4746
}
4847
}
4948

50-
let clientNode: Node | null = null;
49+
const registeredClients: XdsClient[] = [];
5150

52-
const configStatus = {
53-
[EDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
54-
[CDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
55-
[RDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
56-
[LDS_TYPE_URL]: new Map<string, GenericXdsConfig>()
57-
};
58-
59-
/**
60-
* This function only accepts a v3 Node message, because we are only supporting
61-
* v3 CSDS and it only handles v3 Nodes. If the client is actually using v2 xDS
62-
* APIs, it should just provide the equivalent v3 Node message.
63-
* @param node The Node message for the client that is requesting resources
64-
*/
65-
export function setCsdsClientNode(node: Node) {
66-
clientNode = node;
51+
export function registerXdsClientWithCsds(client: XdsClient) {
52+
registeredClients.push(client);
6753
}
6854

69-
/**
70-
* Update the config status maps from the list of names of requested resources
71-
* for a specific type URL. These lists are the source of truth for determining
72-
* what resources will be listed in the CSDS response. Any resource that is not
73-
* in this list will never actually be applied anywhere.
74-
* @param typeUrl The resource type URL
75-
* @param names The list of resource names that are being requested
76-
*/
77-
export function updateCsdsRequestedNameList(typeUrl: AdsTypeUrl, names: string[]) {
78-
trace('Update type URL ' + typeUrl + ' with names [' + names + ']');
79-
const currentTime = dateToProtoTimestamp(new Date());
80-
const configMap = configStatus[typeUrl];
81-
for (const name of names) {
82-
if (!configMap.has(name)) {
83-
configMap.set(name, {
84-
type_url: typeUrl,
85-
name: name,
86-
last_updated: currentTime,
87-
client_status: 'REQUESTED'
88-
});
89-
}
90-
}
91-
for (const name of configMap.keys()) {
92-
if (!names.includes(name)) {
93-
configMap.delete(name);
94-
}
95-
}
96-
}
97-
98-
/**
99-
* Update the config status maps from the result of parsing a single ADS
100-
* response. All resources that validated are considered "ACKED", and all
101-
* resources that failed validation are considered "NACKED".
102-
* @param typeUrl The type URL of resources in this response
103-
* @param versionInfo The version info field from this response
104-
* @param updates The lists of resources that passed and failed validation
105-
*/
106-
export function updateCsdsResourceResponse(typeUrl: AdsTypeUrl, versionInfo: string, updates: HandleResponseResult) {
107-
const currentTime = dateToProtoTimestamp(new Date());
108-
const configMap = configStatus[typeUrl];
109-
for (const {name, raw} of updates.accepted) {
110-
const mapEntry = configMap.get(name);
111-
if (mapEntry) {
112-
trace('Updated ' + typeUrl + ' resource ' + name + ' to state ACKED');
113-
mapEntry.client_status = 'ACKED';
114-
mapEntry.version_info = versionInfo;
115-
mapEntry.xds_config = raw;
116-
mapEntry.error_state = null;
117-
mapEntry.last_updated = currentTime;
55+
function getCurrentConfigList(): ClientConfig[] {
56+
const result: ClientConfig[] = [];
57+
for (const client of registeredClients) {
58+
if (!client.adsNode) {
59+
continue;
11860
}
119-
}
120-
for (const {name, error, raw} of updates.rejected) {
121-
const mapEntry = configMap.get(name);
122-
if (mapEntry) {
123-
trace('Updated ' + typeUrl + ' resource ' + name + ' to state NACKED');
124-
mapEntry.client_status = 'NACKED';
125-
mapEntry.error_state = {
126-
failed_configuration: raw,
127-
last_update_attempt: currentTime,
128-
details: error,
129-
version_info: versionInfo
130-
};
131-
}
132-
}
133-
for (const name of updates.missing) {
134-
const mapEntry = configMap.get(name);
135-
if (mapEntry) {
136-
trace('Updated ' + typeUrl + ' resource ' + name + ' to state DOES_NOT_EXIST');
137-
mapEntry.client_status = 'DOES_NOT_EXIST';
138-
mapEntry.version_info = versionInfo;
139-
mapEntry.xds_config = null;
140-
mapEntry.error_state = null;
141-
mapEntry.last_updated = currentTime;
142-
}
143-
}
144-
}
145-
146-
function getCurrentConfig(): ClientConfig {
147-
const genericConfigList: GenericXdsConfig[] = [];
148-
for (const configMap of Object.values(configStatus)) {
149-
for (const configValue of configMap.values()) {
150-
genericConfigList.push(configValue);
61+
const genericConfigList: GenericXdsConfig[] = [];
62+
for (const [authority, authorityState] of client.authorityStateMap) {
63+
for (const [type, typeMap] of authorityState.resourceMap) {
64+
for (const [key, resourceState] of typeMap) {
65+
const typeUrl = type.getTypeUrl();
66+
const meta = resourceState.meta;
67+
genericConfigList.push({
68+
name: xdsResourceNameToString({authority, key}, typeUrl),
69+
type_url: typeUrl,
70+
client_status: meta.clientStatus,
71+
version_info: meta.version,
72+
xds_config: meta.clientStatus === 'ACKED' ? meta.rawResource : undefined,
73+
last_updated: meta.updateTime ? dateToProtoTimestamp(meta.updateTime) : undefined,
74+
error_state: meta.clientStatus === 'NACKED' ? {
75+
details: meta.failedDetails,
76+
failed_configuration: meta.rawResource,
77+
last_update_attempt: meta.failedUpdateTime ? dateToProtoTimestamp(meta.failedUpdateTime) : undefined,
78+
version_info: meta.failedVersion
79+
} : undefined
80+
});
81+
}
82+
}
15183
}
84+
result.push({
85+
node: client.adsNode,
86+
generic_xds_configs: genericConfigList
87+
});
15288
}
153-
const config = {
154-
node: clientNode,
155-
generic_xds_configs: genericConfigList
156-
};
157-
trace('Sending current config ' + JSON.stringify(config, undefined, 2));
158-
return config;
89+
return result;
15990
}
16091

16192
const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
@@ -169,7 +100,7 @@ const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
169100
return;
170101
}
171102
callback(null, {
172-
config: [getCurrentConfig()]
103+
config: getCurrentConfigList()
173104
});
174105
},
175106
StreamClientStatus(call: ServerDuplexStream<ClientStatusRequest__Output, ClientStatusResponse>) {
@@ -182,7 +113,7 @@ const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
182113
return;
183114
}
184115
call.write({
185-
config: [getCurrentConfig()]
116+
config: getCurrentConfigList()
186117
});
187118
});
188119
call.on('end', () => {
@@ -211,4 +142,4 @@ const csdsServiceDefinition = csdsGrpcObject.envoy.service.status.v3.ClientStatu
211142

212143
export function setup() {
213144
registerAdminService(() => csdsServiceDefinition, () => csdsImplementation);
214-
}
145+
}

packages/grpc-js-xds/src/environment.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@
1717

1818
export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION ?? 'true') === 'true';
1919
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
20-
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
20+
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
21+
export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true';

0 commit comments

Comments
 (0)