diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 87963476265..688626d9b3d 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -122,7 +122,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { result.maxConcurrentRequests(), result.upstreamTlsContext(), result.filterMetadata(), - result.outlierDetection()); + result.outlierDetection(), + result.backendMetricPropagation()); } else { instance = DiscoveryMechanism.forLogicalDns( clusterName, @@ -130,7 +131,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { result.lrsServerInfo(), result.maxConcurrentRequests(), result.upstreamTlsContext(), - result.filterMetadata()); + result.filterMetadata(), + result.backendMetricPropagation()); } gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME), diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 034cdee0815..7bc7a3e3c1f 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -46,6 +46,7 @@ import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; +import io.grpc.xds.client.BackendMetricPropagation; import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats; @@ -148,6 +149,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests); childLbHelper.updateSslContextProviderSupplier(config.tlsContext); childLbHelper.updateFilterMetadata(config.filterMetadata); + childLbHelper.updateBackendMetricPropagation(config.backendMetricPropagation); childSwitchLb.handleResolvedAddresses( resolvedAddresses.toBuilder() @@ -208,6 +210,8 @@ private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper { private Map filterMetadata = ImmutableMap.of(); @Nullable private final ServerInfo lrsServerInfo; + @Nullable + private BackendMetricPropagation backendMetricPropagation; private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) { this.inFlights = checkNotNull(inFlights, "inFlights"); @@ -320,7 +324,7 @@ private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAt (lrsServerInfo == null) ? null : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, - edsServiceName, locality); + edsServiceName, locality, backendMetricPropagation); return new ClusterLocality(localityStats, localityName); } @@ -370,6 +374,11 @@ private void updateFilterMetadata(Map filterMetadata) { this.filterMetadata = ImmutableMap.copyOf(filterMetadata); } + private void updateBackendMetricPropagation( + @Nullable BackendMetricPropagation backendMetricPropagation) { + this.backendMetricPropagation = backendMetricPropagation; + } + private class RequestLimitingSubchannelPicker extends SubchannelPicker { private final SubchannelPicker delegate; private final List dropPolicies; @@ -505,11 +514,18 @@ private OrcaPerRpcListener(ClusterLocalityStats stats) { } /** - * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is - * included in the snapshot for the LRS report sent to the LRS server. + * Copies ORCA metrics from {@link MetricReport} to {@link ClusterLocalityStats} + * such that they are included in the snapshot for the LRS report sent to the LRS server. + * This includes both top-level metrics (CPU, memory, application utilization) and named + * metrics, filtered according to the backend metric propagation configuration. */ @Override public void onLoadReport(MetricReport report) { + stats.recordTopLevelMetrics( + report.getCpuUtilization(), + report.getMemoryUtilization(), + report.getApplicationUtilization()); + stats.recordBackendLoadMetricStats(report.getNamedMetrics()); } } diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java index 4c9c14ba5f5..f369c3b99b4 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java @@ -31,6 +31,7 @@ import io.grpc.Status; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; +import io.grpc.xds.client.BackendMetricPropagation; import io.grpc.xds.client.Bootstrapper.ServerInfo; import java.util.ArrayList; import java.util.Collections; @@ -98,11 +99,14 @@ static final class ClusterImplConfig { // Provides the direct child policy and its config. final Object childConfig; final Map filterMetadata; + @Nullable + final BackendMetricPropagation backendMetricPropagation; ClusterImplConfig(String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, List dropCategories, Object childConfig, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { + @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation) { this.cluster = checkNotNull(cluster, "cluster"); this.edsServiceName = edsServiceName; this.lrsServerInfo = lrsServerInfo; @@ -112,6 +116,7 @@ static final class ClusterImplConfig { this.dropCategories = Collections.unmodifiableList( new ArrayList<>(checkNotNull(dropCategories, "dropCategories"))); this.childConfig = checkNotNull(childConfig, "childConfig"); + this.backendMetricPropagation = backendMetricPropagation; } @Override diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index e333c46750c..43904e7df7e 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -55,6 +55,7 @@ import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; +import io.grpc.xds.client.BackendMetricPropagation; import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsClient; @@ -190,11 +191,11 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (instance.type == DiscoveryMechanism.Type.EDS) { clusterState = new EdsClusterState(instance.cluster, instance.edsServiceName, instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.filterMetadata, instance.outlierDetection); + instance.filterMetadata, instance.outlierDetection, instance.backendMetricPropagation); } else { // logical DNS clusterState = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName, instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.filterMetadata); + instance.filterMetadata, instance.backendMetricPropagation); } clusterState.start(); return Status.OK; @@ -314,6 +315,8 @@ private abstract class ClusterState { protected final Map filterMetadata; @Nullable protected final OutlierDetection outlierDetection; + @Nullable + protected final BackendMetricPropagation backendMetricPropagation; // Resolution status, may contain most recent error encountered. protected Status status = Status.OK; // True if has received resolution result. @@ -326,13 +329,15 @@ private abstract class ClusterState { private ClusterState(String name, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, @Nullable OutlierDetection outlierDetection) { + Map filterMetadata, @Nullable OutlierDetection outlierDetection, + @Nullable BackendMetricPropagation backendMetricPropagation) { this.name = name; this.lrsServerInfo = lrsServerInfo; this.maxConcurrentRequests = maxConcurrentRequests; this.tlsContext = tlsContext; this.filterMetadata = ImmutableMap.copyOf(filterMetadata); this.outlierDetection = outlierDetection; + this.backendMetricPropagation = backendMetricPropagation; } abstract void start(); @@ -351,9 +356,10 @@ private final class EdsClusterState extends ClusterState implements ResourceWatc private EdsClusterState(String name, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - @Nullable OutlierDetection outlierDetection) { + @Nullable OutlierDetection outlierDetection, + @Nullable BackendMetricPropagation backendMetricPropagation) { super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - outlierDetection); + outlierDetection, backendMetricPropagation); this.edsServiceName = edsServiceName; } @@ -450,8 +456,8 @@ public void run() { Map priorityChildConfigs = generateEdsBasedPriorityChildConfigs( name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext, - filterMetadata, outlierDetection, endpointLbConfig, lbRegistry, - prioritizedLocalityWeights, dropOverloads); + filterMetadata, backendMetricPropagation, outlierDetection, + endpointLbConfig, lbRegistry, prioritizedLocalityWeights, dropOverloads); status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityChildConfigs, @@ -565,8 +571,10 @@ private final class LogicalDnsClusterState extends ClusterState { private LogicalDnsClusterState(String name, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { - super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); + @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation) { + super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, + filterMetadata, null, backendMetricPropagation); this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName"); nameResolverFactory = checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory"); @@ -668,7 +676,7 @@ public Status onResult2(final ResolutionResult resolutionResult) { } PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - lbRegistry, Collections.emptyList()); + backendMetricPropagation, lbRegistry, Collections.emptyList()); status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); @@ -752,13 +760,14 @@ private static class ClusterResolutionResult { private static PriorityChildConfig generateDnsBasedPriorityChildConfig( String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation, LoadBalancerRegistry lbRegistry, List dropOverloads) { // Override endpoint-level LB policy with pick_first for logical DNS cluster. Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( lbRegistry.getProvider("pick_first"), null); ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbConfig, tlsContext, filterMetadata); + dropOverloads, endpointLbConfig, tlsContext, filterMetadata, backendMetricPropagation); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); Object clusterImplPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( @@ -776,6 +785,7 @@ private static Map generateEdsBasedPriorityChildCon String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation, @Nullable OutlierDetection outlierDetection, Object endpointLbConfig, LoadBalancerRegistry lbRegistry, Map> prioritizedLocalityWeights, List dropOverloads) { @@ -783,7 +793,8 @@ private static Map generateEdsBasedPriorityChildCon for (String priority : prioritizedLocalityWeights.keySet()) { ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbConfig, tlsContext, filterMetadata); + dropOverloads, endpointLbConfig, tlsContext, + filterMetadata, backendMetricPropagation); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java index 48101cd9c54..9d783fe37fd 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java @@ -29,6 +29,7 @@ import io.grpc.Status; import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; +import io.grpc.xds.client.BackendMetricPropagation; import io.grpc.xds.client.Bootstrapper.ServerInfo; import java.util.Map; import java.util.Objects; @@ -138,6 +139,8 @@ static final class DiscoveryMechanism { @Nullable final OutlierDetection outlierDetection; final Map filterMetadata; + @Nullable + final BackendMetricPropagation backendMetricPropagation; enum Type { EDS, @@ -147,7 +150,8 @@ enum Type { private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName, @Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, @Nullable OutlierDetection outlierDetection) { + Map filterMetadata, @Nullable OutlierDetection outlierDetection, + @Nullable BackendMetricPropagation backendMetricPropagation) { this.cluster = checkNotNull(cluster, "cluster"); this.type = checkNotNull(type, "type"); this.edsServiceName = edsServiceName; @@ -157,27 +161,33 @@ private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServic this.tlsContext = tlsContext; this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata")); this.outlierDetection = outlierDetection; + this.backendMetricPropagation = backendMetricPropagation; } static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - OutlierDetection outlierDetection) { - return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo, - maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection); + OutlierDetection outlierDetection, + @Nullable BackendMetricPropagation backendMetricPropagation) { + return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, + null, lrsServerInfo, maxConcurrentRequests, tlsContext, + filterMetadata, outlierDetection, backendMetricPropagation); } static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { + @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, + @Nullable BackendMetricPropagation backendMetricPropagation) { return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName, - lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); + lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null, + backendMetricPropagation); } @Override public int hashCode() { return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext, - edsServiceName, dnsHostName, filterMetadata, outlierDetection); + edsServiceName, dnsHostName, filterMetadata, + outlierDetection, backendMetricPropagation); } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java index a5220515b6c..f6d5817477b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java @@ -47,6 +47,7 @@ import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.XdsClusterResource.CdsUpdate; +import io.grpc.xds.client.BackendMetricPropagation; import io.grpc.xds.client.XdsClient.ResourceUpdate; import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.internal.security.CommonTlsContextUtil; @@ -67,6 +68,9 @@ class XdsClusterResource extends XdsResourceType { GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_SYSTEM_ROOT_CERTS", false); static boolean isEnabledXdsHttpConnect = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false); + @VisibleForTesting + static boolean isEnabledOrcaLrsPropagation = + GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION", false); @VisibleForTesting static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate"; @@ -227,6 +231,12 @@ private static StructOrError parseNonAggregateCluster( UpstreamTlsContext upstreamTlsContext = null; OutlierDetection outlierDetection = null; boolean isHttp11ProxyAvailable = false; + BackendMetricPropagation backendMetricPropagation = null; + + if (isEnabledOrcaLrsPropagation) { + backendMetricPropagation = BackendMetricPropagation.fromMetricSpecs( + cluster.getLrsReportEndpointMetricsList()); + } if (cluster.hasLrsServer()) { if (!cluster.getLrsServer().hasSelf()) { return StructOrError.fromError( @@ -326,7 +336,7 @@ private static StructOrError parseNonAggregateCluster( return StructOrError.fromStruct(CdsUpdate.forEds( clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext, - outlierDetection, isHttp11ProxyAvailable)); + outlierDetection, isHttp11ProxyAvailable, backendMetricPropagation)); } else if (type.equals(Cluster.DiscoveryType.LOGICAL_DNS)) { if (!cluster.hasLoadAssignment()) { return StructOrError.fromError( @@ -362,7 +372,7 @@ private static StructOrError parseNonAggregateCluster( Locale.US, "%s:%d", socketAddress.getAddress(), socketAddress.getPortValue()); return StructOrError.fromStruct(CdsUpdate.forLogicalDns( clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests, - upstreamTlsContext, isHttp11ProxyAvailable)); + upstreamTlsContext, isHttp11ProxyAvailable, backendMetricPropagation)); } return StructOrError.fromError( "Cluster " + clusterName + ": unsupported built-in discovery type: " + type); @@ -614,6 +624,9 @@ abstract static class CdsUpdate implements ResourceUpdate { abstract ImmutableMap parsedMetadata(); + @Nullable + abstract BackendMetricPropagation backendMetricPropagation(); + private static Builder newBuilder(String clusterName) { return new AutoValue_XdsClusterResource_CdsUpdate.Builder() .clusterName(clusterName) @@ -622,7 +635,8 @@ private static Builder newBuilder(String clusterName) { .choiceCount(0) .filterMetadata(ImmutableMap.of()) .parsedMetadata(ImmutableMap.of()) - .isHttp11ProxyAvailable(false); + .isHttp11ProxyAvailable(false) + .backendMetricPropagation(null); } static Builder forAggregate(String clusterName, List prioritizedClusterNames) { @@ -636,7 +650,8 @@ static Builder forEds(String clusterName, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext, @Nullable OutlierDetection outlierDetection, - boolean isHttp11ProxyAvailable) { + boolean isHttp11ProxyAvailable, + BackendMetricPropagation backendMetricPropagation) { return newBuilder(clusterName) .clusterType(ClusterType.EDS) .edsServiceName(edsServiceName) @@ -644,21 +659,24 @@ static Builder forEds(String clusterName, @Nullable String edsServiceName, .maxConcurrentRequests(maxConcurrentRequests) .upstreamTlsContext(upstreamTlsContext) .outlierDetection(outlierDetection) - .isHttp11ProxyAvailable(isHttp11ProxyAvailable); + .isHttp11ProxyAvailable(isHttp11ProxyAvailable) + .backendMetricPropagation(backendMetricPropagation); } static Builder forLogicalDns(String clusterName, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext, - boolean isHttp11ProxyAvailable) { + boolean isHttp11ProxyAvailable, + BackendMetricPropagation backendMetricPropagation) { return newBuilder(clusterName) .clusterType(ClusterType.LOGICAL_DNS) .dnsHostName(dnsHostName) .lrsServerInfo(lrsServerInfo) .maxConcurrentRequests(maxConcurrentRequests) .upstreamTlsContext(upstreamTlsContext) - .isHttp11ProxyAvailable(isHttp11ProxyAvailable); + .isHttp11ProxyAvailable(isHttp11ProxyAvailable) + .backendMetricPropagation(backendMetricPropagation); } enum ClusterType { @@ -749,6 +767,9 @@ Builder leastRequestLbPolicy(Integer choiceCount) { protected abstract Builder parsedMetadata(ImmutableMap parsedMetadata); + protected abstract Builder backendMetricPropagation( + BackendMetricPropagation backendMetricPropagation); + abstract CdsUpdate build(); } } diff --git a/xds/src/main/java/io/grpc/xds/client/BackendMetricPropagation.java b/xds/src/main/java/io/grpc/xds/client/BackendMetricPropagation.java new file mode 100644 index 00000000000..7820c66c78d --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/client/BackendMetricPropagation.java @@ -0,0 +1,110 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.client; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableSet; +import io.grpc.Internal; +import javax.annotation.Nullable; + +/** + * Represents the configuration for which ORCA metrics should be propagated from backend + * to LRS load reports, as defined in gRFC A85. + */ +@Internal +public final class BackendMetricPropagation { + + public final boolean propagateCpuUtilization; + public final boolean propagateMemUtilization; + public final boolean propagateApplicationUtilization; + + private final boolean propagateAllNamedMetrics; + private final ImmutableSet namedMetricKeys; + + private BackendMetricPropagation( + boolean propagateCpuUtilization, + boolean propagateMemUtilization, + boolean propagateApplicationUtilization, + boolean propagateAllNamedMetrics, + ImmutableSet namedMetricKeys) { + this.propagateCpuUtilization = propagateCpuUtilization; + this.propagateMemUtilization = propagateMemUtilization; + this.propagateApplicationUtilization = propagateApplicationUtilization; + this.propagateAllNamedMetrics = propagateAllNamedMetrics; + this.namedMetricKeys = checkNotNull(namedMetricKeys, "namedMetricKeys"); + } + + /** + * Creates a BackendMetricPropagation from a list of metric specifications. + * + * @param metricSpecs list of metric specification strings from CDS resource + * @return BackendMetricPropagation instance + */ + public static BackendMetricPropagation fromMetricSpecs( + @Nullable java.util.List metricSpecs) { + if (metricSpecs == null || metricSpecs.isEmpty()) { + return new BackendMetricPropagation(false, false, false, false, ImmutableSet.of()); + } + + boolean propagateCpuUtilization = false; + boolean propagateMemUtilization = false; + boolean propagateApplicationUtilization = false; + boolean propagateAllNamedMetrics = false; + ImmutableSet.Builder namedMetricKeysBuilder = ImmutableSet.builder(); + for (String spec : metricSpecs) { + if (spec == null) { + continue; + } + switch (spec) { + case "cpu_utilization": + propagateCpuUtilization = true; + break; + case "mem_utilization": + propagateMemUtilization = true; + break; + case "application_utilization": + propagateApplicationUtilization = true; + break; + case "named_metrics.*": + propagateAllNamedMetrics = true; + break; + default: + if (spec.startsWith("named_metrics.")) { + String metricKey = spec.substring("named_metrics.".length()); + if (!metricKey.isEmpty()) { + namedMetricKeysBuilder.add(metricKey); + } + } + } + } + + return new BackendMetricPropagation( + propagateCpuUtilization, + propagateMemUtilization, + propagateApplicationUtilization, + propagateAllNamedMetrics, + namedMetricKeysBuilder.build()); + } + + /** + * Returns whether the given named metric key should be propagated. + */ + public boolean shouldPropagateNamedMetric(String metricKey) { + return propagateAllNamedMetrics || namedMetricKeys.contains(metricKey); + } +} \ No newline at end of file diff --git a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java index be9d3587d14..4e71f56180d 100644 --- a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import io.grpc.Internal; import io.grpc.Status; +import io.grpc.internal.GrpcUtil; import io.grpc.xds.client.Stats.BackendLoadMetricStats; import io.grpc.xds.client.Stats.ClusterStats; import io.grpc.xds.client.Stats.DroppedRequests; @@ -57,6 +58,8 @@ public final class LoadStatsManager2 { private final Map>>> allLoadStats = new HashMap<>(); private final Supplier stopwatchSupplier; + static boolean isEnabledOrcaLrsPropagation = + GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION", false); @VisibleForTesting public LoadStatsManager2(Supplier stopwatchSupplier) { @@ -98,13 +101,20 @@ private synchronized void releaseClusterDropCounter( /** * Gets or creates the stats object for recording loads for the specified locality (in the - * specified cluster with edsServiceName). The returned object is reference counted and the - * caller should use {@link ClusterLocalityStats#release} to release its hard reference + * specified cluster with edsServiceName) with the specified backend metric propagation + * configuration. The returned object is reference counted and the caller should + * use {@link ClusterLocalityStats#release} to release its hard reference * when it is safe to discard the future stats for the locality. */ @VisibleForTesting public synchronized ClusterLocalityStats getClusterLocalityStats( String cluster, @Nullable String edsServiceName, Locality locality) { + return getClusterLocalityStats(cluster, edsServiceName, locality, null); + } + + public synchronized ClusterLocalityStats getClusterLocalityStats( + String cluster, @Nullable String edsServiceName, Locality locality, + @Nullable BackendMetricPropagation backendMetricPropagation) { if (!allLoadStats.containsKey(cluster)) { allLoadStats.put( cluster, @@ -121,8 +131,8 @@ public synchronized ClusterLocalityStats getClusterLocalityStats( if (!localityStats.containsKey(locality)) { localityStats.put( locality, - ReferenceCounted.wrap(new ClusterLocalityStats( - cluster, edsServiceName, locality, stopwatchSupplier.get()))); + ReferenceCounted.wrap(new ClusterLocalityStats(cluster, edsServiceName, + locality, stopwatchSupplier.get(), backendMetricPropagation))); } ReferenceCounted ref = localityStats.get(locality); ref.retain(); @@ -325,6 +335,8 @@ public final class ClusterLocalityStats { private final String edsServiceName; private final Locality locality; private final Stopwatch stopwatch; + @Nullable + private final BackendMetricPropagation backendMetricPropagation; private final AtomicLong callsInProgress = new AtomicLong(); private final AtomicLong callsSucceeded = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong(); @@ -333,11 +345,12 @@ public final class ClusterLocalityStats { private ClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality, - Stopwatch stopwatch) { + Stopwatch stopwatch, BackendMetricPropagation backendMetricPropagation) { this.clusterName = checkNotNull(clusterName, "clusterName"); this.edsServiceName = edsServiceName; this.locality = checkNotNull(locality, "locality"); this.stopwatch = checkNotNull(stopwatch, "stopwatch"); + this.backendMetricPropagation = backendMetricPropagation; stopwatch.reset().start(); } @@ -367,17 +380,55 @@ public void recordCallFinished(Status status) { * requests counter of 1 and the {@code value} if the key is not present in the map. Otherwise, * increments the finished requests counter and adds the {@code value} to the existing * {@link BackendLoadMetricStats}. + * Metrics are filtered based on the backend metric propagation configuration if configured. */ public synchronized void recordBackendLoadMetricStats(Map namedMetrics) { + if (!isEnabledOrcaLrsPropagation) { + namedMetrics.forEach((name, value) -> updateLoadMetricStats(name, value)); + return; + } + namedMetrics.forEach((name, value) -> { - if (!loadMetricStatsMap.containsKey(name)) { - loadMetricStatsMap.put(name, new BackendLoadMetricStats(1, value)); - } else { - loadMetricStatsMap.get(name).addMetricValueAndIncrementRequestsFinished(value); + if (backendMetricPropagation.shouldPropagateNamedMetric(name)) { + updateLoadMetricStats("named_metrics." + name, value); } }); } + private void updateLoadMetricStats(String metricName, double value) { + if (!loadMetricStatsMap.containsKey(metricName)) { + loadMetricStatsMap.put(metricName, new BackendLoadMetricStats(1, value)); + } else { + loadMetricStatsMap.get(metricName).addMetricValueAndIncrementRequestsFinished(value); + } + } + + /** + * Records top-level ORCA metrics (CPU, memory, application utilization) for per-call load + * reporting. Metrics are filtered based on the backend metric propagation configuration + * if configured. + * + * @param cpuUtilization CPU utilization metric value + * @param memUtilization Memory utilization metric value + * @param applicationUtilization Application utilization metric value + */ + public synchronized void recordTopLevelMetrics(double cpuUtilization, double memUtilization, + double applicationUtilization) { + if (!isEnabledOrcaLrsPropagation) { + return; + } + + if (backendMetricPropagation.propagateCpuUtilization && cpuUtilization > 0) { + updateLoadMetricStats("cpu_utilization", cpuUtilization); + } + if (backendMetricPropagation.propagateMemUtilization && memUtilization > 0) { + updateLoadMetricStats("mem_utilization", memUtilization); + } + if (backendMetricPropagation.propagateApplicationUtilization && applicationUtilization > 0) { + updateLoadMetricStats("application_utilization", applicationUtilization); + } + } + /** * Release the hard reference for this stats object (previously obtained via {@link * LoadStatsManager2#getClusterLocalityStats}). The object may still be diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index e2dd4169bca..cd545c00c1d 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -388,6 +388,23 @@ public LoadStatsManager2.ClusterDropStats addClusterDropStats( public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats( Bootstrapper.ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, Locality locality) { + return addClusterLocalityStats(serverInfo, clusterName, edsServiceName, locality, null); + } + + /** + * Adds load stats for the specified locality (in the specified cluster with edsServiceName) by + * using the returned object to record RPCs. Load stats recorded with the returned object will + * be reported to the load reporting server. The returned object is reference counted and the + * caller should use {@link LoadStatsManager2.ClusterLocalityStats#release} to release its + * hard reference when it is safe to stop reporting RPC loads for the specified locality + * in the future. + * + * @param backendMetricPropagation Configuration for which backend metrics should be propagated + * to LRS load reports. If null, all metrics will be propagated (legacy behavior). + */ + public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats( + Bootstrapper.ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, + Locality locality, @Nullable BackendMetricPropagation backendMetricPropagation) { throw new UnsupportedOperationException(); } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index d3384cbbe4e..1b24356c20a 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -409,15 +409,29 @@ public void run() { public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats( final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, Locality locality) { + return addClusterLocalityStats(serverInfo, clusterName, edsServiceName, locality, null); + } + + @Override + public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats( + final ServerInfo serverInfo, + String clusterName, + @Nullable String edsServiceName, + Locality locality, + @Nullable BackendMetricPropagation backendMetricPropagation) { LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo); + LoadStatsManager2.ClusterLocalityStats loadCounter = - loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality); + loadStatsManager.getClusterLocalityStats( + clusterName, edsServiceName, locality, backendMetricPropagation); + syncContext.execute(new Runnable() { @Override public void run() { serverLrsClientMap.get(serverInfo).startLoadReporting(); } }); + return loadCounter; } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 2059022c203..790f4445823 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -257,7 +257,7 @@ public void discoverTopLevelEdsCluster() { CLUSTER, EDS_SERVICE_NAME, lrsServerInfo, 100L, upstreamTlsContext, Collections.emptyMap(), io.grpc.xds.EnvoyServerProtoData.OutlierDetection.create( null, null, null, null, SuccessRateEjection.create(null, null, null, null), - FailurePercentageEjection.create(null, null, null, null)))); + FailurePercentageEjection.create(null, null, null, null)), null)); assertThat( GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName()) .isEqualTo("wrr_locality_experimental"); @@ -304,7 +304,7 @@ public void discoverTopLevelLogicalDnsCluster() { assertThat(childLbConfig.discoveryMechanism).isEqualTo( DiscoveryMechanism.forLogicalDns( CLUSTER, "dns.example.com:1111", lrsServerInfo, 100L, upstreamTlsContext, - Collections.emptyMap())); + Collections.emptyMap(), null)); assertThat( GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName()) .isEqualTo("wrr_locality_experimental"); @@ -338,7 +338,7 @@ public void nonAggregateCluster_resourceUpdate() { ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; assertThat(childLbConfig.discoveryMechanism).isEqualTo( DiscoveryMechanism.forEds( - CLUSTER, EDS_SERVICE_NAME, null, 100L, null, Collections.emptyMap(), null)); + CLUSTER, EDS_SERVICE_NAME, null, 100L, null, Collections.emptyMap(), null, null)); cluster = EDS_CLUSTER.toBuilder() .setCircuitBreakers(CircuitBreakers.newBuilder() @@ -353,7 +353,7 @@ public void nonAggregateCluster_resourceUpdate() { childLbConfig = (ClusterResolverConfig) childBalancer.config; assertThat(childLbConfig.discoveryMechanism).isEqualTo( DiscoveryMechanism.forEds( - CLUSTER, EDS_SERVICE_NAME, null, 200L, null, Collections.emptyMap(), null)); + CLUSTER, EDS_SERVICE_NAME, null, 200L, null, Collections.emptyMap(), null, null)); } @Test @@ -367,7 +367,7 @@ public void nonAggregateCluster_resourceRevoked() { ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; assertThat(childLbConfig.discoveryMechanism).isEqualTo( DiscoveryMechanism.forEds( - CLUSTER, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null)); + CLUSTER, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null, null)); controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of()); @@ -398,7 +398,7 @@ public void dynamicCluster() { ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; assertThat(childLbConfig.discoveryMechanism).isEqualTo( DiscoveryMechanism.forEds( - clusterName, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null)); + clusterName, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null, null)); assertThat(this.lastXdsConfig.getClusters()).containsKey(clusterName); shutdownLoadBalancer(); diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 7df0630b779..26c1eeaf23c 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -68,6 +68,7 @@ import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; +import io.grpc.xds.client.BackendMetricPropagation; import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.LoadReportClient; import io.grpc.xds.client.LoadStatsManager2; @@ -191,7 +192,7 @@ public void handleResolvedAddresses_propagateToChildPolicy() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -219,7 +220,7 @@ public void handleResolvedAddresses_childPolicyChanges() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWeightedTarget); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -234,7 +235,7 @@ public void handleResolvedAddresses_childPolicyChanges() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( wrrLocalityProvider, wrrLocalityConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWrrLocality); childBalancer = Iterables.getOnlyElement(downstreamBalancers); assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME); @@ -260,7 +261,7 @@ public void nameResolutionError_afterChildPolicyInstantiated_propagateToDownstre null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -281,7 +282,7 @@ public void pick_addsOptionalLabels() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -312,7 +313,7 @@ public void pick_noResult_addsClusterLabel() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -336,7 +337,7 @@ public void recordLoadStats() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -431,7 +432,7 @@ public void pickFirstLoadReport_onUpdateAddress() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(pickFirstProvider, pickFirstConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality1); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality2); deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config); @@ -521,7 +522,7 @@ public void dropRpcsWithRespectToLbConfigDropCategories() { null, Collections.singletonList(DropOverload.create("throttle", 500_000)), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 999_999, 1_000_000); @@ -555,7 +556,7 @@ public void dropRpcsWithRespectToLbConfigDropCategories() { Collections.singletonList(DropOverload.create("lb", 1_000_000)), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(Collections.singletonList(endpoint)) @@ -604,7 +605,7 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu maxConcurrentRequests, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); assertThat(downstreamBalancers).hasSize(1); // one leaf balancer @@ -651,7 +652,7 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu maxConcurrentRequests, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); result = currentPicker.pickSubchannel(pickSubchannelArgs); @@ -699,7 +700,7 @@ private void subtest_maxConcurrentRequests_appliedWithDefaultValue( null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); assertThat(downstreamBalancers).hasSize(1); // one leaf balancer @@ -750,7 +751,7 @@ public void endpointAddressesAttachedWithClusterName() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); // One locality with two endpoints. EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality); @@ -790,7 +791,7 @@ public void endpointAddressesAttachedWithClusterName() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality, "authority-host-name"); deliverAddressesAndConfig(Arrays.asList(endpoint1), config); @@ -841,7 +842,7 @@ public void endpointAddressesAttachedWithClusterName() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality, "authority-host-name"); deliverAddressesAndConfig(Arrays.asList(endpoint1), config); @@ -889,7 +890,7 @@ public void endpointAddressesAttachedWithTlsConfig_securityEnabledByDefault() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - upstreamTlsContext, Collections.emptyMap()); + upstreamTlsContext, Collections.emptyMap(), null); // One locality with two endpoints. EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality); @@ -914,7 +915,7 @@ public void endpointAddressesAttachedWithTlsConfig_securityEnabledByDefault() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - null, Collections.emptyMap()); + null, Collections.emptyMap(), null); deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config); assertThat(Iterables.getOnlyElement(downstreamBalancers)).isSameInstanceAs(leafBalancer); subchannel = leafBalancer.helper.createSubchannel(args); // creates new connections @@ -931,7 +932,7 @@ public void endpointAddressesAttachedWithTlsConfig_securityEnabledByDefault() { null, Collections.emptyList(), GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( weightedTargetProvider, weightedTargetConfig), - upstreamTlsContext, Collections.emptyMap()); + upstreamTlsContext, Collections.emptyMap(), null); deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config); assertThat(Iterables.getOnlyElement(downstreamBalancers)).isSameInstanceAs(leafBalancer); subchannel = leafBalancer.helper.createSubchannel(args); // creates new connections @@ -1241,8 +1242,9 @@ public ClusterDropStats addClusterDropStats( @Override public ClusterLocalityStats addClusterLocalityStats( ServerInfo lrsServerInfo, String clusterName, @Nullable String edsServiceName, - Locality locality) { - return loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality); + Locality locality, BackendMetricPropagation backendMetricPropagation) { + return loadStatsManager.getClusterLocalityStats( + clusterName, edsServiceName, locality, backendMetricPropagation); } @Override diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 982677d24da..07235704fcf 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -140,16 +140,16 @@ public class ClusterResolverLoadBalancerTest { FailurePercentageEjection.create(100, 100, 100, 100)); private final DiscoveryMechanism edsDiscoveryMechanism1 = DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext, - Collections.emptyMap(), null); + Collections.emptyMap(), null, null); private final DiscoveryMechanism edsDiscoveryMechanism2 = DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, tlsContext, - Collections.emptyMap(), null); + Collections.emptyMap(), null, null); private final DiscoveryMechanism edsDiscoveryMechanismWithOutlierDetection = DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext, - Collections.emptyMap(), outlierDetection); + Collections.emptyMap(), outlierDetection, null); private final DiscoveryMechanism logicalDnsDiscoveryMechanism = DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, DNS_HOST_NAME, LRS_SERVER_INFO, 300L, null, - Collections.emptyMap()); + Collections.emptyMap(), null); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { diff --git a/xds/src/test/java/io/grpc/xds/GcpAuthenticationFilterTest.java b/xds/src/test/java/io/grpc/xds/GcpAuthenticationFilterTest.java index 1d6c97d81e6..7f203e036b7 100644 --- a/xds/src/test/java/io/grpc/xds/GcpAuthenticationFilterTest.java +++ b/xds/src/test/java/io/grpc/xds/GcpAuthenticationFilterTest.java @@ -491,7 +491,7 @@ private static CdsUpdate getCdsUpdate() { parsedMetadata.put("FILTER_INSTANCE_NAME", new AudienceWrapper("TEST_AUDIENCE")); try { CdsUpdate.Builder cdsUpdate = CdsUpdate.forEds( - CLUSTER_NAME, EDS_NAME, null, null, null, null, false) + CLUSTER_NAME, EDS_NAME, null, null, null, null, false, null) .lbPolicyConfig(getWrrLbConfigAsMap()); return cdsUpdate.parsedMetadata(parsedMetadata.build()).build(); } catch (IOException ex) { @@ -504,7 +504,7 @@ private static CdsUpdate getCdsUpdate2() { parsedMetadata.put("FILTER_INSTANCE_NAME", new AudienceWrapper("NEW_TEST_AUDIENCE")); try { CdsUpdate.Builder cdsUpdate = CdsUpdate.forEds( - CLUSTER_NAME, EDS_NAME, null, null, null, null, false) + CLUSTER_NAME, EDS_NAME, null, null, null, null, false, null) .lbPolicyConfig(getWrrLbConfigAsMap()); return cdsUpdate.parsedMetadata(parsedMetadata.build()).build(); } catch (IOException ex) { @@ -516,7 +516,7 @@ private static CdsUpdate getCdsUpdateWithIncorrectAudienceWrapper() throws IOExc ImmutableMap.Builder parsedMetadata = ImmutableMap.builder(); parsedMetadata.put("FILTER_INSTANCE_NAME", "TEST_AUDIENCE"); CdsUpdate.Builder cdsUpdate = CdsUpdate.forEds( - CLUSTER_NAME, EDS_NAME, null, null, null, null, false) + CLUSTER_NAME, EDS_NAME, null, null, null, null, false, null) .lbPolicyConfig(getWrrLbConfigAsMap()); return cdsUpdate.parsedMetadata(parsedMetadata.build()).build(); } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java index 3650e5d5bb9..495a3f31e78 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplDataTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.fail; import com.github.udpa.udpa.type.v1.TypedStruct; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -140,6 +141,7 @@ import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher; import io.grpc.xds.WeightedRoundRobinLoadBalancer.WeightedRoundRobinLoadBalancerConfig; import io.grpc.xds.XdsClusterResource.CdsUpdate; +import io.grpc.xds.client.BackendMetricPropagation; import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsResourceType; @@ -2640,6 +2642,42 @@ public void parseNonAggregateCluster_withHttp11ProxyTransportSocket() throws Exc assertThat(result.isHttp11ProxyAvailable()).isTrue(); } + @Test + public void processCluster_parsesOrcaLrsPropagationMetrics() throws ResourceInvalidException { + XdsClusterResource.isEnabledOrcaLrsPropagation = true; + + ImmutableList metricSpecs = ImmutableList.of( + "cpu_utilization", + "named_metrics.foo", + "unknown_metric_spec" + ); + Cluster cluster = Cluster.newBuilder() + .setName("cluster-orca.googleapis.com") + .setType(DiscoveryType.EDS) + .setEdsClusterConfig( + EdsClusterConfig.newBuilder() + .setEdsConfig( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setServiceName("service-orca.googleapis.com")) + .setLbPolicy(LbPolicy.ROUND_ROBIN) + .addAllLrsReportEndpointMetrics(metricSpecs) + .build(); + + CdsUpdate update = XdsClusterResource.processCluster( + cluster, null, LRS_SERVER_INFO, LoadBalancerRegistry.getDefaultRegistry()); + + BackendMetricPropagation propagationConfig = update.backendMetricPropagation(); + assertThat(propagationConfig).isNotNull(); + assertThat(propagationConfig.propagateCpuUtilization).isTrue(); + assertThat(propagationConfig.propagateMemUtilization).isFalse(); + assertThat(propagationConfig.shouldPropagateNamedMetric("foo")).isTrue(); + assertThat(propagationConfig.shouldPropagateNamedMetric("bar")).isFalse(); + assertThat(propagationConfig.shouldPropagateNamedMetric("unknown_metric_spec")) + .isFalse(); + + XdsClusterResource.isEnabledOrcaLrsPropagation = false; + } + @Test public void parseServerSideListener_invalidTrafficDirection() throws ResourceInvalidException { Listener listener = diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java index b2b713e9a8e..ee32c4490af 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java @@ -154,7 +154,6 @@ public void lrsClientsStartedForLocalityStats() throws InterruptedException, Exe } } - /** * Assures that when an {@link XdsClient} is asked to add cluster locality stats it appropriately * starts {@link LoadReportClient}s to do that. diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index b50bdfce01f..dbf7d07aea3 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -1235,7 +1235,7 @@ private static void createAndDeliverClusterUpdates( FakeXdsClient xdsClient, String... clusterNames) { for (String clusterName : clusterNames) { CdsUpdate.Builder forEds = CdsUpdate - .forEds(clusterName, clusterName, null, null, null, null, false) + .forEds(clusterName, clusterName, null, null, null, null, false, null) .roundRobinLbPolicy(); xdsClient.deliverCdsUpdate(clusterName, forEds.build()); EdsUpdate edsUpdate = new EdsUpdate(clusterName, diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java index b1818445bea..63bd139c2cd 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -268,7 +268,7 @@ static XdsConfig getDefaultXdsConfig(String serverHostName) XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate( EDS_NAME, lbEndpointsMap, Collections.emptyList()); XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds( - CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null, false) + CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null, false, null) .lbPolicyConfig(getWrrLbConfigAsMap()).build(); XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig( CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate))); diff --git a/xds/src/test/java/io/grpc/xds/client/BackendMetricPropagationTest.java b/xds/src/test/java/io/grpc/xds/client/BackendMetricPropagationTest.java new file mode 100644 index 00000000000..31ad6f9c47f --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/client/BackendMetricPropagationTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.client; + +import static com.google.common.truth.Truth.assertThat; +import static java.util.Arrays.asList; + +import com.google.common.collect.ImmutableList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link BackendMetricPropagation}. + */ +@RunWith(JUnit4.class) +public class BackendMetricPropagationTest { + + @Test + public void fromMetricSpecs_nullInput() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs(null); + + assertThat(config.propagateCpuUtilization).isFalse(); + assertThat(config.propagateMemUtilization).isFalse(); + assertThat(config.propagateApplicationUtilization).isFalse(); + assertThat(config.shouldPropagateNamedMetric("any")).isFalse(); + } + + @Test + public void fromMetricSpecs_emptyInput() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs(ImmutableList.of()); + + assertThat(config.propagateCpuUtilization).isFalse(); + assertThat(config.propagateMemUtilization).isFalse(); + assertThat(config.propagateApplicationUtilization).isFalse(); + assertThat(config.shouldPropagateNamedMetric("any")).isFalse(); + } + + @Test + public void fromMetricSpecs_partialStandardMetrics() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs( + ImmutableList.of("cpu_utilization", "mem_utilization")); + + assertThat(config.propagateCpuUtilization).isTrue(); + assertThat(config.propagateMemUtilization).isTrue(); + assertThat(config.propagateApplicationUtilization).isFalse(); + assertThat(config.shouldPropagateNamedMetric("any")).isFalse(); + } + + @Test + public void fromMetricSpecs_allStandardMetrics() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs( + ImmutableList.of("cpu_utilization", "mem_utilization", "application_utilization")); + + assertThat(config.propagateCpuUtilization).isTrue(); + assertThat(config.propagateMemUtilization).isTrue(); + assertThat(config.propagateApplicationUtilization).isTrue(); + assertThat(config.shouldPropagateNamedMetric("any")).isFalse(); + } + + @Test + public void fromMetricSpecs_wildcardNamedMetrics() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs( + ImmutableList.of("named_metrics.*")); + + assertThat(config.propagateCpuUtilization).isFalse(); + assertThat(config.propagateMemUtilization).isFalse(); + assertThat(config.propagateApplicationUtilization).isFalse(); + assertThat(config.shouldPropagateNamedMetric("any_key")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("another_key")).isTrue(); + } + + @Test + public void fromMetricSpecs_specificNamedMetrics() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs( + ImmutableList.of("named_metrics.foo", "named_metrics.bar")); + + assertThat(config.shouldPropagateNamedMetric("foo")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("bar")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("baz")).isFalse(); + assertThat(config.shouldPropagateNamedMetric("any")).isFalse(); + } + + @Test + public void fromMetricSpecs_mixedStandardAndNamed() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs( + ImmutableList.of("cpu_utilization", "named_metrics.foo", "named_metrics.bar")); + + assertThat(config.propagateCpuUtilization).isTrue(); + assertThat(config.propagateMemUtilization).isFalse(); + assertThat(config.shouldPropagateNamedMetric("foo")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("bar")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("baz")).isFalse(); + } + + @Test + public void fromMetricSpecs_wildcardAndSpecificNamedMetrics() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs( + ImmutableList.of("named_metrics.foo", "named_metrics.*")); + + assertThat(config.shouldPropagateNamedMetric("foo")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("bar")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("any_other_key")).isTrue(); + } + + @Test + public void fromMetricSpecs_malformedAndUnknownSpecs_areIgnored() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs( + asList( + "cpu_utilization", + null, // ignored + "disk_utilization", + "named_metrics.", // empty key + "named_metrics.valid" + )); + + assertThat(config.propagateCpuUtilization).isTrue(); + assertThat(config.propagateMemUtilization).isFalse(); + assertThat(config.shouldPropagateNamedMetric("disk_utilization")).isFalse(); + assertThat(config.shouldPropagateNamedMetric("valid")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("")).isFalse(); // from the empty key + } + + @Test + public void fromMetricSpecs_duplicateSpecs_areHandledGracefully() { + BackendMetricPropagation config = BackendMetricPropagation.fromMetricSpecs( + ImmutableList.of( + "cpu_utilization", + "named_metrics.foo", + "cpu_utilization", + "named_metrics.foo")); + + assertThat(config.propagateCpuUtilization).isTrue(); + assertThat(config.shouldPropagateNamedMetric("foo")).isTrue(); + assertThat(config.shouldPropagateNamedMetric("bar")).isFalse(); + } +}