Skip to content

Commit 087bc3e

Browse files
authored
Merge pull request #1882 from murgatroid99/grpc-js-xds_v2_message_handling
grpc-js-xds: Distinguish v2 and v3 when handling messages
2 parents a6318a4 + 64a0b0a commit 087bc3e

File tree

7 files changed

+72
-38
lines changed

7 files changed

+72
-38
lines changed

packages/grpc-js-xds/src/resolver-xds.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ class XdsResolver implements Resolver {
212212
private latestRouteConfigName: string | null = null;
213213

214214
private latestRouteConfig: RouteConfiguration__Output | null = null;
215+
private latestRouteConfigIsV2 = false;
215216

216217
private clusterRefcounts = new Map<string, {inLastConfig: boolean, refCount: number}>();
217218

@@ -225,15 +226,15 @@ class XdsResolver implements Resolver {
225226
private channelOptions: ChannelOptions
226227
) {
227228
this.ldsWatcher = {
228-
onValidUpdate: (update: Listener__Output) => {
229+
onValidUpdate: (update: Listener__Output, isV2: boolean) => {
229230
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, update.api_listener!.api_listener!.value);
230231
const defaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout;
231232
if (defaultTimeout === null || defaultTimeout === undefined) {
232233
this.latestDefaultTimeout = undefined;
233234
} else {
234235
this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout);
235236
}
236-
if (EXPERIMENTAL_FAULT_INJECTION) {
237+
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
237238
this.ldsHttpFilterConfigs = [];
238239
for (const filter of httpConnectionManager.http_filters) {
239240
// typed_config must be set here, or validation would have failed
@@ -259,7 +260,7 @@ class XdsResolver implements Resolver {
259260
if (this.latestRouteConfigName) {
260261
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
261262
}
262-
this.handleRouteConfig(httpConnectionManager.route_config!);
263+
this.handleRouteConfig(httpConnectionManager.route_config!, isV2);
263264
break;
264265
default:
265266
// This is prevented by the validation rules
@@ -279,8 +280,8 @@ class XdsResolver implements Resolver {
279280
}
280281
};
281282
this.rdsWatcher = {
282-
onValidUpdate: (update: RouteConfiguration__Output) => {
283-
this.handleRouteConfig(update);
283+
onValidUpdate: (update: RouteConfiguration__Output, isV2: boolean) => {
284+
this.handleRouteConfig(update, isV2);
284285
},
285286
onTransientError: (error: StatusObject) => {
286287
/* A transient error only needs to bubble up as a failure if we have
@@ -310,20 +311,21 @@ class XdsResolver implements Resolver {
310311
refCount.refCount -= 1;
311312
if (!refCount.inLastConfig && refCount.refCount === 0) {
312313
this.clusterRefcounts.delete(clusterName);
313-
this.handleRouteConfig(this.latestRouteConfig!);
314+
this.handleRouteConfig(this.latestRouteConfig!, this.latestRouteConfigIsV2);
314315
}
315316
}
316317
}
317318

318-
private handleRouteConfig(routeConfig: RouteConfiguration__Output) {
319+
private handleRouteConfig(routeConfig: RouteConfiguration__Output, isV2: boolean) {
319320
this.latestRouteConfig = routeConfig;
321+
this.latestRouteConfigIsV2 = isV2;
320322
const virtualHost = findVirtualHostForDomain(routeConfig.virtual_hosts, this.target.path);
321323
if (virtualHost === null) {
322324
this.reportResolutionError('No matching route found');
323325
return;
324326
}
325327
const virtualHostHttpFilterOverrides = new Map<string, HttpFilterConfig>();
326-
if (EXPERIMENTAL_FAULT_INJECTION) {
328+
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
327329
for (const [name, filter] of Object.entries(virtualHost.typed_per_filter_config ?? {})) {
328330
const parsedConfig = parseOverrideFilterConfig(filter);
329331
if (parsedConfig) {
@@ -352,7 +354,7 @@ class XdsResolver implements Resolver {
352354
timeout = undefined;
353355
}
354356
const routeHttpFilterOverrides = new Map<string, HttpFilterConfig>();
355-
if (EXPERIMENTAL_FAULT_INJECTION) {
357+
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
356358
for (const [name, filter] of Object.entries(route.typed_per_filter_config ?? {})) {
357359
const parsedConfig = parseOverrideFilterConfig(filter);
358360
if (parsedConfig) {
@@ -367,7 +369,7 @@ class XdsResolver implements Resolver {
367369
const cluster = route.route!.cluster!;
368370
allConfigClusters.add(cluster);
369371
const extraFilterFactories: FilterFactory<Filter>[] = [];
370-
if (EXPERIMENTAL_FAULT_INJECTION) {
372+
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
371373
for (const filterConfig of this.ldsHttpFilterConfigs) {
372374
if (routeHttpFilterOverrides.has(filterConfig.name)) {
373375
const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
@@ -396,7 +398,7 @@ class XdsResolver implements Resolver {
396398
allConfigClusters.add(clusterWeight.name);
397399
const extraFilterFactories: FilterFactory<Filter>[] = [];
398400
const clusterHttpFilterOverrides = new Map<string, HttpFilterConfig>();
399-
if (EXPERIMENTAL_FAULT_INJECTION) {
401+
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
400402
for (const [name, filter] of Object.entries(clusterWeight.typed_per_filter_config ?? {})) {
401403
const parsedConfig = parseOverrideFilterConfig(filter);
402404
if (parsedConfig) {

packages/grpc-js-xds/src/xds-client.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,32 +435,47 @@ export class XdsClient {
435435
private handleAdsResponse(message: DiscoveryResponse__Output) {
436436
let errorString: string | null;
437437
let serviceKind: AdsServiceKind;
438+
let isV2: boolean;
439+
switch (message.type_url) {
440+
case EDS_TYPE_URL_V2:
441+
case CDS_TYPE_URL_V2:
442+
case RDS_TYPE_URL_V2:
443+
case LDS_TYPE_URL_V2:
444+
isV2 = true;
445+
break;
446+
default:
447+
isV2 = false;
448+
}
438449
switch (message.type_url) {
439450
case EDS_TYPE_URL_V2:
440451
case EDS_TYPE_URL_V3:
441452
errorString = this.adsState.eds.handleResponses(
442-
getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources)
453+
getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources),
454+
isV2
443455
);
444456
serviceKind = 'eds';
445457
break;
446458
case CDS_TYPE_URL_V2:
447459
case CDS_TYPE_URL_V3:
448460
errorString = this.adsState.cds.handleResponses(
449-
getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources)
461+
getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources),
462+
isV2
450463
);
451464
serviceKind = 'cds';
452465
break;
453466
case RDS_TYPE_URL_V2:
454467
case RDS_TYPE_URL_V3:
455468
errorString = this.adsState.rds.handleResponses(
456-
getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources)
469+
getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources),
470+
isV2
457471
);
458472
serviceKind = 'rds';
459473
break;
460474
case LDS_TYPE_URL_V2:
461475
case LDS_TYPE_URL_V3:
462476
errorString = this.adsState.lds.handleResponses(
463-
getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources)
477+
getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources),
478+
isV2
464479
);
465480
serviceKind = 'lds';
466481
break;

packages/grpc-js-xds/src/xds-stream-state/cds-state.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
3636
>();
3737

3838
private latestResponses: Cluster__Output[] = [];
39+
private latestIsV2 = false;
3940

4041
constructor(
4142
private edsState: EdsState,
@@ -61,13 +62,14 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
6162

6263
/* If we have already received an update for the requested edsServiceName,
6364
* immediately pass that update along to the watcher */
65+
const isV2 = this.latestIsV2;
6466
for (const message of this.latestResponses) {
6567
if (message.name === clusterName) {
6668
/* These updates normally occur asynchronously, so we ensure that
6769
* the same happens here */
6870
process.nextTick(() => {
6971
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
70-
watcher.onValidUpdate(message);
72+
watcher.onValidUpdate(message, isV2);
7173
});
7274
}
7375
}
@@ -134,14 +136,15 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
134136
}
135137
}
136138

137-
handleResponses(responses: Cluster__Output[]): string | null {
139+
handleResponses(responses: Cluster__Output[], isV2: boolean): string | null {
138140
for (const message of responses) {
139141
if (!this.validateResponse(message)) {
140142
trace('CDS validation failed for message ' + JSON.stringify(message));
141143
return 'CDS Error: Cluster validation failed';
142144
}
143145
}
144146
this.latestResponses = responses;
147+
this.latestIsV2 = isV2;
145148
const allEdsServiceNames: Set<string> = new Set<string>();
146149
const allClusterNames: Set<string> = new Set<string>();
147150
for (const message of responses) {
@@ -152,7 +155,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
152155
);
153156
const watchers = this.watchers.get(message.name) ?? [];
154157
for (const watcher of watchers) {
155-
watcher.onValidUpdate(message);
158+
watcher.onValidUpdate(message, isV2);
156159
}
157160
}
158161
trace('Received CDS updates for cluster names ' + Array.from(allClusterNames));

packages/grpc-js-xds/src/xds-stream-state/eds-state.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
3636
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
3737

3838
private latestResponses: ClusterLoadAssignment__Output[] = [];
39+
private latestIsV2 = false;
3940

4041
constructor(private updateResourceNames: () => void) {}
4142

@@ -61,13 +62,14 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
6162

6263
/* If we have already received an update for the requested edsServiceName,
6364
* immediately pass that update along to the watcher */
65+
const isV2 = this.latestIsV2;
6466
for (const message of this.latestResponses) {
6567
if (message.cluster_name === edsServiceName) {
6668
/* These updates normally occur asynchronously, so we ensure that
6769
* the same happens here */
6870
process.nextTick(() => {
6971
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
70-
watcher.onValidUpdate(message);
72+
watcher.onValidUpdate(message, isV2);
7173
});
7274
}
7375
}
@@ -143,20 +145,21 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
143145
}
144146
}
145147

146-
handleResponses(responses: ClusterLoadAssignment__Output[]) {
148+
handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) {
147149
for (const message of responses) {
148150
if (!this.validateResponse(message)) {
149151
trace('EDS validation failed for message ' + JSON.stringify(message));
150152
return 'EDS Error: ClusterLoadAssignment validation failed';
151153
}
152154
}
153155
this.latestResponses = responses;
156+
this.latestIsV2 = isV2;
154157
const allClusterNames: Set<string> = new Set<string>();
155158
for (const message of responses) {
156159
allClusterNames.add(message.cluster_name);
157160
const watchers = this.watchers.get(message.cluster_name) ?? [];
158161
for (const watcher of watchers) {
159-
watcher.onValidUpdate(message);
162+
watcher.onValidUpdate(message, isV2);
160163
}
161164
}
162165
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));

packages/grpc-js-xds/src/xds-stream-state/lds-state.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
3939

4040
private watchers: Map<string, Watcher<Listener__Output>[]> = new Map<string, Watcher<Listener__Output>[]>();
4141
private latestResponses: Listener__Output[] = [];
42+
private latestIsV2 = false;
4243

4344
constructor(private rdsState: RdsState, private updateResourceNames: () => void) {}
4445

@@ -55,13 +56,14 @@ export class LdsState implements XdsStreamState<Listener__Output> {
5556

5657
/* If we have already received an update for the requested edsServiceName,
5758
* immediately pass that update along to the watcher */
59+
const isV2 = this.latestIsV2;
5860
for (const message of this.latestResponses) {
5961
if (message.name === targetName) {
6062
/* These updates normally occur asynchronously, so we ensure that
6163
* the same happens here */
6264
process.nextTick(() => {
6365
trace('Reporting existing RDS update for new watcher for targetName ' + targetName);
64-
watcher.onValidUpdate(message);
66+
watcher.onValidUpdate(message, isV2);
6567
});
6668
}
6769
}
@@ -93,7 +95,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
9395
return Array.from(this.watchers.keys());
9496
}
9597

96-
private validateResponse(message: Listener__Output): boolean {
98+
private validateResponse(message: Listener__Output, isV2: boolean): boolean {
9799
if (
98100
!(
99101
message.api_listener?.api_listener &&
@@ -104,7 +106,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
104106
return false;
105107
}
106108
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value);
107-
if (EXPERIMENTAL_FAULT_INJECTION) {
109+
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
108110
const filterNames = new Set<string>();
109111
for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) {
110112
if (filterNames.has(httpFilter.name)) {
@@ -136,7 +138,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
136138
case 'rds':
137139
return !!httpConnectionManager.rds?.config_source?.ads;
138140
case 'route_config':
139-
return this.rdsState.validateResponse(httpConnectionManager.route_config!);
141+
return this.rdsState.validateResponse(httpConnectionManager.route_config!, isV2);
140142
}
141143
return false;
142144
}
@@ -151,20 +153,21 @@ export class LdsState implements XdsStreamState<Listener__Output> {
151153
}
152154
}
153155

154-
handleResponses(responses: Listener__Output[]): string | null {
156+
handleResponses(responses: Listener__Output[], isV2: boolean): string | null {
155157
for (const message of responses) {
156-
if (!this.validateResponse(message)) {
158+
if (!this.validateResponse(message, isV2)) {
157159
trace('LDS validation failed for message ' + JSON.stringify(message));
158160
return 'LDS Error: Route validation failed';
159161
}
160162
}
161163
this.latestResponses = responses;
164+
this.latestIsV2 = isV2;
162165
const allTargetNames = new Set<string>();
163166
for (const message of responses) {
164167
allTargetNames.add(message.name);
165168
const watchers = this.watchers.get(message.name) ?? [];
166169
for (const watcher of watchers) {
167-
watcher.onValidUpdate(message);
170+
watcher.onValidUpdate(message, isV2);
168171
}
169172
}
170173
trace('Received RDS response with route config names ' + Array.from(allTargetNames));

0 commit comments

Comments
 (0)