4040import io .grpc .services .MetricReport ;
4141import io .grpc .util .ForwardingSubchannel ;
4242import io .grpc .util .MultiChildLoadBalancer ;
43+ import io .grpc .xds .internal .MetricReportUtils ;
4344import io .grpc .xds .orca .OrcaOobUtil ;
4445import io .grpc .xds .orca .OrcaOobUtil .OrcaOobReportListener ;
4546import io .grpc .xds .orca .OrcaPerRequestUtil ;
4950import java .util .HashSet ;
5051import java .util .List ;
5152import java .util .Objects ;
53+ import java .util .OptionalDouble ;
5254import java .util .Random ;
5355import java .util .Set ;
5456import java .util .concurrent .ScheduledExecutorService ;
8789 * See related documentation: https://cloud.google.com/service-mesh/legacy/load-balancing-apis/proxyless-configure-advanced-traffic-management#custom-lb-config
8890 */
8991final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
92+ @ VisibleForTesting
93+ static boolean enableCustomConfig =
94+ Boolean .parseBoolean (System .getenv ("GRPC_EXPERIMENTAL_WRR_CUSTOM_METRICS" ));
9095
9196 private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER ;
9297 private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER ;
@@ -189,7 +194,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
189194 this .backendService = "" ;
190195 }
191196 config =
192- (WeightedRoundRobinLoadBalancerConfig ) resolvedAddresses .getLoadBalancingPolicyConfig ();
197+ (WeightedRoundRobinLoadBalancerConfig ) resolvedAddresses .getLoadBalancingPolicyConfig ();
193198
194199 if (weightUpdateTimer != null && weightUpdateTimer .isPending ()) {
195200 weightUpdateTimer .cancel ();
@@ -236,7 +241,8 @@ protected void updateOverallBalancingState() {
236241
237242 private SubchannelPicker createReadyPicker (Collection <ChildLbState > activeList ) {
238243 WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker (ImmutableList .copyOf (activeList ),
239- config .enableOobLoadReport , config .errorUtilizationPenalty , sequence );
244+ config .enableOobLoadReport , config .errorUtilizationPenalty , sequence ,
245+ config .metricNamesForComputingUtilization );
240246 updateWeight (picker );
241247 return picker ;
242248 }
@@ -325,12 +331,16 @@ public void addSubchannel(WrrSubchannel wrrSubchannel) {
325331 subchannels .add (wrrSubchannel );
326332 }
327333
328- public OrcaReportListener getOrCreateOrcaListener (float errorUtilizationPenalty ) {
334+ public OrcaReportListener getOrCreateOrcaListener (float errorUtilizationPenalty ,
335+ ImmutableList <String > metricNamesForComputingUtilization ) {
329336 if (orcaReportListener != null
330- && orcaReportListener .errorUtilizationPenalty == errorUtilizationPenalty ) {
337+ && orcaReportListener .errorUtilizationPenalty == errorUtilizationPenalty
338+ && Objects .equals (orcaReportListener .metricNamesForComputingUtilization ,
339+ metricNamesForComputingUtilization )) {
331340 return orcaReportListener ;
332341 }
333- orcaReportListener = new OrcaReportListener (errorUtilizationPenalty );
342+ orcaReportListener =
343+ new OrcaReportListener (errorUtilizationPenalty , metricNamesForComputingUtilization );
334344 return orcaReportListener ;
335345 }
336346
@@ -355,18 +365,19 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne
355365
356366 final class OrcaReportListener implements OrcaPerRequestReportListener , OrcaOobReportListener {
357367 private final float errorUtilizationPenalty ;
368+ private final ImmutableList <String > metricNamesForComputingUtilization ;
358369
359- OrcaReportListener (float errorUtilizationPenalty ) {
370+ OrcaReportListener (float errorUtilizationPenalty ,
371+ ImmutableList <String > metricNamesForComputingUtilization ) {
360372 this .errorUtilizationPenalty = errorUtilizationPenalty ;
373+ this .metricNamesForComputingUtilization = metricNamesForComputingUtilization ;
361374 }
362375
363376 @ Override
364377 public void onLoadReport (MetricReport report ) {
378+ double utilization = getUtilization (report , metricNamesForComputingUtilization );
379+
365380 double newWeight = 0 ;
366- // Prefer application utilization and fallback to CPU utilization if unset.
367- double utilization =
368- report .getApplicationUtilization () > 0 ? report .getApplicationUtilization ()
369- : report .getCpuUtilization ();
370381 if (utilization > 0 && report .getQps () > 0 ) {
371382 double penalty = 0 ;
372383 if (report .getEps () > 0 && errorUtilizationPenalty > 0 ) {
@@ -383,6 +394,37 @@ public void onLoadReport(MetricReport report) {
383394 lastUpdated = ticker .nanoTime ();
384395 weight = newWeight ;
385396 }
397+
398+ /**
399+ * Returns the utilization value computed from the specified metric names. If the application
400+ * utilization is present and valid, it is returned. Otherwise, the maximum of the custom
401+ * metrics specified is returned. If none of the custom metrics are present, the CPU
402+ * utilization is returned.
403+ */
404+ private double getUtilization (MetricReport report , ImmutableList <String > metricNames ) {
405+ double appUtil = report .getApplicationUtilization ();
406+ if (appUtil > 0 ) {
407+ return appUtil ;
408+ }
409+ return getCustomMetricUtilization (report , metricNames )
410+ .orElse (report .getCpuUtilization ());
411+ }
412+
413+ /**
414+ * Returns the maximum utilization value among the specified metric names.
415+ * Returns OptionalDouble.empty() if NONE of the specified metrics are present in the report,
416+ * or if all present metrics are NaN.
417+ * Returns OptionalDouble.of(maxUtil) if at least one non-NaN metric is present.
418+ */
419+ private OptionalDouble getCustomMetricUtilization (MetricReport report ,
420+ ImmutableList <String > metricNames ) {
421+ return metricNames .stream ()
422+ .map (name -> MetricReportUtils .getMetric (report , name ))
423+ .filter (OptionalDouble ::isPresent )
424+ .mapToDouble (OptionalDouble ::getAsDouble )
425+ .filter (d -> !Double .isNaN (d ) && d > 0 )
426+ .max ();
427+ }
386428 }
387429 }
388430
@@ -403,10 +445,10 @@ private void createAndApplyOrcaListeners() {
403445 for (WrrSubchannel weightedSubchannel : wChild .subchannels ) {
404446 if (config .enableOobLoadReport ) {
405447 OrcaOobUtil .setListener (weightedSubchannel ,
406- wChild .getOrCreateOrcaListener (config .errorUtilizationPenalty ),
448+ wChild .getOrCreateOrcaListener (config .errorUtilizationPenalty ,
449+ config .metricNamesForComputingUtilization ),
407450 OrcaOobUtil .OrcaReportingConfig .newBuilder ()
408- .setReportInterval (config .oobReportingPeriodNanos , TimeUnit .NANOSECONDS )
409- .build ());
451+ .setReportInterval (config .oobReportingPeriodNanos , TimeUnit .NANOSECONDS ).build ());
410452 } else {
411453 OrcaOobUtil .setListener (weightedSubchannel , null , null );
412454 }
@@ -473,7 +515,8 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
473515 private volatile StaticStrideScheduler scheduler ;
474516
475517 WeightedRoundRobinPicker (List <ChildLbState > children , boolean enableOobLoadReport ,
476- float errorUtilizationPenalty , AtomicInteger sequence ) {
518+ float errorUtilizationPenalty , AtomicInteger sequence ,
519+ ImmutableList <String > metricNamesForComputingUtilization ) {
477520 checkNotNull (children , "children" );
478521 Preconditions .checkArgument (!children .isEmpty (), "empty child list" );
479522 this .children = children ;
@@ -482,7 +525,8 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
482525 for (ChildLbState child : children ) {
483526 WeightedChildLbState wChild = (WeightedChildLbState ) child ;
484527 pickers .add (wChild .getCurrentPicker ());
485- reportListeners .add (wChild .getOrCreateOrcaListener (errorUtilizationPenalty ));
528+ reportListeners .add (wChild .getOrCreateOrcaListener (errorUtilizationPenalty ,
529+ metricNamesForComputingUtilization ));
486530 }
487531 this .pickers = pickers ;
488532 this .reportListeners = reportListeners ;
@@ -565,11 +609,11 @@ public boolean equals(Object o) {
565609 * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
566610 * in which each object's deadline is the multiplicative inverse of the object's weight.
567611 * <p>
568- * The way in which this is implemented is through a static stride scheduler.
612+ * The way in which this is implemented is through a static stride scheduler.
569613 * The Static Stride Scheduler works by iterating through the list of subchannel weights
570- * and using modular arithmetic to proportionally distribute picks, favoring entries
571- * with higher weights. It is based on the observation that the intended sequence generated
572- * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic.
614+ * and using modular arithmetic to proportionally distribute picks, favoring entries
615+ * with higher weights. It is based on the observation that the intended sequence generated
616+ * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic.
573617 * The Static Stride Scheduler is more performant than other implementations of the EDF
574618 * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
575619 * <p>
@@ -720,23 +764,23 @@ static final class WeightedRoundRobinLoadBalancerConfig {
720764 final long oobReportingPeriodNanos ;
721765 final long weightUpdatePeriodNanos ;
722766 final float errorUtilizationPenalty ;
767+ final ImmutableList <String > metricNamesForComputingUtilization ;
723768
724769 public static Builder newBuilder () {
725770 return new Builder ();
726771 }
727772
728773 private WeightedRoundRobinLoadBalancerConfig (long blackoutPeriodNanos ,
729- long weightExpirationPeriodNanos ,
730- boolean enableOobLoadReport ,
731- long oobReportingPeriodNanos ,
732- long weightUpdatePeriodNanos ,
733- float errorUtilizationPenalty ) {
774+ long weightExpirationPeriodNanos , boolean enableOobLoadReport , long oobReportingPeriodNanos ,
775+ long weightUpdatePeriodNanos , float errorUtilizationPenalty ,
776+ ImmutableList <String > metricNamesForComputingUtilization ) {
734777 this .blackoutPeriodNanos = blackoutPeriodNanos ;
735778 this .weightExpirationPeriodNanos = weightExpirationPeriodNanos ;
736779 this .enableOobLoadReport = enableOobLoadReport ;
737780 this .oobReportingPeriodNanos = oobReportingPeriodNanos ;
738781 this .weightUpdatePeriodNanos = weightUpdatePeriodNanos ;
739782 this .errorUtilizationPenalty = errorUtilizationPenalty ;
783+ this .metricNamesForComputingUtilization = metricNamesForComputingUtilization ;
740784 }
741785
742786 @ Override
@@ -751,27 +795,26 @@ public boolean equals(Object o) {
751795 && this .oobReportingPeriodNanos == that .oobReportingPeriodNanos
752796 && this .weightUpdatePeriodNanos == that .weightUpdatePeriodNanos
753797 // Float.compare considers NaNs equal
754- && Float .compare (this .errorUtilizationPenalty , that .errorUtilizationPenalty ) == 0 ;
798+ && Float .compare (this .errorUtilizationPenalty , that .errorUtilizationPenalty ) == 0
799+ && Objects .equals (this .metricNamesForComputingUtilization ,
800+ that .metricNamesForComputingUtilization );
755801 }
756802
757803 @ Override
758804 public int hashCode () {
759- return Objects .hash (
760- blackoutPeriodNanos ,
761- weightExpirationPeriodNanos ,
762- enableOobLoadReport ,
763- oobReportingPeriodNanos ,
764- weightUpdatePeriodNanos ,
765- errorUtilizationPenalty );
805+ return Objects .hash (blackoutPeriodNanos , weightExpirationPeriodNanos , enableOobLoadReport ,
806+ oobReportingPeriodNanos , weightUpdatePeriodNanos , errorUtilizationPenalty ,
807+ metricNamesForComputingUtilization );
766808 }
767809
768810 static final class Builder {
769811 long blackoutPeriodNanos = 10_000_000_000L ; // 10s
770- long weightExpirationPeriodNanos = 180_000_000_000L ; //3min
812+ long weightExpirationPeriodNanos = 180_000_000_000L ; // 3min
771813 boolean enableOobLoadReport = false ;
772814 long oobReportingPeriodNanos = 10_000_000_000L ; // 10s
773815 long weightUpdatePeriodNanos = 1_000_000_000L ; // 1s
774816 float errorUtilizationPenalty = 1.0F ;
817+ ImmutableList <String > metricNamesForComputingUtilization = ImmutableList .of ();
775818
776819 private Builder () {
777820
@@ -809,10 +852,17 @@ Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
809852 return this ;
810853 }
811854
855+ Builder setMetricNamesForComputingUtilization (
856+ List <String > metricNamesForComputingUtilization ) {
857+ this .metricNamesForComputingUtilization =
858+ ImmutableList .copyOf (metricNamesForComputingUtilization );
859+ return this ;
860+ }
861+
812862 WeightedRoundRobinLoadBalancerConfig build () {
813863 return new WeightedRoundRobinLoadBalancerConfig (blackoutPeriodNanos ,
814- weightExpirationPeriodNanos , enableOobLoadReport , oobReportingPeriodNanos ,
815- weightUpdatePeriodNanos , errorUtilizationPenalty );
864+ weightExpirationPeriodNanos , enableOobLoadReport , oobReportingPeriodNanos ,
865+ weightUpdatePeriodNanos , errorUtilizationPenalty , metricNamesForComputingUtilization );
816866 }
817867 }
818868 }
0 commit comments