Skip to content

Commit dca3670

Browse files
committed
grpc-js-xds: Add details to ADS response handling result
1 parent 8b7a4a0 commit dca3670

File tree

6 files changed

+193
-95
lines changed

6 files changed

+193
-95
lines changed

packages/grpc-js-xds/src/xds-client.ts

Lines changed: 74 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ import { EdsState } from './xds-stream-state/eds-state';
4545
import { CdsState } from './xds-stream-state/cds-state';
4646
import { RdsState } from './xds-stream-state/rds-state';
4747
import { LdsState } from './xds-stream-state/lds-state';
48-
import { Watcher } from './xds-stream-state/xds-stream-state';
48+
import { HandleResponseResult, ResourcePair, Watcher } from './xds-stream-state/xds-stream-state';
4949
import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment';
5050
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
5151
import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration';
@@ -242,11 +242,14 @@ function getResponseMessages<T extends AdsTypeUrl>(
242242
targetTypeUrl: T,
243243
allowedTypeUrls: string[],
244244
resources: Any__Output[]
245-
): AdsOutputType<T>[] {
246-
const result: AdsOutputType<T>[] = [];
245+
): ResourcePair<AdsOutputType<T>>[] {
246+
const result: ResourcePair<AdsOutputType<T>>[] = [];
247247
for (const resource of resources) {
248248
if (allowedTypeUrls.includes(resource.type_url)) {
249-
result.push(decodeSingleResource(targetTypeUrl, resource.value));
249+
result.push({
250+
resource: decodeSingleResource(targetTypeUrl, resource.value),
251+
raw: resource
252+
});
250253
} else {
251254
throw new Error(
252255
`ADS Error: Invalid resource type ${resource.type_url}, expected ${allowedTypeUrls}`
@@ -450,8 +453,10 @@ export class XdsClient {
450453
}
451454

452455
private handleAdsResponse(message: DiscoveryResponse__Output) {
453-
let errorString: string | null;
454-
let serviceKind: AdsServiceKind;
456+
let handleResponseResult: {
457+
result: HandleResponseResult;
458+
serviceKind: AdsServiceKind;
459+
} | null = null;
455460
let isV2: boolean;
456461
switch (message.type_url) {
457462
case EDS_TYPE_URL_V2:
@@ -463,56 +468,71 @@ export class XdsClient {
463468
default:
464469
isV2 = false;
465470
}
466-
switch (message.type_url) {
467-
case EDS_TYPE_URL_V2:
468-
case EDS_TYPE_URL_V3:
469-
errorString = this.adsState.eds.handleResponses(
470-
getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources),
471-
isV2
472-
);
473-
serviceKind = 'eds';
474-
break;
475-
case CDS_TYPE_URL_V2:
476-
case CDS_TYPE_URL_V3:
477-
errorString = this.adsState.cds.handleResponses(
478-
getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources),
479-
isV2
480-
);
481-
serviceKind = 'cds';
482-
break;
483-
case RDS_TYPE_URL_V2:
484-
case RDS_TYPE_URL_V3:
485-
errorString = this.adsState.rds.handleResponses(
486-
getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources),
487-
isV2
488-
);
489-
serviceKind = 'rds';
490-
break;
491-
case LDS_TYPE_URL_V2:
492-
case LDS_TYPE_URL_V3:
493-
errorString = this.adsState.lds.handleResponses(
494-
getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources),
495-
isV2
496-
);
497-
serviceKind = 'lds';
498-
break;
499-
default:
500-
errorString = `Unknown type_url ${message.type_url}`;
501-
// This is not used in this branch, but setting it makes the types easier to handle
502-
serviceKind = 'eds';
471+
try {
472+
switch (message.type_url) {
473+
case EDS_TYPE_URL_V2:
474+
case EDS_TYPE_URL_V3:
475+
handleResponseResult = {
476+
result: this.adsState.eds.handleResponses(
477+
getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources),
478+
isV2
479+
),
480+
serviceKind: 'eds'
481+
};
482+
break;
483+
case CDS_TYPE_URL_V2:
484+
case CDS_TYPE_URL_V3:
485+
handleResponseResult = {
486+
result: this.adsState.cds.handleResponses(
487+
getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources),
488+
isV2
489+
),
490+
serviceKind: 'cds'
491+
};
492+
break;
493+
case RDS_TYPE_URL_V2:
494+
case RDS_TYPE_URL_V3:
495+
handleResponseResult = {
496+
result: this.adsState.rds.handleResponses(
497+
getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources),
498+
isV2
499+
),
500+
serviceKind: 'rds'
501+
};
502+
break;
503+
case LDS_TYPE_URL_V2:
504+
case LDS_TYPE_URL_V3:
505+
handleResponseResult = {
506+
result: this.adsState.lds.handleResponses(
507+
getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources),
508+
isV2
509+
),
510+
serviceKind: 'lds'
511+
}
512+
break;
513+
}
514+
} catch (e) {
515+
trace('Nacking message with protobuf parsing error: ' + e.message);
516+
this.nack(message.type_url, e.message);
503517
}
504-
if (errorString === null) {
505-
trace('Acking message with type URL ' + message.type_url);
506-
/* errorString can only be null in one of the first 4 cases, which
507-
* implies that message.type_url is one of the 4 known type URLs, which
508-
* means that this type assertion is valid. */
509-
const typeUrl = message.type_url as AdsTypeUrl;
510-
this.adsState[serviceKind].nonce = message.nonce;
511-
this.adsState[serviceKind].versionInfo = message.version_info;
512-
this.ack(serviceKind);
518+
if (handleResponseResult === null) {
519+
// Null handleResponseResult means that the type_url was unrecognized
520+
trace('Nacking message with unknown type URL ' + message.type_url);
521+
this.nack(message.type_url, `Unknown type_url ${message.type_url}`);
513522
} else {
514-
trace('Nacking message with type URL ' + message.type_url + ': "' + errorString + '"');
515-
this.nack(message.type_url, errorString);
523+
if (handleResponseResult.result.rejected.length > 0) {
524+
// rejected.length > 0 means that at least one message validation failed
525+
const errorString = `${handleResponseResult.serviceKind.toUpperCase()} Error: ${handleResponseResult.result.rejected[0].error}`;
526+
trace('Nacking message with type URL ' + message.type_url + ': ' + errorString);
527+
this.nack(message.type_url, errorString);
528+
} else {
529+
// If we get here, all message validation succeeded
530+
trace('Acking message with type URL ' + message.type_url);
531+
const serviceKind = handleResponseResult.serviceKind;
532+
this.adsState[serviceKind].nonce = message.nonce;
533+
this.adsState[serviceKind].versionInfo = message.version_info;
534+
this.ack(serviceKind);
535+
}
516536
}
517537
}
518538

packages/grpc-js-xds/src/xds-stream-state/cds-state.ts

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717

1818
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
1919
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
20+
import { Any__Output } from "../generated/google/protobuf/Any";
2021
import { EdsState } from "./eds-state";
21-
import { Watcher, XdsStreamState } from "./xds-stream-state";
22+
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
2223

2324
const TRACER_NAME = 'xds_client';
2425

@@ -125,26 +126,40 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
125126
* onResourceDoesNotExist method.
126127
* @param allClusterNames
127128
*/
128-
private handleMissingNames(allClusterNames: Set<string>) {
129+
private handleMissingNames(allClusterNames: Set<string>): string[] {
130+
const missingNames: string[] = [];
129131
for (const [clusterName, watcherList] of this.watchers.entries()) {
130132
if (!allClusterNames.has(clusterName)) {
131133
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
134+
missingNames.push(clusterName);
132135
for (const watcher of watcherList) {
133136
watcher.onResourceDoesNotExist();
134137
}
135138
}
136139
}
140+
return missingNames;
137141
}
138142

139-
handleResponses(responses: Cluster__Output[], isV2: boolean): string | null {
143+
handleResponses(responses: ResourcePair<Cluster__Output>[], isV2: boolean): HandleResponseResult {
140144
const validResponses: Cluster__Output[] = [];
141-
let errorMessage: string | null = null;
142-
for (const message of responses) {
143-
if (this.validateResponse(message)) {
144-
validResponses.push(message);
145+
const result: HandleResponseResult = {
146+
accepted: [],
147+
rejected: [],
148+
missing: []
149+
}
150+
for (const {resource, raw} of responses) {
151+
if (this.validateResponse(resource)) {
152+
validResponses.push(resource);
153+
result.accepted.push({
154+
name: resource.name,
155+
raw: raw});
145156
} else {
146-
trace('CDS validation failed for message ' + JSON.stringify(message));
147-
errorMessage = 'CDS Error: Cluster validation failed';
157+
trace('CDS validation failed for message ' + JSON.stringify(resource));
158+
result.rejected.push({
159+
name: resource.name,
160+
raw: raw,
161+
error: `Cluster validation failed for resource ${resource.name}`
162+
});
148163
}
149164
}
150165
this.latestResponses = validResponses;
@@ -163,9 +178,9 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
163178
}
164179
}
165180
trace('Received CDS updates for cluster names [' + Array.from(allClusterNames) + ']');
166-
this.handleMissingNames(allClusterNames);
181+
result.missing = this.handleMissingNames(allClusterNames);
167182
this.edsState.handleMissingNames(allEdsServiceNames);
168-
return errorMessage;
183+
return result;
169184
}
170185

171186
reportStreamError(status: StatusObject): void {

packages/grpc-js-xds/src/xds-stream-state/eds-state.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
1919
import { isIPv4, isIPv6 } from "net";
2020
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
21-
import { Watcher, XdsStreamState } from "./xds-stream-state";
21+
import { Any__Output } from "../generated/google/protobuf/Any";
22+
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
2223

2324
const TRACER_NAME = 'xds_client';
2425

@@ -145,15 +146,26 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
145146
}
146147
}
147148

148-
handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) {
149+
handleResponses(responses: ResourcePair<ClusterLoadAssignment__Output>[], isV2: boolean): HandleResponseResult {
149150
const validResponses: ClusterLoadAssignment__Output[] = [];
150-
let errorMessage: string | null = null;
151-
for (const message of responses) {
152-
if (this.validateResponse(message)) {
153-
validResponses.push(message);
151+
let result: HandleResponseResult = {
152+
accepted: [],
153+
rejected: [],
154+
missing: []
155+
}
156+
for (const {resource, raw} of responses) {
157+
if (this.validateResponse(resource)) {
158+
validResponses.push(resource);
159+
result.accepted.push({
160+
name: resource.cluster_name,
161+
raw: raw});
154162
} else {
155-
trace('EDS validation failed for message ' + JSON.stringify(message));
156-
errorMessage = 'EDS Error: ClusterLoadAssignment validation failed';
163+
trace('EDS validation failed for message ' + JSON.stringify(resource));
164+
result.rejected.push({
165+
name: resource.cluster_name,
166+
raw: raw,
167+
error: `ClusterLoadAssignment validation failed for resource ${resource.cluster_name}`
168+
});
157169
}
158170
}
159171
this.latestResponses = validResponses;
@@ -167,7 +179,7 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
167179
}
168180
}
169181
trace('Received EDS updates for cluster names [' + Array.from(allClusterNames) + ']');
170-
return errorMessage;
182+
return result;
171183
}
172184

173185
reportStreamError(status: StatusObject): void {

packages/grpc-js-xds/src/xds-stream-state/lds-state.ts

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ import * as protoLoader from '@grpc/proto-loader';
1919
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
2020
import { Listener__Output } from '../generated/envoy/config/listener/v3/Listener';
2121
import { RdsState } from "./rds-state";
22-
import { Watcher, XdsStreamState } from "./xds-stream-state";
22+
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
2323
import { HttpConnectionManager__Output } from '../generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
2424
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources';
2525
import { getTopLevelFilterUrl, validateTopLevelFilter } from '../http-filter';
2626
import { EXPERIMENTAL_FAULT_INJECTION } from '../environment';
27+
import { Any__Output } from '../generated/google/protobuf/Any';
2728

2829
const TRACER_NAME = 'xds_client';
2930

@@ -143,25 +144,40 @@ export class LdsState implements XdsStreamState<Listener__Output> {
143144
return false;
144145
}
145146

146-
private handleMissingNames(allTargetNames: Set<string>) {
147+
private handleMissingNames(allTargetNames: Set<string>): string[] {
148+
const missingNames: string[] = [];
147149
for (const [targetName, watcherList] of this.watchers.entries()) {
148150
if (!allTargetNames.has(targetName)) {
151+
missingNames.push(targetName);
149152
for (const watcher of watcherList) {
150153
watcher.onResourceDoesNotExist();
151154
}
152155
}
153156
}
157+
return missingNames;
154158
}
155159

156-
handleResponses(responses: Listener__Output[], isV2: boolean): string | null {
160+
handleResponses(responses: ResourcePair<Listener__Output>[], isV2: boolean): HandleResponseResult {
157161
const validResponses: Listener__Output[] = [];
158-
let errorMessage: string | null = null;
159-
for (const message of responses) {
160-
if (this.validateResponse(message, isV2)) {
161-
validResponses.push(message);
162+
let result: HandleResponseResult = {
163+
accepted: [],
164+
rejected: [],
165+
missing: []
166+
}
167+
for (const {resource, raw} of responses) {
168+
if (this.validateResponse(resource, isV2)) {
169+
validResponses.push(resource);
170+
result.accepted.push({
171+
name: resource.name,
172+
raw: raw
173+
});
162174
} else {
163-
trace('LDS validation failed for message ' + JSON.stringify(message));
164-
errorMessage = 'LDS Error: Route validation failed';
175+
trace('LDS validation failed for message ' + JSON.stringify(resource));
176+
result.rejected.push({
177+
name: resource.name,
178+
raw: raw,
179+
error: `Listener validation failed for resource ${resource.name}`
180+
});
165181
}
166182
}
167183
this.latestResponses = validResponses;
@@ -180,9 +196,9 @@ export class LdsState implements XdsStreamState<Listener__Output> {
180196
}
181197
}
182198
trace('Received LDS response with listener names [' + Array.from(allTargetNames) + ']');
183-
this.handleMissingNames(allTargetNames);
199+
result.missing = this.handleMissingNames(allTargetNames);
184200
this.rdsState.handleMissingNames(allRouteConfigNames);
185-
return errorMessage;
201+
return result;
186202
}
187203

188204
reportStreamError(status: StatusObject): void {

0 commit comments

Comments
 (0)