Skip to content

Commit 1ec5996

Browse files
committed
Merge branch 'master' into grpc-js-xds_wrr
2 parents f957004 + 479fa71 commit 1ec5996

File tree

9 files changed

+181
-130
lines changed

9 files changed

+181
-130
lines changed

packages/grpc-js-xds/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,16 @@
3434
"devDependencies": {
3535
"@grpc/grpc-js": "file:../grpc-js",
3636
"@grpc/proto-loader": "file:../proto-loader",
37+
"@grpc/reflection": "file:../grpc-reflection",
3738
"@types/gulp": "^4.0.6",
3839
"@types/gulp-mocha": "0.0.32",
3940
"@types/mocha": "^5.2.6",
4041
"@types/node": ">=20.11.20",
41-
"@grpc/reflection": "file:../grpc-reflection",
4242
"@types/yargs": "^15.0.5",
43-
"find-free-ports": "^3.1.1",
4443
"grpc-health-check": "file:../grpc-health-check",
4544
"gts": "^5.0.1",
4645
"ncp": "^2.0.0",
46+
"portfinder": "^1.0.37",
4747
"typescript": "^5.1.3",
4848
"yargs": "^15.4.1"
4949
},

packages/grpc-js-xds/test/backend.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@ import { ProtoGrpcType } from "./generated/echo";
2121
import { EchoRequest__Output } from "./generated/grpc/testing/EchoRequest";
2222
import { EchoResponse } from "./generated/grpc/testing/EchoResponse";
2323

24-
import * as net from 'net';
2524
import { XdsServer } from "../src";
2625
import { ControlPlaneServer } from "./xds-server";
27-
import { findFreePorts } from 'find-free-ports';
28-
import { XdsServerCredentials } from "../src/xds-credentials";
26+
import { getPortsPromise } from 'portfinder';
2927

3028
const loadedProtos = loadPackageDefinition(loadSync(
3129
[
@@ -148,6 +146,6 @@ export class Backend {
148146
}
149147

150148
export async function createBackends(count: number, useXdsServer?: boolean, creds?: ServerCredentials | undefined, serverOptions?: ServerOptions): Promise<Backend[]> {
151-
const ports = await findFreePorts(count);
149+
const ports = await getPortsPromise(count);
152150
return ports.map(port => new Backend(port, useXdsServer ?? true, creds, serverOptions));
153151
}

packages/grpc-js/src/load-balancer-pick-first.ts

Lines changed: 1 addition & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import {
3434
} from './picker';
3535
import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
3636
import * as logging from './logging';
37-
import { LogVerbosity, Status } from './constants';
37+
import { LogVerbosity } from './constants';
3838
import {
3939
SubchannelInterface,
4040
ConnectivityStateListener,
@@ -44,12 +44,6 @@ import { isTcpSubchannelAddress } from './subchannel-address';
4444
import { isIPv6 } from 'net';
4545
import { ChannelOptions } from './channel-options';
4646
import { StatusOr, statusOrFromValue } from './call-interface';
47-
import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport';
48-
import { OpenRcaServiceClient } from './generated/xds/service/orca/v3/OpenRcaService';
49-
import { ClientReadableStream, ServiceError } from './call';
50-
import { createOrcaClient, MetricsListener } from './orca';
51-
import { msToDuration } from './duration';
52-
import { BackoffTimeout } from './backoff-timeout';
5347

5448
const TRACER_NAME = 'pick_first';
5549

@@ -245,13 +239,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
245239

246240
private latestResolutionNote: string = '';
247241

248-
private metricsListeners: Map<MetricsListener, number> = new Map();
249-
private orcaClient: OpenRcaServiceClient | null = null;
250-
private metricsCall: ClientReadableStream<OrcaLoadReport__Output> | null = null;
251-
private currentMetricsIntervalMs: number = Infinity;
252-
private orcaUnsupported = false;
253-
private metricsBackoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription());
254-
255242
/**
256243
* Load balancer that attempts to connect to each backend in the address list
257244
* in order, and picks the first one that connects, using it for every
@@ -349,12 +336,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
349336
this.currentPick.removeHealthStateWatcher(
350337
this.pickedSubchannelHealthListener
351338
);
352-
this.orcaClient?.close();
353-
this.orcaClient = null;
354-
this.metricsCall?.cancel();
355-
this.metricsCall = null;
356-
this.metricsBackoffTimer.stop();
357-
this.metricsBackoffTimer.reset();
358339
// Unref last, to avoid triggering listeners
359340
this.currentPick.unref();
360341
this.currentPick = null;
@@ -458,7 +439,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
458439
this.currentPick = subchannel;
459440
clearTimeout(this.connectionDelayTimeout);
460441
this.calculateAndReportNewState();
461-
this.updateMetricsSubscription();
462442
}
463443

464444
private updateState(newState: ConnectivityState, picker: Picker, errorMessage: string | null) {
@@ -588,77 +568,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
588568
destroy() {
589569
this.resetSubchannelList();
590570
this.removeCurrentPick();
591-
this.metricsCall?.cancel();
592-
this.metricsCall = null;
593-
this.orcaClient?.close();
594-
this.orcaClient = null;
595-
this.metricsBackoffTimer.stop();
596571
}
597572

598573
getTypeName(): string {
599574
return TYPE_NAME;
600575
}
601-
602-
private getOrCreateOrcaClient(): OpenRcaServiceClient | null {
603-
if (this.orcaClient) {
604-
return this.orcaClient;
605-
}
606-
if (this.currentPick) {
607-
const channel = this.currentPick.getChannel();
608-
this.orcaClient = createOrcaClient(channel);
609-
return this.orcaClient;
610-
}
611-
return null;
612-
}
613-
614-
private updateMetricsSubscription() {
615-
if (this.orcaUnsupported) {
616-
return;
617-
}
618-
if (this.metricsListeners.size > 0) {
619-
const newInterval = Math.min(...Array.from(this.metricsListeners.values()));
620-
if (!this.metricsCall || newInterval !== this.currentMetricsIntervalMs) {
621-
const orcaClient = this.getOrCreateOrcaClient();
622-
if (!orcaClient) {
623-
return;
624-
}
625-
this.metricsCall?.cancel();
626-
this.currentMetricsIntervalMs = newInterval;
627-
const metricsCall = orcaClient.streamCoreMetrics({report_interval: msToDuration(newInterval)});
628-
this.metricsCall = metricsCall;
629-
metricsCall.on('data', (report: OrcaLoadReport__Output) => {
630-
this.metricsListeners.forEach((interval, listener) => {
631-
listener(report);
632-
});
633-
});
634-
metricsCall.on('error', (error: ServiceError) => {
635-
this.metricsCall = null;
636-
if (error.code === Status.UNIMPLEMENTED) {
637-
this.orcaUnsupported = true;
638-
return;
639-
}
640-
if (error.code === Status.CANCELLED) {
641-
return;
642-
}
643-
this.metricsBackoffTimer.runOnce();
644-
});
645-
}
646-
} else {
647-
this.metricsCall?.cancel();
648-
this.metricsCall = null;
649-
this.currentMetricsIntervalMs = Infinity;
650-
}
651-
}
652-
653-
addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
654-
this.metricsListeners.set(listener, intervalMs);
655-
this.updateMetricsSubscription();
656-
}
657-
658-
removeMetricsSubscription(listener: MetricsListener): void {
659-
this.metricsListeners.delete(listener);
660-
this.updateMetricsSubscription();
661-
}
662576
}
663577

664578
const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
@@ -736,14 +650,6 @@ export class LeafLoadBalancer {
736650
destroy() {
737651
this.pickFirstBalancer.destroy();
738652
}
739-
740-
addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
741-
this.pickFirstBalancer.addMetricsSubscription(listener, intervalMs);
742-
}
743-
744-
removeMetricsSubscription(listener: MetricsListener): void {
745-
this.pickFirstBalancer.removeMetricsSubscription(listener);
746-
}
747653
}
748654

749655
export function setup(): void {

packages/grpc-js/src/load-balancer-weighted-round-robin.ts

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadRep
2424
import { ChannelControlHelper, createChildChannelControlHelper, LoadBalancer, registerLoadBalancerType, TypedLoadBalancingConfig } from './load-balancer';
2525
import { LeafLoadBalancer } from './load-balancer-pick-first';
2626
import * as logging from './logging';
27-
import { createMetricsReader, MetricsListener } from './orca';
27+
import { createMetricsReader, MetricsListener, OrcaOobMetricsSubchannelWrapper } from './orca';
2828
import { PickArgs, Picker, PickResult, QueuePicker, UnavailablePicker } from './picker';
2929
import { PriorityQueue } from './priority-queue';
3030
import { Endpoint, endpointToString } from './subchannel-address';
@@ -387,27 +387,12 @@ class WeightedRoundRobinLoadBalancer implements LoadBalancer {
387387
const now = new Date();
388388
const seenEndpointNames = new Set<string>();
389389
this.updatesPaused = true;
390+
this.latestConfig = lbConfig;
390391
for (const endpoint of maybeEndpointList.value) {
391392
const name = endpointToString(endpoint);
392393
seenEndpointNames.add(name);
393394
let entry = this.children.get(name);
394-
if (entry) {
395-
if (lbConfig.getEnableOobLoadReport()) {
396-
if (!this.latestConfig || !this.latestConfig.getEnableOobLoadReport() || lbConfig.getOobLoadReportingPeriodMs() !== this.latestConfig.getOobLoadReportingPeriodMs()) {
397-
if (!entry.oobMetricsListener) {
398-
entry.oobMetricsListener = loadReport => {
399-
this.updateWeight(entry!, loadReport);
400-
};
401-
}
402-
entry.child.addMetricsSubscription(entry.oobMetricsListener, lbConfig.getOobLoadReportingPeriodMs());
403-
}
404-
} else {
405-
if (entry.oobMetricsListener) {
406-
entry.child.removeMetricsSubscription(entry.oobMetricsListener);
407-
entry.oobMetricsListener = null;
408-
}
409-
}
410-
} else {
395+
if (!entry) {
411396
entry = {
412397
child: new LeafLoadBalancer(endpoint, createChildChannelControlHelper(this.channelControlHelper, {
413398
updateState: (connectivityState, picker, errorMessage) => {
@@ -426,20 +411,29 @@ class WeightedRoundRobinLoadBalancer implements LoadBalancer {
426411
}
427412
this.calculateAndUpdateState();
428413
},
414+
createSubchannel: (subchannelAddress, subchannelArgs) => {
415+
const subchannel = this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
416+
if (entry?.oobMetricsListener) {
417+
return new OrcaOobMetricsSubchannelWrapper(subchannel, entry.oobMetricsListener, this.latestConfig!.getOobLoadReportingPeriodMs());
418+
} else {
419+
return subchannel;
420+
}
421+
}
429422
}), options, resolutionNote),
430423
lastUpdated: now,
431424
nonEmptySince: null,
432425
weight: 0,
433426
oobMetricsListener: null
434427
};
435-
if (lbConfig.getEnableOobLoadReport()) {
436-
entry.oobMetricsListener = loadReport => {
437-
this.updateWeight(entry!, loadReport);
438-
};
439-
entry.child.addMetricsSubscription(entry.oobMetricsListener, lbConfig.getOobLoadReportingPeriodMs());
440-
}
441428
this.children.set(name, entry);
442429
}
430+
if (lbConfig.getEnableOobLoadReport()) {
431+
entry.oobMetricsListener = loadReport => {
432+
this.updateWeight(entry!, loadReport);
433+
};
434+
} else {
435+
entry.oobMetricsListener = null;
436+
}
443437
}
444438
for (const [endpointName, entry] of this.children) {
445439
if (seenEndpointNames.has(endpointName)) {
@@ -449,7 +443,6 @@ class WeightedRoundRobinLoadBalancer implements LoadBalancer {
449443
this.children.delete(endpointName);
450444
}
451445
}
452-
this.latestConfig = lbConfig;
453446
this.updatesPaused = false;
454447
this.calculateAndUpdateState();
455448
if (this.weightUpdateTimer) {

packages/grpc-js/src/orca.ts

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,17 @@ import type { loadSync } from '@grpc/proto-loader';
2121
import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca";
2222
import { loadPackageDefinition } from "./make-client";
2323
import { OpenRcaServiceClient, OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
24-
import { durationMessageToDuration, durationToMs } from "./duration";
24+
import { durationMessageToDuration, durationToMs, msToDuration } from "./duration";
2525
import { Server } from "./server";
2626
import { ChannelCredentials } from "./channel-credentials";
2727
import { Channel } from "./channel";
2828
import { OnCallEnded } from "./picker";
29+
import { DataProducer, Subchannel } from "./subchannel";
30+
import { BaseSubchannelWrapper, DataWatcher, SubchannelInterface } from "./subchannel-interface";
31+
import { ClientReadableStream, ServiceError } from "./call";
32+
import { Status } from "./constants";
33+
import { BackoffTimeout } from "./backoff-timeout";
34+
import { ConnectivityState } from "./connectivity-state";
2935

3036
const loadedOrcaProto: OrcaProtoGrpcType | null = null;
3137
function loadOrcaProto(): OrcaProtoGrpcType {
@@ -246,3 +252,94 @@ export function createMetricsReader(listener: MetricsListener, previousOnCallEnd
246252
}
247253
}
248254
}
255+
256+
const DATA_PRODUCER_KEY = 'orca_oob_metrics';
257+
258+
class OobMetricsDataWatcher implements DataWatcher {
259+
private dataProducer: DataProducer | null = null;
260+
constructor(private metricsListener: MetricsListener, private intervalMs: number) {}
261+
setSubchannel(subchannel: Subchannel): void {
262+
const producer = subchannel.getOrCreateDataProducer(DATA_PRODUCER_KEY, createOobMetricsDataProducer);
263+
this.dataProducer = producer;
264+
producer.addDataWatcher(this);
265+
}
266+
destroy(): void {
267+
this.dataProducer?.removeDataWatcher(this);
268+
}
269+
getInterval(): number {
270+
return this.intervalMs;
271+
}
272+
onMetricsUpdate(metrics: OrcaLoadReport__Output) {
273+
this.metricsListener(metrics);
274+
}
275+
}
276+
277+
class OobMetricsDataProducer implements DataProducer {
278+
private dataWatchers: Set<OobMetricsDataWatcher> = new Set();
279+
private orcaSupported = true;
280+
private client: OpenRcaServiceClient;
281+
private metricsCall: ClientReadableStream<OrcaLoadReport__Output> | null = null;
282+
private currentInterval = Infinity;
283+
private backoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription());
284+
private subchannelStateListener = () => this.updateMetricsSubscription();
285+
constructor(private subchannel: Subchannel) {
286+
const channel = subchannel.getChannel();
287+
this.client = createOrcaClient(channel);
288+
subchannel.addConnectivityStateListener(this.subchannelStateListener);
289+
}
290+
addDataWatcher(dataWatcher: OobMetricsDataWatcher): void {
291+
this.dataWatchers.add(dataWatcher);
292+
this.updateMetricsSubscription();
293+
}
294+
removeDataWatcher(dataWatcher: OobMetricsDataWatcher): void {
295+
this.dataWatchers.delete(dataWatcher);
296+
if (this.dataWatchers.size === 0) {
297+
this.subchannel.removeDataProducer(DATA_PRODUCER_KEY);
298+
this.metricsCall?.cancel();
299+
this.metricsCall = null;
300+
this.client.close();
301+
this.subchannel.removeConnectivityStateListener(this.subchannelStateListener);
302+
} else {
303+
this.updateMetricsSubscription();
304+
}
305+
}
306+
private updateMetricsSubscription() {
307+
if (this.dataWatchers.size === 0 || !this.orcaSupported || this.subchannel.getConnectivityState() !== ConnectivityState.READY) {
308+
return;
309+
}
310+
const newInterval = Math.min(...Array.from(this.dataWatchers).map(watcher => watcher.getInterval()));
311+
if (!this.metricsCall || newInterval !== this.currentInterval) {
312+
this.metricsCall?.cancel();
313+
this.currentInterval = newInterval;
314+
const metricsCall = this.client.streamCoreMetrics({report_interval: msToDuration(newInterval)});
315+
this.metricsCall = metricsCall;
316+
metricsCall.on('data', (report: OrcaLoadReport__Output) => {
317+
this.dataWatchers.forEach(watcher => {
318+
watcher.onMetricsUpdate(report);
319+
});
320+
});
321+
metricsCall.on('error', (error: ServiceError) => {
322+
this.metricsCall = null;
323+
if (error.code === Status.UNIMPLEMENTED) {
324+
this.orcaSupported = false;
325+
return;
326+
}
327+
if (error.code === Status.CANCELLED) {
328+
return;
329+
}
330+
this.backoffTimer.runOnce();
331+
});
332+
}
333+
}
334+
}
335+
336+
export class OrcaOobMetricsSubchannelWrapper extends BaseSubchannelWrapper {
337+
constructor(child: SubchannelInterface, metricsListener: MetricsListener, intervalMs: number) {
338+
super(child);
339+
this.addDataWatcher(new OobMetricsDataWatcher(metricsListener, intervalMs));
340+
}
341+
}
342+
343+
function createOobMetricsDataProducer(subchannel: Subchannel) {
344+
return new OobMetricsDataProducer(subchannel);
345+
}

0 commit comments

Comments
 (0)