Skip to content

Commit cea1537

Browse files
committed
Merge remote-tracking branch 'upstream/@grpc/[email protected]' into upmerge_1.4.3
2 parents 564edde + cb29b6a commit cea1537

File tree

11 files changed

+258
-92
lines changed

11 files changed

+258
-92
lines changed

packages/grpc-js-xds/src/xds-stream-state/cds-state.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,17 +137,21 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
137137
}
138138

139139
handleResponses(responses: Cluster__Output[], isV2: boolean): string | null {
140+
const validResponses: Cluster__Output[] = [];
141+
let errorMessage: string | null = null;
140142
for (const message of responses) {
141-
if (!this.validateResponse(message)) {
143+
if (this.validateResponse(message)) {
144+
validResponses.push(message);
145+
} else {
142146
trace('CDS validation failed for message ' + JSON.stringify(message));
143-
return 'CDS Error: Cluster validation failed';
147+
errorMessage = 'CDS Error: Cluster validation failed';
144148
}
145149
}
146-
this.latestResponses = responses;
150+
this.latestResponses = validResponses;
147151
this.latestIsV2 = isV2;
148152
const allEdsServiceNames: Set<string> = new Set<string>();
149153
const allClusterNames: Set<string> = new Set<string>();
150-
for (const message of responses) {
154+
for (const message of validResponses) {
151155
allClusterNames.add(message.name);
152156
const edsServiceName = message.eds_cluster_config?.service_name ?? '';
153157
allEdsServiceNames.add(
@@ -161,7 +165,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
161165
trace('Received CDS updates for cluster names [' + Array.from(allClusterNames) + ']');
162166
this.handleMissingNames(allClusterNames);
163167
this.edsState.handleMissingNames(allEdsServiceNames);
164-
return null;
168+
return errorMessage;
165169
}
166170

167171
reportStreamError(status: StatusObject): void {

packages/grpc-js-xds/src/xds-stream-state/eds-state.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,24 +146,28 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
146146
}
147147

148148
handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) {
149+
const validResponses: ClusterLoadAssignment__Output[] = [];
150+
let errorMessage: string | null = null;
149151
for (const message of responses) {
150-
if (!this.validateResponse(message)) {
152+
if (this.validateResponse(message)) {
153+
validResponses.push(message);
154+
} else {
151155
trace('EDS validation failed for message ' + JSON.stringify(message));
152-
return 'EDS Error: ClusterLoadAssignment validation failed';
156+
errorMessage = 'EDS Error: ClusterLoadAssignment validation failed';
153157
}
154158
}
155-
this.latestResponses = responses;
159+
this.latestResponses = validResponses;
156160
this.latestIsV2 = isV2;
157161
const allClusterNames: Set<string> = new Set<string>();
158-
for (const message of responses) {
162+
for (const message of validResponses) {
159163
allClusterNames.add(message.cluster_name);
160164
const watchers = this.watchers.get(message.cluster_name) ?? [];
161165
for (const watcher of watchers) {
162166
watcher.onValidUpdate(message, isV2);
163167
}
164168
}
165169
trace('Received EDS updates for cluster names [' + Array.from(allClusterNames) + ']');
166-
return null;
170+
return errorMessage;
167171
}
168172

169173
reportStreamError(status: StatusObject): void {

packages/grpc-js-xds/src/xds-stream-state/lds-state.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,17 +154,21 @@ export class LdsState implements XdsStreamState<Listener__Output> {
154154
}
155155

156156
handleResponses(responses: Listener__Output[], isV2: boolean): string | null {
157+
const validResponses: Listener__Output[] = [];
158+
let errorMessage: string | null = null;
157159
for (const message of responses) {
158-
if (!this.validateResponse(message, isV2)) {
160+
if (this.validateResponse(message, isV2)) {
161+
validResponses.push(message);
162+
} else {
159163
trace('LDS validation failed for message ' + JSON.stringify(message));
160-
return 'LDS Error: Route validation failed';
164+
errorMessage = 'LDS Error: Route validation failed';
161165
}
162166
}
163-
this.latestResponses = responses;
167+
this.latestResponses = validResponses;
164168
this.latestIsV2 = isV2;
165169
const allTargetNames = new Set<string>();
166170
const allRouteConfigNames = new Set<string>();
167-
for (const message of responses) {
171+
for (const message of validResponses) {
168172
allTargetNames.add(message.name);
169173
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener!.value);
170174
if (httpConnectionManager.rds) {
@@ -178,7 +182,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
178182
trace('Received LDS response with listener names [' + Array.from(allTargetNames) + ']');
179183
this.handleMissingNames(allTargetNames);
180184
this.rdsState.handleMissingNames(allRouteConfigNames);
181-
return null;
185+
return errorMessage;
182186
}
183187

184188
reportStreamError(status: StatusObject): void {

packages/grpc-js-xds/src/xds-stream-state/rds-state.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,24 +183,28 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
183183
}
184184

185185
handleResponses(responses: RouteConfiguration__Output[], isV2: boolean): string | null {
186+
const validResponses: RouteConfiguration__Output[] = [];
187+
let errorMessage: string | null = null;
186188
for (const message of responses) {
187-
if (!this.validateResponse(message, isV2)) {
189+
if (this.validateResponse(message, isV2)) {
190+
validResponses.push(message);
191+
} else {
188192
trace('RDS validation failed for message ' + JSON.stringify(message));
189-
return 'RDS Error: Route validation failed';
193+
errorMessage = 'RDS Error: Route validation failed';
190194
}
191195
}
192-
this.latestResponses = responses;
196+
this.latestResponses = validResponses;
193197
this.latestIsV2 = isV2;
194198
const allRouteConfigNames = new Set<string>();
195-
for (const message of responses) {
199+
for (const message of validResponses) {
196200
allRouteConfigNames.add(message.name);
197201
const watchers = this.watchers.get(message.name) ?? [];
198202
for (const watcher of watchers) {
199203
watcher.onValidUpdate(message, isV2);
200204
}
201205
}
202206
trace('Received RDS response with route config names [' + Array.from(allRouteConfigNames) + ']');
203-
return null;
207+
return errorMessage;
204208
}
205209

206210
reportStreamError(status: StatusObject): void {

packages/grpc-js/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@grpc/grpc-js",
3-
"version": "1.4.1",
3+
"version": "1.4.3",
44
"description": "gRPC Library for Node - pure JS implementation",
55
"homepage": "https://grpc.io/",
66
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

packages/grpc-js/src/channel-options.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export interface ChannelOptions {
3636
'grpc.enable_http_proxy'?: number;
3737
'grpc.http_connect_target'?: string;
3838
'grpc.http_connect_creds'?: string;
39+
'grpc.enable_channelz'?: number;
3940
'grpc-node.max_session_memory'?: number;
4041
// eslint-disable-next-line @typescript-eslint/no-explicit-any
4142
[key: string]: any;
@@ -61,6 +62,7 @@ export const recognizedOptions = {
6162
'grpc.max_send_message_length': true,
6263
'grpc.max_receive_message_length': true,
6364
'grpc.enable_http_proxy': true,
65+
'grpc.enable_channelz': true,
6466
'grpc-node.max_session_memory': true,
6567
};
6668

packages/grpc-js/src/channel.ts

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ export class ChannelImplementation implements Channel {
172172
private configSelector: ConfigSelector | null = null;
173173

174174
// Channelz info
175+
private readonly channelzEnabled: boolean = true;
175176
private originalTarget: string;
176177
private channelzRef: ChannelRef;
177178
private channelzTrace: ChannelzTrace;
@@ -213,9 +214,22 @@ export class ChannelImplementation implements Channel {
213214
this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
214215
this.callRefTimer.unref?.();
215216

216-
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
217+
if (this.options['grpc.enable_channelz'] === 0) {
218+
this.channelzEnabled = false;
219+
}
220+
217221
this.channelzTrace = new ChannelzTrace();
218-
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
222+
if (this.channelzEnabled) {
223+
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
224+
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
225+
} else {
226+
// Dummy channelz ref that will never be used
227+
this.channelzRef = {
228+
kind: 'channel',
229+
id: -1,
230+
name: ''
231+
};
232+
}
219233

220234
if (this.options['grpc.default_authority']) {
221235
this.defaultAuthority = this.options['grpc.default_authority'] as string;
@@ -242,7 +256,9 @@ export class ChannelImplementation implements Channel {
242256
Object.assign({}, this.options, subchannelArgs),
243257
this.credentials
244258
);
245-
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
259+
if (this.channelzEnabled) {
260+
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
261+
}
246262
return subchannel;
247263
},
248264
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
@@ -262,18 +278,24 @@ export class ChannelImplementation implements Channel {
262278
);
263279
},
264280
addChannelzChild: (child: ChannelRef | SubchannelRef) => {
265-
this.childrenTracker.refChild(child);
281+
if (this.channelzEnabled) {
282+
this.childrenTracker.refChild(child);
283+
}
266284
},
267285
removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
268-
this.childrenTracker.unrefChild(child);
286+
if (this.channelzEnabled) {
287+
this.childrenTracker.unrefChild(child);
288+
}
269289
}
270290
};
271291
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
272292
this.target,
273293
channelControlHelper,
274294
options,
275295
(configSelector) => {
276-
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
296+
if (this.channelzEnabled) {
297+
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
298+
}
277299
this.configSelector = configSelector;
278300
/* We process the queue asynchronously to ensure that the corresponding
279301
* load balancer update has completed. */
@@ -288,7 +310,9 @@ export class ChannelImplementation implements Channel {
288310
});
289311
},
290312
(status) => {
291-
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
313+
if (this.channelzEnabled) {
314+
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
315+
}
292316
if (this.configSelectionQueue.length > 0) {
293317
this.trace('Name resolution failed with calls queued for config selection');
294318
}
@@ -553,7 +577,9 @@ export class ChannelImplementation implements Channel {
553577
' -> ' +
554578
ConnectivityState[newState]
555579
);
556-
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
580+
if (this.channelzEnabled) {
581+
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
582+
}
557583
this.connectivityState = newState;
558584
const watchersCopy = this.connectivityStateWatchers.slice();
559585
for (const watcherObject of watchersCopy) {
@@ -638,7 +664,9 @@ export class ChannelImplementation implements Channel {
638664
this.resolvingLoadBalancer.destroy();
639665
this.updateState(ConnectivityState.SHUTDOWN);
640666
clearInterval(this.callRefTimer);
641-
unregisterChannelzRef(this.channelzRef);
667+
if (this.channelzEnabled) {
668+
unregisterChannelzRef(this.channelzRef);
669+
}
642670

643671
this.subchannelPool.unrefUnusedSubchannels();
644672
}
@@ -690,6 +718,11 @@ export class ChannelImplementation implements Channel {
690718
this.connectivityStateWatchers.push(watcherObject);
691719
}
692720

721+
/**
722+
* Get the channelz reference object for this channel. The returned value is
723+
* garbage if channelz is disabled for this channel.
724+
* @returns
725+
*/
693726
getChannelzRef() {
694727
return this.channelzRef;
695728
}
@@ -735,14 +768,16 @@ export class ChannelImplementation implements Channel {
735768
this.credentials._getCallCredentials(),
736769
callNumber
737770
);
738-
this.callTracker.addCallStarted();
739-
stream.addStatusWatcher(status => {
740-
if (status.code === Status.OK) {
741-
this.callTracker.addCallSucceeded();
742-
} else {
743-
this.callTracker.addCallFailed();
744-
}
745-
});
771+
if (this.channelzEnabled) {
772+
this.callTracker.addCallStarted();
773+
stream.addStatusWatcher(status => {
774+
if (status.code === Status.OK) {
775+
this.callTracker.addCallSucceeded();
776+
} else {
777+
this.callTracker.addCallFailed();
778+
}
779+
});
780+
}
746781
return stream;
747782
}
748783
}

packages/grpc-js/src/channelz.ts

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,14 @@ interface TraceEvent {
113113
childSubchannel?: SubchannelRef;
114114
}
115115

116+
/**
117+
* The loose upper bound on the number of events that should be retained in a
118+
* trace. This may be exceeded by up to a factor of 2. Arbitrarily chosen as a
119+
* number that should be large enough to contain the recent relevant
120+
* information, but small enough to not use excessive memory.
121+
*/
122+
const TARGET_RETAINED_TRACES = 32;
123+
116124
export class ChannelzTrace {
117125
events: TraceEvent[] = [];
118126
creationTimestamp: Date;
@@ -131,6 +139,10 @@ export class ChannelzTrace {
131139
childChannel: child?.kind === 'channel' ? child : undefined,
132140
childSubchannel: child?.kind === 'subchannel' ? child : undefined
133141
});
142+
// Whenever the trace array gets too large, discard the first half
143+
if (this.events.length >= TARGET_RETAINED_TRACES * 2) {
144+
this.events = this.events.slice(TARGET_RETAINED_TRACES);
145+
}
134146
this.eventsLogged += 1;
135147
}
136148

@@ -380,18 +392,29 @@ export function unregisterChannelzRef(ref: ChannelRef | SubchannelRef | ServerRe
380392
}
381393
}
382394

383-
export interface ChannelzClientView {
384-
updateState(connectivityState: ConnectivityState): void;
385-
addTrace(severity: TraceSeverity, description: string, child?: ChannelRef | SubchannelRef): void;
386-
addCallStarted(): void;
387-
addCallSucceeded(): void;
388-
addCallFailed(): void;
389-
addChild(child: ChannelRef | SubchannelRef): void;
390-
removeChild(child: ChannelRef | SubchannelRef): void;
395+
/**
396+
* Parse a single section of an IPv6 address as two bytes
397+
* @param addressSection A hexadecimal string of length up to 4
398+
* @returns The pair of bytes representing this address section
399+
*/
400+
function parseIPv6Section(addressSection: string): [number, number] {
401+
const numberValue = Number.parseInt(addressSection, 16);
402+
return [numberValue / 256 | 0, numberValue % 256];
391403
}
392404

393-
export interface ChannelzSubchannelView extends ChannelzClientView {
394-
getRef(): SubchannelRef;
405+
/**
406+
* Parse a chunk of an IPv6 address string to some number of bytes
407+
* @param addressChunk Some number of segments of up to 4 hexadecimal
408+
* characters each, joined by colons.
409+
* @returns The list of bytes representing this address chunk
410+
*/
411+
function parseIPv6Chunk(addressChunk: string): number[] {
412+
if (addressChunk === '') {
413+
return [];
414+
}
415+
const bytePairs = addressChunk.split(':').map(section => parseIPv6Section(section));
416+
const result: number[] = [];
417+
return result.concat(...bytePairs);
395418
}
396419

397420
/**
@@ -405,17 +428,17 @@ function ipAddressStringToBuffer(ipAddress: string): Buffer | null {
405428
return Buffer.from(Uint8Array.from(ipAddress.split('.').map(segment => Number.parseInt(segment))));
406429
} else if (isIPv6(ipAddress)) {
407430
let leftSection: string;
408-
let rightSection: string | null;
431+
let rightSection: string;
409432
const doubleColonIndex = ipAddress.indexOf('::');
410433
if (doubleColonIndex === -1) {
411434
leftSection = ipAddress;
412-
rightSection = null;
435+
rightSection = '';
413436
} else {
414437
leftSection = ipAddress.substring(0, doubleColonIndex);
415438
rightSection = ipAddress.substring(doubleColonIndex + 2);
416439
}
417-
const leftBuffer = Uint8Array.from(leftSection.split(':').map(segment => Number.parseInt(segment, 16)));
418-
const rightBuffer = rightSection ? Uint8Array.from(rightSection.split(':').map(segment => Number.parseInt(segment, 16))) : new Uint8Array();
440+
const leftBuffer = Buffer.from(parseIPv6Chunk(leftSection));
441+
const rightBuffer = Buffer.from(parseIPv6Chunk(rightSection));
419442
const middleBuffer = Buffer.alloc(16 - leftBuffer.length - rightBuffer.length, 0);
420443
return Buffer.concat([leftBuffer, middleBuffer, rightBuffer]);
421444
} else {

0 commit comments

Comments
 (0)