-
Notifications
You must be signed in to change notification settings - Fork 4k
[xds] Implement A114: WRR support for custom backend metrics #12645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,7 @@ | |
| import io.grpc.services.MetricReport; | ||
| import io.grpc.util.ForwardingSubchannel; | ||
| import io.grpc.util.MultiChildLoadBalancer; | ||
| import io.grpc.xds.internal.MetricReportUtils; | ||
| import io.grpc.xds.orca.OrcaOobUtil; | ||
| import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener; | ||
| import io.grpc.xds.orca.OrcaPerRequestUtil; | ||
|
|
@@ -49,6 +50,7 @@ | |
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.OptionalDouble; | ||
| import java.util.Random; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
|
|
@@ -87,6 +89,9 @@ | |
| * See related documentation: https://cloud.google.com/service-mesh/legacy/load-balancing-apis/proxyless-configure-advanced-traffic-management#custom-lb-config | ||
| */ | ||
| final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer { | ||
| @VisibleForTesting | ||
| static boolean enableCustomConfig = | ||
| Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_WRR_CUSTOM_METRICS")); | ||
|
|
||
| private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER; | ||
| private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER; | ||
|
|
@@ -189,7 +194,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { | |
| this.backendService = ""; | ||
| } | ||
| config = | ||
| (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); | ||
| (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); | ||
|
|
||
| if (weightUpdateTimer != null && weightUpdateTimer.isPending()) { | ||
| weightUpdateTimer.cancel(); | ||
|
|
@@ -236,7 +241,8 @@ protected void updateOverallBalancingState() { | |
|
|
||
| private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) { | ||
| WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList), | ||
| config.enableOobLoadReport, config.errorUtilizationPenalty, sequence); | ||
| config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, | ||
| config.metricNamesForComputingUtilization); | ||
| updateWeight(picker); | ||
| return picker; | ||
| } | ||
|
|
@@ -325,12 +331,16 @@ public void addSubchannel(WrrSubchannel wrrSubchannel) { | |
| subchannels.add(wrrSubchannel); | ||
| } | ||
|
|
||
| public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) { | ||
| public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty, | ||
| ImmutableList<String> metricNamesForComputingUtilization) { | ||
| if (orcaReportListener != null | ||
| && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) { | ||
| && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty | ||
| && Objects.equals(orcaReportListener.metricNamesForComputingUtilization, | ||
|
||
| metricNamesForComputingUtilization)) { | ||
| return orcaReportListener; | ||
| } | ||
| orcaReportListener = new OrcaReportListener(errorUtilizationPenalty); | ||
| orcaReportListener = | ||
| new OrcaReportListener(errorUtilizationPenalty, metricNamesForComputingUtilization); | ||
| return orcaReportListener; | ||
| } | ||
|
|
||
|
|
@@ -355,18 +365,19 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne | |
|
|
||
| final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener { | ||
| private final float errorUtilizationPenalty; | ||
| private final ImmutableList<String> metricNamesForComputingUtilization; | ||
|
|
||
| OrcaReportListener(float errorUtilizationPenalty) { | ||
| OrcaReportListener(float errorUtilizationPenalty, | ||
| ImmutableList<String> metricNamesForComputingUtilization) { | ||
| this.errorUtilizationPenalty = errorUtilizationPenalty; | ||
| this.metricNamesForComputingUtilization = metricNamesForComputingUtilization; | ||
| } | ||
|
|
||
| @Override | ||
| public void onLoadReport(MetricReport report) { | ||
| double utilization = getUtilization(report, metricNamesForComputingUtilization); | ||
|
|
||
| double newWeight = 0; | ||
| // Prefer application utilization and fallback to CPU utilization if unset. | ||
| double utilization = | ||
| report.getApplicationUtilization() > 0 ? report.getApplicationUtilization() | ||
| : report.getCpuUtilization(); | ||
| if (utilization > 0 && report.getQps() > 0) { | ||
| double penalty = 0; | ||
| if (report.getEps() > 0 && errorUtilizationPenalty > 0) { | ||
|
|
@@ -383,6 +394,37 @@ public void onLoadReport(MetricReport report) { | |
| lastUpdated = ticker.nanoTime(); | ||
| weight = newWeight; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the utilization value computed from the specified metric names. If the application | ||
| * utilization is present and valid, it is returned. Otherwise, the maximum of the custom | ||
| * metrics specified is returned. If none of the custom metrics are present, the CPU | ||
| * utilization is returned. | ||
| */ | ||
| private double getUtilization(MetricReport report, ImmutableList<String> metricNames) { | ||
| double appUtil = report.getApplicationUtilization(); | ||
| if (appUtil > 0) { | ||
| return appUtil; | ||
| } | ||
| return getCustomMetricUtilization(report, metricNames) | ||
| .orElse(report.getCpuUtilization()); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the maximum utilization value among the specified metric names. | ||
| * Returns OptionalDouble.empty() if NONE of the specified metrics are present in the report, | ||
| * or if all present metrics are NaN. | ||
| * Returns OptionalDouble.of(maxUtil) if at least one non-NaN metric is present. | ||
| */ | ||
| private OptionalDouble getCustomMetricUtilization(MetricReport report, | ||
| ImmutableList<String> metricNames) { | ||
| return metricNames.stream() | ||
| .map(name -> MetricReportUtils.getMetric(report, name)) | ||
| .filter(OptionalDouble::isPresent) | ||
| .mapToDouble(OptionalDouble::getAsDouble) | ||
| .filter(d -> !Double.isNaN(d) && d > 0) | ||
| .max(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -403,10 +445,10 @@ private void createAndApplyOrcaListeners() { | |
| for (WrrSubchannel weightedSubchannel : wChild.subchannels) { | ||
| if (config.enableOobLoadReport) { | ||
| OrcaOobUtil.setListener(weightedSubchannel, | ||
| wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty), | ||
| wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty, | ||
| config.metricNamesForComputingUtilization), | ||
| OrcaOobUtil.OrcaReportingConfig.newBuilder() | ||
| .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS) | ||
| .build()); | ||
| .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS).build()); | ||
| } else { | ||
| OrcaOobUtil.setListener(weightedSubchannel, null, null); | ||
| } | ||
|
|
@@ -473,7 +515,8 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker { | |
| private volatile StaticStrideScheduler scheduler; | ||
|
|
||
| WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport, | ||
| float errorUtilizationPenalty, AtomicInteger sequence) { | ||
| float errorUtilizationPenalty, AtomicInteger sequence, | ||
| ImmutableList<String> metricNamesForComputingUtilization) { | ||
| checkNotNull(children, "children"); | ||
| Preconditions.checkArgument(!children.isEmpty(), "empty child list"); | ||
| this.children = children; | ||
|
|
@@ -482,7 +525,8 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker { | |
| for (ChildLbState child : children) { | ||
| WeightedChildLbState wChild = (WeightedChildLbState) child; | ||
| pickers.add(wChild.getCurrentPicker()); | ||
| reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty)); | ||
| reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty, | ||
| metricNamesForComputingUtilization)); | ||
| } | ||
| this.pickers = pickers; | ||
| this.reportListeners = reportListeners; | ||
|
|
@@ -565,11 +609,11 @@ public boolean equals(Object o) { | |
| * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler | ||
| * in which each object's deadline is the multiplicative inverse of the object's weight. | ||
| * <p> | ||
| * The way in which this is implemented is through a static stride scheduler. | ||
| * The way in which this is implemented is through a static stride scheduler. | ||
| * The Static Stride Scheduler works by iterating through the list of subchannel weights | ||
| * and using modular arithmetic to proportionally distribute picks, favoring entries | ||
| * with higher weights. It is based on the observation that the intended sequence generated | ||
| * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. | ||
| * and using modular arithmetic to proportionally distribute picks, favoring entries | ||
| * with higher weights. It is based on the observation that the intended sequence generated | ||
| * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. | ||
| * The Static Stride Scheduler is more performant than other implementations of the EDF | ||
| * Scheduler, as it removes the need for a priority queue (and thus mutex locks). | ||
| * <p> | ||
|
|
@@ -720,23 +764,23 @@ static final class WeightedRoundRobinLoadBalancerConfig { | |
| final long oobReportingPeriodNanos; | ||
| final long weightUpdatePeriodNanos; | ||
| final float errorUtilizationPenalty; | ||
| final ImmutableList<String> metricNamesForComputingUtilization; | ||
|
|
||
| public static Builder newBuilder() { | ||
| return new Builder(); | ||
| } | ||
|
|
||
| private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos, | ||
| long weightExpirationPeriodNanos, | ||
| boolean enableOobLoadReport, | ||
| long oobReportingPeriodNanos, | ||
| long weightUpdatePeriodNanos, | ||
| float errorUtilizationPenalty) { | ||
| long weightExpirationPeriodNanos, boolean enableOobLoadReport, long oobReportingPeriodNanos, | ||
| long weightUpdatePeriodNanos, float errorUtilizationPenalty, | ||
| ImmutableList<String> metricNamesForComputingUtilization) { | ||
| this.blackoutPeriodNanos = blackoutPeriodNanos; | ||
| this.weightExpirationPeriodNanos = weightExpirationPeriodNanos; | ||
| this.enableOobLoadReport = enableOobLoadReport; | ||
| this.oobReportingPeriodNanos = oobReportingPeriodNanos; | ||
| this.weightUpdatePeriodNanos = weightUpdatePeriodNanos; | ||
| this.errorUtilizationPenalty = errorUtilizationPenalty; | ||
| this.metricNamesForComputingUtilization = metricNamesForComputingUtilization; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -751,27 +795,26 @@ public boolean equals(Object o) { | |
| && this.oobReportingPeriodNanos == that.oobReportingPeriodNanos | ||
| && this.weightUpdatePeriodNanos == that.weightUpdatePeriodNanos | ||
| // Float.compare considers NaNs equal | ||
| && Float.compare(this.errorUtilizationPenalty, that.errorUtilizationPenalty) == 0; | ||
| && Float.compare(this.errorUtilizationPenalty, that.errorUtilizationPenalty) == 0 | ||
| && Objects.equals(this.metricNamesForComputingUtilization, | ||
| that.metricNamesForComputingUtilization); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash( | ||
| blackoutPeriodNanos, | ||
| weightExpirationPeriodNanos, | ||
| enableOobLoadReport, | ||
| oobReportingPeriodNanos, | ||
| weightUpdatePeriodNanos, | ||
| errorUtilizationPenalty); | ||
| return Objects.hash(blackoutPeriodNanos, weightExpirationPeriodNanos, enableOobLoadReport, | ||
| oobReportingPeriodNanos, weightUpdatePeriodNanos, errorUtilizationPenalty, | ||
| metricNamesForComputingUtilization); | ||
| } | ||
|
|
||
| static final class Builder { | ||
| long blackoutPeriodNanos = 10_000_000_000L; // 10s | ||
| long weightExpirationPeriodNanos = 180_000_000_000L; //3min | ||
| long weightExpirationPeriodNanos = 180_000_000_000L; // 3min | ||
| boolean enableOobLoadReport = false; | ||
| long oobReportingPeriodNanos = 10_000_000_000L; // 10s | ||
| long weightUpdatePeriodNanos = 1_000_000_000L; // 1s | ||
| float errorUtilizationPenalty = 1.0F; | ||
| ImmutableList<String> metricNamesForComputingUtilization = ImmutableList.of(); | ||
|
|
||
| private Builder() { | ||
|
|
||
|
|
@@ -809,10 +852,17 @@ Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) { | |
| return this; | ||
| } | ||
|
|
||
| Builder setMetricNamesForComputingUtilization( | ||
| List<String> metricNamesForComputingUtilization) { | ||
| this.metricNamesForComputingUtilization = | ||
| ImmutableList.copyOf(metricNamesForComputingUtilization); | ||
| return this; | ||
| } | ||
|
|
||
| WeightedRoundRobinLoadBalancerConfig build() { | ||
| return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos, | ||
| weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos, | ||
| weightUpdatePeriodNanos, errorUtilizationPenalty); | ||
| weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos, | ||
| weightUpdatePeriodNanos, errorUtilizationPenalty, metricNamesForComputingUtilization); | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.