Skip to content

Commit eeaa6c0

Browse files
authored
Merge pull request #2434 from murgatroid99/v1.8.x_upmerge
Merge v1.8.x into master
2 parents f05ef55 + 85d227b commit eeaa6c0

File tree

7 files changed

+358
-58
lines changed

7 files changed

+358
-58
lines changed

packages/grpc-js-xds/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@grpc/grpc-js-xds",
3-
"version": "1.8.1",
3+
"version": "1.8.2",
44
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
55
"main": "build/src/index.js",
66
"scripts": {

packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*
1616
*/
1717

18-
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
18+
import { experimental, logVerbosity, Metadata, status, StatusObject } from "@grpc/grpc-js";
1919
import { Any__Output } from "../generated/google/protobuf/Any";
2020

2121
const TRACER_NAME = 'xds_client';
@@ -157,47 +157,55 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
157157
return Array.from(this.subscriptions.keys());
158158
}
159159
handleResponses(responses: ResourcePair<ResponseType>[]): HandleResponseResult {
160-
const validResponses: ResponseType[] = [];
161160
let result: HandleResponseResult = {
162161
accepted: [],
163162
rejected: [],
164163
missing: []
165164
}
165+
const allResourceNames = new Set<string>();
166166
for (const {resource, raw} of responses) {
167167
const resourceName = this.getResourceName(resource);
168+
allResourceNames.add(resourceName);
169+
const subscriptionEntry = this.subscriptions.get(resourceName);
168170
if (this.validateResponse(resource)) {
169-
validResponses.push(resource);
170171
result.accepted.push({
171172
name: resourceName,
172173
raw: raw});
174+
if (subscriptionEntry) {
175+
for (const watcher of subscriptionEntry.watchers) {
176+
/* Use process.nextTick to prevent errors from the watcher from
177+
* bubbling up through here. */
178+
process.nextTick(() => {
179+
watcher.onValidUpdate(resource);
180+
});
181+
}
182+
clearTimeout(subscriptionEntry.resourceTimer);
183+
subscriptionEntry.cachedResponse = resource;
184+
if (subscriptionEntry.deletionIgnored) {
185+
experimental.log(logVerbosity.INFO, `Received resource with previously ignored deletion: ${resourceName}`);
186+
subscriptionEntry.deletionIgnored = false;
187+
}
188+
}
173189
} else {
174190
this.trace('Validation failed for message ' + JSON.stringify(resource));
175191
result.rejected.push({
176192
name: resourceName,
177193
raw: raw,
178194
error: `Validation failed for resource ${resourceName}`
179195
});
180-
}
181-
}
182-
const allResourceNames = new Set<string>();
183-
for (const resource of validResponses) {
184-
const resourceName = this.getResourceName(resource);
185-
allResourceNames.add(resourceName);
186-
const subscriptionEntry = this.subscriptions.get(resourceName);
187-
if (subscriptionEntry) {
188-
const watchers = subscriptionEntry.watchers;
189-
for (const watcher of watchers) {
190-
/* Use process.nextTick to prevent errors from the watcher from
191-
* bubbling up through here. */
192-
process.nextTick(() => {
193-
watcher.onValidUpdate(resource);
194-
});
195-
}
196-
clearTimeout(subscriptionEntry.resourceTimer);
197-
subscriptionEntry.cachedResponse = resource;
198-
if (subscriptionEntry.deletionIgnored) {
199-
experimental.log(logVerbosity.INFO, 'Received resource with previously ignored deletion: ' + resourceName);
200-
subscriptionEntry.deletionIgnored = false;
196+
if (subscriptionEntry) {
197+
for (const watcher of subscriptionEntry.watchers) {
198+
/* Use process.nextTick to prevent errors from the watcher from
199+
* bubbling up through here. */
200+
process.nextTick(() => {
201+
watcher.onTransientError({
202+
code: status.UNAVAILABLE,
203+
details: `Validation failed for resource ${resourceName}`,
204+
metadata: new Metadata()
205+
});
206+
});
207+
}
208+
clearTimeout(subscriptionEntry.resourceTimer);
201209
}
202210
}
203211
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2023 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import * as assert from 'assert';
19+
import { register } from "../src";
20+
import { Cluster } from '../src/generated/envoy/config/cluster/v3/Cluster';
21+
import { Backend } from "./backend";
22+
import { XdsTestClient } from "./client";
23+
import { FakeEdsCluster, FakeRouteGroup } from "./framework";
24+
import { XdsServer } from "./xds-server";
25+
26+
register();
27+
28+
describe('Validation errors', () => {
29+
let xdsServer: XdsServer;
30+
let client: XdsTestClient;
31+
beforeEach(done => {
32+
xdsServer = new XdsServer();
33+
xdsServer.startServer(error => {
34+
done(error);
35+
});
36+
});
37+
afterEach(() => {
38+
client?.close();
39+
xdsServer?.shutdownServer();
40+
});
41+
it('Should continue to use a valid resource after receiving an invalid EDS update', done => {
42+
const cluster = new FakeEdsCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
43+
const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]);
44+
routeGroup.startAllBackends().then(() => {
45+
xdsServer.setEdsResource(cluster.getEndpointConfig());
46+
xdsServer.setCdsResource(cluster.getClusterConfig());
47+
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
48+
xdsServer.setLdsResource(routeGroup.getListener());
49+
client = new XdsTestClient('route1', xdsServer);
50+
client.startCalls(100);
51+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
52+
// After backends receive calls, set invalid EDS resource
53+
const invalidEdsResource = {cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]};
54+
xdsServer.setEdsResource(invalidEdsResource);
55+
let seenNack = false;
56+
xdsServer.addResponseListener((typeUrl, responseState) => {
57+
if (responseState.state === 'NACKED') {
58+
if (seenNack) {
59+
return;
60+
}
61+
seenNack = true;
62+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
63+
client.stopCalls();
64+
done();
65+
});
66+
}
67+
});
68+
}, reason => done(reason));
69+
}, reason => done(reason));
70+
});
71+
it('Should continue to use a valid resource after receiving an invalid CDS update', done => {
72+
const cluster = new FakeEdsCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
73+
const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]);
74+
routeGroup.startAllBackends().then(() => {
75+
xdsServer.setEdsResource(cluster.getEndpointConfig());
76+
xdsServer.setCdsResource(cluster.getClusterConfig());
77+
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
78+
xdsServer.setLdsResource(routeGroup.getListener());
79+
client = new XdsTestClient('route1', xdsServer);
80+
client.startCalls(100);
81+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
82+
// After backends receive calls, set invalid CDS resource
83+
const invalidCdsResource: Cluster = {name: cluster.getClusterConfig().name, type: 'EDS'};
84+
xdsServer.setCdsResource(invalidCdsResource);
85+
let seenNack = false;
86+
xdsServer.addResponseListener((typeUrl, responseState) => {
87+
if (responseState.state === 'NACKED') {
88+
if (seenNack) {
89+
return;
90+
}
91+
seenNack = true;
92+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
93+
client.stopCalls();
94+
done();
95+
});
96+
}
97+
});
98+
}, reason => done(reason));
99+
}, reason => done(reason));
100+
});
101+
it('Should continue to use a valid resource after receiving an invalid RDS update', done => {
102+
const cluster = new FakeEdsCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
103+
const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]);
104+
routeGroup.startAllBackends().then(() => {
105+
xdsServer.setEdsResource(cluster.getEndpointConfig());
106+
xdsServer.setCdsResource(cluster.getClusterConfig());
107+
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
108+
xdsServer.setLdsResource(routeGroup.getListener());
109+
client = new XdsTestClient('route1', xdsServer);
110+
client.startCalls(100);
111+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
112+
// After backends receive calls, set invalid RDS resource
113+
const invalidRdsResource = {name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]};
114+
xdsServer.setRdsResource(invalidRdsResource);
115+
let seenNack = false;
116+
xdsServer.addResponseListener((typeUrl, responseState) => {
117+
if (responseState.state === 'NACKED') {
118+
if (seenNack) {
119+
return;
120+
}
121+
seenNack = true;
122+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
123+
client.stopCalls();
124+
done();
125+
});
126+
}
127+
});
128+
}, reason => done(reason));
129+
}, reason => done(reason));
130+
});
131+
it('Should continue to use a valid resource after receiving an invalid LDS update', done => {
132+
const cluster = new FakeEdsCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
133+
const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]);
134+
routeGroup.startAllBackends().then(() => {
135+
xdsServer.setEdsResource(cluster.getEndpointConfig());
136+
xdsServer.setCdsResource(cluster.getClusterConfig());
137+
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
138+
xdsServer.setLdsResource(routeGroup.getListener());
139+
client = new XdsTestClient('route1', xdsServer);
140+
client.startCalls(100);
141+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
142+
// After backends receive calls, set invalid LDS resource
143+
const invalidLdsResource = {name: routeGroup.getListener().name};
144+
xdsServer.setLdsResource(invalidLdsResource);
145+
let seenNack = false;
146+
xdsServer.addResponseListener((typeUrl, responseState) => {
147+
if (responseState.state === 'NACKED') {
148+
if (seenNack) {
149+
return;
150+
}
151+
seenNack = true;
152+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
153+
client.stopCalls();
154+
done();
155+
});
156+
}
157+
});
158+
}, reason => done(reason));
159+
}, reason => done(reason));
160+
});
161+
});

packages/grpc-js/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@grpc/grpc-js",
3-
"version": "1.8.13",
3+
"version": "1.8.14",
44
"description": "gRPC Library for Node - pure JS implementation",
55
"homepage": "https://grpc.io/",
66
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

packages/grpc-js/src/load-balancer-pick-first.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,12 +322,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
322322
);
323323
}
324324
this.currentPick = subchannel;
325-
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
326325
subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener);
327326
subchannel.ref();
328327
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
329328
this.resetSubchannelList();
330329
clearTimeout(this.connectionDelayTimeout);
330+
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
331331
}
332332

333333
private updateState(newState: ConnectivityState, picker: Picker) {

packages/grpc-js/src/subchannel.ts

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export class Subchannel {
6060
* state changes. Will be modified by `addConnectivityStateListener` and
6161
* `removeConnectivityStateListener`
6262
*/
63-
private stateListeners: ConnectivityStateListener[] = [];
63+
private stateListeners: Set<ConnectivityStateListener> = new Set();
6464

6565
private backoffTimeout: BackoffTimeout;
6666

@@ -261,9 +261,7 @@ export class Subchannel {
261261
default:
262262
throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
263263
}
264-
/* We use a shallow copy of the stateListeners array in case a listener
265-
* is removed during this iteration */
266-
for (const listener of [...this.stateListeners]) {
264+
for (const listener of this.stateListeners) {
267265
listener(this, previousState, newState, this.keepaliveTime);
268266
}
269267
return true;
@@ -291,13 +289,15 @@ export class Subchannel {
291289
if (this.channelzEnabled) {
292290
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
293291
}
294-
this.transitionToState(
295-
[ConnectivityState.CONNECTING, ConnectivityState.READY],
296-
ConnectivityState.IDLE
297-
);
298292
if (this.channelzEnabled) {
299293
unregisterChannelzRef(this.channelzRef);
300294
}
295+
process.nextTick(() => {
296+
this.transitionToState(
297+
[ConnectivityState.CONNECTING, ConnectivityState.READY],
298+
ConnectivityState.IDLE
299+
);
300+
});
301301
}
302302
}
303303

@@ -339,20 +339,22 @@ export class Subchannel {
339339
* Otherwise, do nothing.
340340
*/
341341
startConnecting() {
342-
/* First, try to transition from IDLE to connecting. If that doesn't happen
343-
* because the state is not currently IDLE, check if it is
344-
* TRANSIENT_FAILURE, and if so indicate that it should go back to
345-
* connecting after the backoff timer ends. Otherwise do nothing */
346-
if (
347-
!this.transitionToState(
348-
[ConnectivityState.IDLE],
349-
ConnectivityState.CONNECTING
350-
)
351-
) {
352-
if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
353-
this.continueConnecting = true;
342+
process.nextTick(() => {
343+
/* First, try to transition from IDLE to connecting. If that doesn't happen
344+
* because the state is not currently IDLE, check if it is
345+
* TRANSIENT_FAILURE, and if so indicate that it should go back to
346+
* connecting after the backoff timer ends. Otherwise do nothing */
347+
if (
348+
!this.transitionToState(
349+
[ConnectivityState.IDLE],
350+
ConnectivityState.CONNECTING
351+
)
352+
) {
353+
if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
354+
this.continueConnecting = true;
355+
}
354356
}
355-
}
357+
});
356358
}
357359

358360
/**
@@ -368,7 +370,7 @@ export class Subchannel {
368370
* @param listener
369371
*/
370372
addConnectivityStateListener(listener: ConnectivityStateListener) {
371-
this.stateListeners.push(listener);
373+
this.stateListeners.add(listener);
372374
}
373375

374376
/**
@@ -377,21 +379,20 @@ export class Subchannel {
377379
* `addConnectivityStateListener`
378380
*/
379381
removeConnectivityStateListener(listener: ConnectivityStateListener) {
380-
const listenerIndex = this.stateListeners.indexOf(listener);
381-
if (listenerIndex > -1) {
382-
this.stateListeners.splice(listenerIndex, 1);
383-
}
382+
this.stateListeners.delete(listener);
384383
}
385384

386385
/**
387386
* Reset the backoff timeout, and immediately start connecting if in backoff.
388387
*/
389388
resetBackoff() {
390-
this.backoffTimeout.reset();
391-
this.transitionToState(
392-
[ConnectivityState.TRANSIENT_FAILURE],
393-
ConnectivityState.CONNECTING
394-
);
389+
process.nextTick(() => {
390+
this.backoffTimeout.reset();
391+
this.transitionToState(
392+
[ConnectivityState.TRANSIENT_FAILURE],
393+
ConnectivityState.CONNECTING
394+
);
395+
});
395396
}
396397

397398
getAddress(): string {

0 commit comments

Comments
 (0)