diff --git a/api/src/main/java/io/grpc/EquivalentAddressGroup.java b/api/src/main/java/io/grpc/EquivalentAddressGroup.java index 4b3db006684..038ae91689c 100644 --- a/api/src/main/java/io/grpc/EquivalentAddressGroup.java +++ b/api/src/main/java/io/grpc/EquivalentAddressGroup.java @@ -50,6 +50,11 @@ public final class EquivalentAddressGroup { @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6138") public static final Attributes.Key ATTR_AUTHORITY_OVERRIDE = Attributes.Key.create("io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE"); + /** + * The name of the locality that this EquivalentAddressGroup is in. + */ + public static final Attributes.Key ATTR_LOCALITY_NAME = + Attributes.Key.create("io.grpc.lb.locality"); private final List addrs; private final Attributes attrs; diff --git a/api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java b/api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java new file mode 100644 index 00000000000..07e099cde5d --- /dev/null +++ b/api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java @@ -0,0 +1,32 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import java.util.List; + +/** + * Represents a long-valued up down counter metric instrument. + */ +@Internal +public final class LongUpDownCounterMetricInstrument extends PartialMetricInstrument { + public LongUpDownCounterMetricInstrument(int index, String name, String description, String unit, + List requiredLabelKeys, + List optionalLabelKeys, + boolean enableByDefault) { + super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault); + } +} \ No newline at end of file diff --git a/api/src/main/java/io/grpc/MetricInstrumentRegistry.java b/api/src/main/java/io/grpc/MetricInstrumentRegistry.java index 1b33ed17a71..ce0f8f1b5cb 100644 --- a/api/src/main/java/io/grpc/MetricInstrumentRegistry.java +++ b/api/src/main/java/io/grpc/MetricInstrumentRegistry.java @@ -144,6 +144,47 @@ public LongCounterMetricInstrument registerLongCounter(String name, } } + /** + * Registers a new Long Up Down Counter metric instrument. + * + * @param name the name of the metric + * @param description a description of the metric + * @param unit the unit of measurement for the metric + * @param requiredLabelKeys a list of required label keys + * @param optionalLabelKeys a list of optional label keys + * @param enableByDefault whether the metric should be enabled by default + * @return the newly created LongUpDownCounterMetricInstrument + * @throws IllegalStateException if a metric with the same name already exists + */ + public LongUpDownCounterMetricInstrument registerLongUpDownCounter(String name, + String description, + String unit, + List requiredLabelKeys, + List optionalLabelKeys, + boolean enableByDefault) { + checkArgument(!Strings.isNullOrEmpty(name), "missing metric name"); + checkNotNull(description, "description"); + checkNotNull(unit, "unit"); + checkNotNull(requiredLabelKeys, "requiredLabelKeys"); + checkNotNull(optionalLabelKeys, "optionalLabelKeys"); + synchronized (lock) { + if (registeredMetricNames.contains(name)) { + throw new IllegalStateException("Metric with name " + name + " already exists"); + } + int index = nextAvailableMetricIndex; + if (index + 1 == metricInstruments.length) { + resizeMetricInstruments(); + } + LongUpDownCounterMetricInstrument instrument = new LongUpDownCounterMetricInstrument( + index, name, description, unit, requiredLabelKeys, optionalLabelKeys, + enableByDefault); + metricInstruments[index] = instrument; + registeredMetricNames.add(name); + nextAvailableMetricIndex += 1; + return instrument; + } + } + /** * Registers a new Double Histogram metric instrument. * diff --git a/api/src/main/java/io/grpc/MetricRecorder.java b/api/src/main/java/io/grpc/MetricRecorder.java index d418dcbf590..897c28011cd 100644 --- a/api/src/main/java/io/grpc/MetricRecorder.java +++ b/api/src/main/java/io/grpc/MetricRecorder.java @@ -50,7 +50,7 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do * Adds a value for a long valued counter metric instrument. * * @param metricInstrument The counter metric instrument to add the value against. - * @param value The value to add. + * @param value The value to add. MUST be non-negative. * @param requiredLabelValues A list of required label values for the metric. * @param optionalLabelValues A list of additional, optional label values for the metric. */ @@ -66,6 +66,29 @@ default void addLongCounter(LongCounterMetricInstrument metricInstrument, long v metricInstrument.getOptionalLabelKeys().size()); } + /** + * Adds a value for a long valued up down counter metric instrument. + * + * @param metricInstrument The counter metric instrument to add the value against. + * @param value The value to add. May be positive, negative or zero. + * @param requiredLabelValues A list of required label values for the metric. + * @param optionalLabelValues A list of additional, optional label values for the metric. + */ + default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, + long value, + List requiredLabelValues, + List optionalLabelValues) { + checkArgument(requiredLabelValues != null + && requiredLabelValues.size() == metricInstrument.getRequiredLabelKeys().size(), + "Incorrect number of required labels provided. Expected: %s", + metricInstrument.getRequiredLabelKeys().size()); + checkArgument(optionalLabelValues != null + && optionalLabelValues.size() == metricInstrument.getOptionalLabelKeys().size(), + "Incorrect number of optional labels provided. Expected: %s", + metricInstrument.getOptionalLabelKeys().size()); + } + + /** * Records a value for a double-precision histogram metric instrument. * diff --git a/api/src/main/java/io/grpc/MetricSink.java b/api/src/main/java/io/grpc/MetricSink.java index 0f56b1acb73..ce5d3822520 100644 --- a/api/src/main/java/io/grpc/MetricSink.java +++ b/api/src/main/java/io/grpc/MetricSink.java @@ -65,12 +65,26 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do * Adds a value for a long valued counter metric associated with specified metric instrument. * * @param metricInstrument The counter metric instrument identifies metric measure to add. - * @param value The value to record. + * @param value The value to record. MUST be non-negative. * @param requiredLabelValues A list of required label values for the metric. * @param optionalLabelValues A list of additional, optional label values for the metric. */ default void addLongCounter(LongCounterMetricInstrument metricInstrument, long value, - List requiredLabelValues, List optionalLabelValues) { + List requiredLabelValues, List optionalLabelValues) { + } + + /** + * Adds a value for a long valued up down counter metric associated with specified metric + * instrument. + * + * @param metricInstrument The counter metric instrument identifies metric measure to add. + * @param value The value to record. May be positive, negative or zero. + * @param requiredLabelValues A list of required label values for the metric. + * @param optionalLabelValues A list of additional, optional label values for the metric. + */ + default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { } /** diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index a27e46eaf60..701bf886f95 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -48,6 +48,9 @@ import io.grpc.LoadBalancer; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; +import io.grpc.NameResolver; +import io.grpc.SecurityLevel; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; @@ -160,6 +163,8 @@ protected void handleNotInUse() { private Status shutdownReason; private volatile Attributes connectedAddressAttributes; + private final SubchannelMetrics subchannelMetrics; + private final String target; InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, @@ -168,7 +173,9 @@ protected void handleNotInUse() { Supplier stopwatchSupplier, SynchronizationContext syncContext, Callback callback, InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer, InternalLogId logId, - ChannelLogger channelLogger, List transportFilters) { + ChannelLogger channelLogger, List transportFilters, + String target, + MetricRecorder metricRecorder) { List addressGroups = args.getAddresses(); Preconditions.checkNotNull(addressGroups, "addressGroups"); Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty"); @@ -192,6 +199,8 @@ protected void handleNotInUse() { this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.transportFilters = transportFilters; this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY); + this.target = target; + this.subchannelMetrics = new SubchannelMetrics(metricRecorder); } ChannelLogger getChannelLogger() { @@ -593,6 +602,15 @@ public void run() { pendingTransport = null; connectedAddressAttributes = addressIndex.getCurrentEagAttributes(); gotoNonErrorState(READY); + subchannelMetrics.recordConnectionAttemptSucceeded(MetricsAttributes.newBuilder(target) + .backendService(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE)) + .locality(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), + EquivalentAddressGroup.ATTR_LOCALITY_NAME)) + .securityLevel(extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))) + .build()); } } }); @@ -618,11 +636,28 @@ public void run() { activeTransport = null; addressIndex.reset(); gotoNonErrorState(IDLE); + subchannelMetrics.recordDisconnection(MetricsAttributes.newBuilder(target) + .backendService(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE)) + .locality(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), + EquivalentAddressGroup.ATTR_LOCALITY_NAME)) + .disconnectError(SubchannelMetrics.DisconnectError.UNKNOWN.getErrorString(null)) + .securityLevel(extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))) + .build()); } else if (pendingTransport == transport) { + subchannelMetrics.recordConnectionAttemptFailed(MetricsAttributes.newBuilder(target) + .backendService(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE)) + .locality(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), + EquivalentAddressGroup.ATTR_LOCALITY_NAME)) + .build()); Preconditions.checkState(state.getState() == CONNECTING, "Expected state is CONNECTING, actual state is %s", state.getState()); addressIndex.increment(); - // Continue reconnect if there are still addresses to try. + // Continue to reconnect if there are still addresses to try. if (!addressIndex.isValid()) { pendingTransport = null; addressIndex.reset(); @@ -658,6 +693,27 @@ public void run() { } }); } + + private String extractSecurityLevel(SecurityLevel securityLevel) { + if (securityLevel == null) { + return "none"; + } + switch (securityLevel) { + case NONE: + return "none"; + case INTEGRITY: + return "integrity_only"; + case PRIVACY_AND_INTEGRITY: + return "privacy_and_integrity"; + default: + throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel); + } + } + + private String getAttributeOrDefault(Attributes attributes, Attributes.Key key) { + String value = attributes.get(key); + return value == null ? "" : value; + } } // All methods are called in syncContext diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 16b8adbd347..78c5181502f 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -415,7 +415,7 @@ void exitIdleMode() { LbHelperImpl lbHelper = new LbHelperImpl(); lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper); // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and - // may throw. We don't want to confuse our state, even if we will enter panic mode. + // may throw. We don't want to confuse our state, even if we enter panic mode. this.lbHelper = lbHelper; channelStateManager.gotoState(CONNECTING); @@ -1464,7 +1464,9 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { subchannelTracer, subchannelLogId, subchannelLogger, - transportFilters); + transportFilters, + target, + lbHelper.getMetricRecorder()); oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() .setDescription("Child Subchannel created") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) @@ -1895,7 +1897,8 @@ void onNotInUse(InternalSubchannel is) { subchannelTracer, subchannelLogId, subchannelLogger, - transportFilters); + transportFilters, target, + lbHelper.getMetricRecorder()); channelTracer.reportEvent(new ChannelTrace.Event.Builder() .setDescription("Child Subchannel started") diff --git a/core/src/main/java/io/grpc/internal/MetricRecorderImpl.java b/core/src/main/java/io/grpc/internal/MetricRecorderImpl.java index 452b1c5df07..ded9d5ce589 100644 --- a/core/src/main/java/io/grpc/internal/MetricRecorderImpl.java +++ b/core/src/main/java/io/grpc/internal/MetricRecorderImpl.java @@ -26,6 +26,7 @@ import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; import io.grpc.LongHistogramMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.MetricRecorder; @@ -82,7 +83,7 @@ public void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, dou * Records a long counter value. * * @param metricInstrument the {@link LongCounterMetricInstrument} to record. - * @param value the value to record. + * @param value the value to record. Must be non-negative. * @param requiredLabelValues the required label values for the metric. * @param optionalLabelValues the optional label values for the metric. */ @@ -103,6 +104,32 @@ public void addLongCounter(LongCounterMetricInstrument metricInstrument, long va } } + /** + * Adds a long up down counter value. + * + * @param metricInstrument the {@link io.grpc.LongUpDownCounterMetricInstrument} to record. + * @param value the value to record. May be positive, negative or zero. + * @param requiredLabelValues the required label values for the metric. + * @param optionalLabelValues the optional label values for the metric. + */ + @Override + public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { + MetricRecorder.super.addLongUpDownCounter(metricInstrument, value, requiredLabelValues, + optionalLabelValues); + for (MetricSink sink : metricSinks) { + int measuresSize = sink.getMeasuresSize(); + if (measuresSize <= metricInstrument.getIndex()) { + // Measures may need updating in two cases: + // 1. When the sink is initially created with an empty list of measures. + // 2. When new metric instruments are registered, requiring the sink to accommodate them. + sink.updateMeasures(registry.getMetricInstruments()); + } + sink.addLongUpDownCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues); + } + } + /** * Records a double histogram value. * diff --git a/core/src/main/java/io/grpc/internal/MetricsAttributes.java b/core/src/main/java/io/grpc/internal/MetricsAttributes.java new file mode 100644 index 00000000000..774ec5251e0 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/MetricsAttributes.java @@ -0,0 +1,78 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +class MetricsAttributes { + final String target; + final String backendService; + final String locality; + final String disconnectError; + final String securityLevel; + + // Constructor is private, only the Builder can call it + private MetricsAttributes(Builder builder) { + this.target = builder.target; + this.backendService = builder.backendService; + this.locality = builder.locality; + this.disconnectError = builder.disconnectError; + this.securityLevel = builder.securityLevel; + } + + // Public static method to get a new builder instance + public static Builder newBuilder(String target) { + return new Builder(target); + } + + public static class Builder { + // Required parameter + private final String target; + + // Optional parameters - initialized to default values + private String backendService = null; + private String locality = null; + private String disconnectError = null; + private String securityLevel = null; + + public Builder(String target) { + this.target = target; + } + + public Builder backendService(String val) { + this.backendService = val; + return this; + } + + public Builder locality(String val) { + this.locality = val; + return this; + } + + public Builder disconnectError(String val) { + this.disconnectError = val; + return this; + } + + public Builder securityLevel(String val) { + this.securityLevel = val; + return this; + } + + public MetricsAttributes build() { + return new MetricsAttributes(this); + } + } +} \ No newline at end of file diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index bbc144ea775..ebe329ca591 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -92,7 +92,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { return Status.FAILED_PRECONDITION.withDescription("Already shut down"); } - // Cache whether or not this is a petiole policy, which is based off of an address attribute + // Check whether this is a petiole policy, which is based off of an address attribute Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY); this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy; diff --git a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java new file mode 100644 index 00000000000..2112e7bc2c4 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java @@ -0,0 +1,187 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import io.grpc.LongCounterMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; +import javax.annotation.Nullable; + +final class SubchannelMetrics { + + private static final LongCounterMetricInstrument disconnections; + private static final LongCounterMetricInstrument connectionAttemptsSucceeded; + private static final LongCounterMetricInstrument connectionAttemptsFailed; + private static final LongUpDownCounterMetricInstrument openConnections; + private final MetricRecorder metricRecorder; + + public SubchannelMetrics(MetricRecorder metricRecorder) { + this.metricRecorder = metricRecorder; + } + + static { + MetricInstrumentRegistry metricInstrumentRegistry + = MetricInstrumentRegistry.getDefaultRegistry(); + disconnections = metricInstrumentRegistry.registerLongCounter( + "grpc.subchannel.disconnections", + "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected", + "{disconnection}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"), + false + ); + + connectionAttemptsSucceeded = metricInstrumentRegistry.registerLongCounter( + "grpc.subchannel.connection_attempts_succeeded", + "EXPERIMENTAL. Number of successful connection attempts", + "{attempt}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality"), + false + ); + + connectionAttemptsFailed = metricInstrumentRegistry.registerLongCounter( + "grpc.subchannel.connection_attempts_failed", + "EXPERIMENTAL. Number of failed connection attempts", + "{attempt}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality"), + false + ); + + openConnections = metricInstrumentRegistry.registerLongUpDownCounter( + "grpc.subchannel.open_connections", + "EXPERIMENTAL. Number of open connections.", + "{connection}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.security_level", "grpc.lb.backend_service", "grpc.lb.locality"), + false + ); + } + + public void recordConnectionAttemptSucceeded(MetricsAttributes labelSet) { + metricRecorder + .addLongCounter(connectionAttemptsSucceeded, 1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.backendService, labelSet.locality)); + metricRecorder + .addLongUpDownCounter(openConnections, 1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality)); + } + + public void recordConnectionAttemptFailed(MetricsAttributes labelSet) { + metricRecorder + .addLongCounter(connectionAttemptsFailed, 1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.backendService, labelSet.locality)); + } + + public void recordDisconnection(MetricsAttributes labelSet) { + metricRecorder + .addLongCounter(disconnections, 1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.backendService, labelSet.locality, labelSet.disconnectError)); + metricRecorder + .addLongUpDownCounter(openConnections, -1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality)); + } + + /** + * Represents the reason for a subchannel failure. + */ + public enum DisconnectError { + + /** + * Represents an HTTP/2 GOAWAY frame. The specific error code + * (e.g., "NO_ERROR", "PROTOCOL_ERROR") should be handled separately + * as it is a dynamic part of the error. + * See RFC 9113 for error codes: https://www.rfc-editor.org/rfc/rfc9113.html#name-error-codes + */ + GOAWAY("goaway"), + + /** + * The subchannel was shut down for various reasons like parent channel shutdown, + * idleness, or load balancing policy changes. + */ + SUBCHANNEL_SHUTDOWN("subchannel shutdown"), + + /** + * Connection was reset (e.g., ECONNRESET, WSAECONNERESET). + */ + CONNECTION_RESET("connection reset"), + + /** + * Connection timed out (e.g., ETIMEDOUT, WSAETIMEDOUT), including closures + * from gRPC keepalives. + */ + CONNECTION_TIMED_OUT("connection timed out"), + + /** + * Connection was aborted (e.g., ECONNABORTED, WSAECONNABORTED). + */ + CONNECTION_ABORTED("connection aborted"), + + /** + * Any socket error not covered by other specific disconnect errors. + */ + SOCKET_ERROR("socket error"), + + /** + * A catch-all for any other unclassified reason. + */ + UNKNOWN("unknown"); + + private final String errorTag; + + /** + * Private constructor to associate a description with each enum constant. + * + * @param errorTag The detailed explanation of the error. + */ + DisconnectError(String errorTag) { + this.errorTag = errorTag; + } + + /** + * Gets the error string suitable for use as a metric tag. + * + *

If the reason is {@code GOAWAY}, this method requires the specific + * HTTP/2 error code to create the complete tag (e.g., "goaway PROTOCOL_ERROR"). + * For all other reasons, the parameter is ignored.

+ * + * @param goawayErrorCode The specific HTTP/2 error code. This is only + * used if the reason is GOAWAY and should not be null in that case. + * @return The formatted error string. + */ + public String getErrorString(@Nullable String goawayErrorCode) { + if (this == GOAWAY) { + if (goawayErrorCode == null || goawayErrorCode.isEmpty()) { + // Return the base tag if the code is missing, or consider throwing an exception + // throw new IllegalArgumentException("goawayErrorCode is required for GOAWAY reason."); + return this.errorTag; + } + return this.errorTag + " " + goawayErrorCode; + } + return this.errorTag; + } + } +} diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index bed722f5f3a..4ac5fbac362 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -29,10 +29,13 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -48,6 +51,10 @@ import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; import io.grpc.LoadBalancer; +import io.grpc.MetricInstrument; +import io.grpc.MetricRecorder; +import io.grpc.NameResolver; +import io.grpc.SecurityLevel; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.InternalSubchannel.CallTracingTransport; @@ -68,6 +75,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -81,6 +89,9 @@ public class InternalSubchannelTest { public final MockitoRule mocks = MockitoJUnit.rule(); private static final String AUTHORITY = "fakeauthority"; + private static final String BACKEND_SERVICE = "ice-cream-factory-service"; + private static final String LOCALITY = "mars-olympus-mons-datacenter"; + private static final SecurityLevel SECURITY_LEVEL = SecurityLevel.PRIVACY_AND_INTEGRITY; private static final String USER_AGENT = "mosaic"; private static final ConnectivityStateInfo UNAVAILABLE_STATE = ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); @@ -108,6 +119,10 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; @Mock private ClientTransportFactory mockTransportFactory; + @Mock private BackoffPolicy mockBackoffPolicy; + private MetricRecorder mockMetricRecorder = mock(MetricRecorder.class, + delegatesTo(new MetricRecorderImpl())); + private final LinkedList callbackInvokes = new LinkedList<>(); private final InternalSubchannel.Callback mockInternalSubchannelCallback = new InternalSubchannel.Callback() { @@ -1446,7 +1461,136 @@ private void createInternalSubchannel(boolean reconnectDisabled, subchannelTracer, logId, new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), - Collections.emptyList()); + Collections.emptyList(), + "", + new MetricRecorder() { + } + ); + } + + @Test + public void subchannelStateChanges_triggersAttemptFailedMetric() { + // 1. Setup: Standard subchannel initialization + when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy); + SocketAddress addr = mock(SocketAddress.class); + Attributes eagAttributes = Attributes.newBuilder() + .set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL) + .build(); + List addressGroups = + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes)); + InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY); + ChannelTracer subchannelTracer = new ChannelTracer(logId, 10, + fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel"); + LoadBalancer.CreateSubchannelArgs createSubchannelArgs = + LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build(); + internalSubchannel = new InternalSubchannel( + createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, + mockTransportFactory, fakeClock.getScheduledExecutorService(), + fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz, + CallTracer.getDefaultFactory().create(), subchannelTracer, logId, + new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), + Collections.emptyList(), AUTHORITY, mockMetricRecorder + ); + + // --- Action: Simulate the "connecting to failed" transition --- + // a. Initiate the connection attempt. The subchannel is now CONNECTING. + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + assertNotNull("A connection attempt should have been made", transportInfo); + + // b. Fail the transport before it can signal `transportReady()`. + transportInfo.listener.transportShutdown( + Status.INTERNAL.withDescription("Simulated connect failure")); + fakeClock.runDueTasks(); // Process the failure event + + // --- Verification --- + // a. Verify that the "connection_attempts_failed" metric was recorded exactly once. + verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY)) + ); + + // b. Verify no other metrics were recorded. This confirms it wasn't incorrectly + // logged as a success, disconnection, or open connection. + verifyNoMoreInteractions(mockMetricRecorder); + } + + @Test + public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() { + // 1. Mock the backoff policy (needed for subchannel creation) + when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy); + + // 2. Setup Subchannel with attributes + SocketAddress addr = mock(SocketAddress.class); + Attributes eagAttributes = Attributes.newBuilder() + .set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL) + .build(); + List addressGroups = + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes)); + createInternalSubchannel(new EquivalentAddressGroup(addr)); + InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY); + ChannelTracer subchannelTracer = new ChannelTracer(logId, 10, + fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel"); + LoadBalancer.CreateSubchannelArgs createSubchannelArgs = + LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build(); + internalSubchannel = new InternalSubchannel( + createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, + mockTransportFactory, fakeClock.getScheduledExecutorService(), + fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz, + CallTracer.getDefaultFactory().create(), subchannelTracer, logId, + new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), + Collections.emptyList(), AUTHORITY, mockMetricRecorder + ); + + // --- Action: Successful connection --- + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + assertNotNull(transportInfo); + transportInfo.listener.transportReady(); + fakeClock.runDueTasks(); // Process the successful connection + + // --- Action: Transport is shut down --- + transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unknown")); + fakeClock.runDueTasks(); // Process the shutdown + + // --- Verification --- + InOrder inOrder = inOrder(mockMetricRecorder); + + // Verify successful connection metrics + inOrder.verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.subchannel.connection_attempts_succeeded"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY)) + ); + inOrder.verify(mockMetricRecorder).addLongUpDownCounter( + eqMetricInstrumentName("grpc.subchannel.open_connections"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY)) + ); + + // Verify disconnection metrics + inOrder.verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.subchannel.disconnections"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY, "unknown")) + ); + inOrder.verify(mockMetricRecorder).addLongUpDownCounter( + eqMetricInstrumentName("grpc.subchannel.open_connections"), + eq(-1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY)) + ); + + inOrder.verifyNoMoreInteractions(); } private void assertNoCallbackInvoke() { @@ -1459,5 +1603,13 @@ private void assertExactCallbackInvokes(String ... expectedInvokes) { callbackInvokes.clear(); } + static class MetricRecorderImpl implements MetricRecorder { + } + + @SuppressWarnings("TypeParameterUnusedInFormals") + private T eqMetricInstrumentName(String name) { + return argThat(instrument -> instrument.getName().equals(name)); + } + private static class FakeSocketAddress extends SocketAddress {} } diff --git a/core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java b/core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java index 08f34a267f9..33bf9bb41e2 100644 --- a/core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java +++ b/core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java @@ -32,6 +32,7 @@ import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; import io.grpc.LongHistogramMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.MetricInstrumentRegistryAccessor; import io.grpc.MetricRecorder; @@ -79,6 +80,9 @@ public class MetricRecorderImplTest { private final LongGaugeMetricInstrument longGaugeInstrument = registry.registerLongGauge("gauge0", DESCRIPTION, UNIT, REQUIRED_LABEL_KEYS, OPTIONAL_LABEL_KEYS, ENABLED); + private final LongUpDownCounterMetricInstrument longUpDownCounterInstrument = + registry.registerLongUpDownCounter("upDownCounter0", DESCRIPTION, UNIT, + REQUIRED_LABEL_KEYS, OPTIONAL_LABEL_KEYS, ENABLED); private MetricRecorder recorder; @Before @@ -88,7 +92,7 @@ public void setUp() { @Test public void addCounter() { - when(mockSink.getMeasuresSize()).thenReturn(4); + when(mockSink.getMeasuresSize()).thenReturn(6); recorder.addDoubleCounter(doubleCounterInstrument, 1.0, REQUIRED_LABEL_VALUES, OPTIONAL_LABEL_VALUES); @@ -100,6 +104,12 @@ public void addCounter() { verify(mockSink, times(2)).addLongCounter(eq(longCounterInstrument), eq(1L), eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES)); + recorder.addLongUpDownCounter(longUpDownCounterInstrument, -10, REQUIRED_LABEL_VALUES, + OPTIONAL_LABEL_VALUES); + verify(mockSink, times(2)) + .addLongUpDownCounter(eq(longUpDownCounterInstrument), eq(-10L), + eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES)); + verify(mockSink, never()).updateMeasures(registry.getMetricInstruments()); } @@ -190,6 +200,13 @@ public void newRegisteredMetricUpdateMeasures() { verify(mockSink, times(2)) .registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)); registration.close(); + + // Long UpDown Counter + recorder.addLongUpDownCounter(longUpDownCounterInstrument, -10, REQUIRED_LABEL_VALUES, + OPTIONAL_LABEL_VALUES); + verify(mockSink, times(12)).updateMeasures(anyList()); + verify(mockSink, times(2)).addLongUpDownCounter(eq(longUpDownCounterInstrument), eq(-10L), + eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES)); } @Test(expected = IllegalArgumentException.class) @@ -208,6 +225,13 @@ public void addLongCounterMismatchedRequiredLabelValues() { OPTIONAL_LABEL_VALUES); } + @Test(expected = IllegalArgumentException.class) + public void addLongUpDownCounterMismatchedRequiredLabelValues() { + when(mockSink.getMeasuresSize()).thenReturn(6); + recorder.addLongUpDownCounter(longUpDownCounterInstrument, 1, ImmutableList.of(), + OPTIONAL_LABEL_VALUES); + } + @Test(expected = IllegalArgumentException.class) public void recordDoubleHistogramMismatchedRequiredLabelValues() { when(mockSink.getMeasuresSize()).thenReturn(4); @@ -260,6 +284,13 @@ public void addLongCounterMismatchedOptionalLabelValues() { ImmutableList.of()); } + @Test(expected = IllegalArgumentException.class) + public void addLongUpDownCounterMismatchedOptionalLabelValues() { + when(mockSink.getMeasuresSize()).thenReturn(6); + recorder.addLongUpDownCounter(longUpDownCounterInstrument, 1, REQUIRED_LABEL_VALUES, + ImmutableList.of()); + } + @Test(expected = IllegalArgumentException.class) public void recordDoubleHistogramMismatchedOptionalLabelValues() { when(mockSink.getMeasuresSize()).thenReturn(4); diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricSink.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricSink.java index 8f612804436..fd8af7f998f 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricSink.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricSink.java @@ -27,6 +27,7 @@ import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; import io.grpc.LongHistogramMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrument; import io.grpc.MetricSink; import io.opentelemetry.api.common.Attributes; @@ -36,6 +37,7 @@ import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.LongUpDownCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.api.metrics.ObservableMeasurement; @@ -117,6 +119,22 @@ public void addLongCounter(LongCounterMetricInstrument metricInstrument, long va counter.add(value, attributes); } + @Override + public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { + MeasuresData instrumentData = measures.get(metricInstrument.getIndex()); + if (instrumentData == null) { + // Disabled metric + return; + } + Attributes attributes = createAttributes(metricInstrument.getRequiredLabelKeys(), + metricInstrument.getOptionalLabelKeys(), requiredLabelValues, optionalLabelValues, + instrumentData.getOptionalLabelsBitSet()); + LongUpDownCounter counter = (LongUpDownCounter) instrumentData.getMeasure(); + counter.add(value, attributes); + } + @Override public void recordDoubleHistogram(DoubleHistogramMetricInstrument metricInstrument, double value, List requiredLabelValues, List optionalLabelValues) { @@ -256,6 +274,11 @@ public void updateMeasures(List instruments) { .setDescription(description) .ofLongs() .buildObserver(); + } else if (instrument instanceof LongUpDownCounterMetricInstrument) { + openTelemetryMeasure = openTelemetryMeter.upDownCounterBuilder(name) + .setUnit(unit) + .setDescription(description) + .build(); } else { logger.log(Level.FINE, "Unsupported metric instrument type : {0}", instrument); openTelemetryMeasure = null; diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java index 5214804d369..ef21903c8e7 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java @@ -36,6 +36,12 @@ public final class OpenTelemetryConstants { public static final AttributeKey BACKEND_SERVICE_KEY = AttributeKey.stringKey("grpc.lb.backend_service"); + public static final AttributeKey DISCONNECT_ERROR_KEY = + AttributeKey.stringKey("grpc.disconnect_error"); + + public static final AttributeKey SECURITY_LEVEL_KEY = + AttributeKey.stringKey("grpc.security_level"); + public static final List LATENCY_BUCKETS = ImmutableList.of( 0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d, diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricSinkTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricSinkTest.java index c538da55dcb..cced4de3cb4 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricSinkTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricSinkTest.java @@ -24,6 +24,7 @@ import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; import io.grpc.LongHistogramMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrument; import io.grpc.MetricSink; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; @@ -144,16 +145,25 @@ public void addCounter_enabledMetric() { "Number of client calls started", "count", Collections.emptyList(), Collections.emptyList(), true); + LongUpDownCounterMetricInstrument longUpDownCounterInstrument = + new LongUpDownCounterMetricInstrument(2, "active_carrier_pigeons", + "Active Carrier Pigeons", "pigeons", + Collections.emptyList(), + Collections.emptyList(), true); + // Create sink sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, Collections.emptyList()); // Invoke updateMeasures - sink.updateMeasures(Arrays.asList(longCounterInstrument, doubleCounterInstrument)); + sink.updateMeasures(Arrays.asList(longCounterInstrument, doubleCounterInstrument, + longUpDownCounterInstrument)); sink.addLongCounter(longCounterInstrument, 123L, Collections.emptyList(), Collections.emptyList()); sink.addDoubleCounter(doubleCounterInstrument, 12.0, Collections.emptyList(), Collections.emptyList()); + sink.addLongUpDownCounter(longUpDownCounterInstrument, -3L, Collections.emptyList(), + Collections.emptyList()); assertThat(openTelemetryTesting.getMetrics()) .satisfiesExactlyInAnyOrder( @@ -184,7 +194,21 @@ public void addCounter_enabledMetric() { .hasPointsSatisfying( point -> point - .hasValue(12.0D)))); + .hasValue(12.0D))), + metric -> + assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName("active_carrier_pigeons") + .hasDescription("Active Carrier Pigeons") + .hasUnit("pigeons") + .hasLongSumSatisfying( + longSum -> + longSum + .hasPointsSatisfying( + point -> + point + .hasValue(-3L)))); } @Test @@ -192,18 +216,27 @@ public void addCounter_disabledMetric() { // set up sink with disabled metric Map enabledMetrics = new HashMap<>(); enabledMetrics.put("client_latency", false); + enabledMetrics.put("active_carrier_pigeons", false); LongCounterMetricInstrument instrument = new LongCounterMetricInstrument(0, "client_latency", "Client latency", "s", Collections.emptyList(), Collections.emptyList(), true); + LongUpDownCounterMetricInstrument longUpDownCounterInstrument = + new LongUpDownCounterMetricInstrument(1, "active_carrier_pigeons", + "Active Carrier Pigeons", "pigeons", + Collections.emptyList(), + Collections.emptyList(), false); + // Create sink sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, true, Collections.emptyList()); // Invoke updateMeasures - sink.updateMeasures(Arrays.asList(instrument)); + sink.updateMeasures(Arrays.asList(instrument, longUpDownCounterInstrument)); sink.addLongCounter(instrument, 123L, Collections.emptyList(), Collections.emptyList()); + sink.addLongUpDownCounter(longUpDownCounterInstrument, -13L, Collections.emptyList(), + Collections.emptyList()); assertThat(openTelemetryTesting.getMetrics()).isEmpty(); } @@ -377,6 +410,7 @@ public void registerBatchCallback_bothEnabledAndDisabled() { public void recordLabels() { Map enabledMetrics = new HashMap<>(); enabledMetrics.put("client_latency", true); + enabledMetrics.put("ghosts_in_the_wire", true); List optionalLabels = Arrays.asList("optional_label_key_2"); @@ -384,16 +418,24 @@ public void recordLabels() { new LongCounterMetricInstrument(0, "client_latency", "Client latency", "s", ImmutableList.of("required_label_key_1", "required_label_key_2"), ImmutableList.of("optional_label_key_1", "optional_label_key_2"), false); + LongUpDownCounterMetricInstrument longUpDownCounterInstrument = + new LongUpDownCounterMetricInstrument(1, "ghosts_in_the_wire", + "Number of Ghosts Haunting the Wire", "{ghosts}", + ImmutableList.of("required_label_key_1", "required_label_key_2"), + ImmutableList.of("optional_label_key_1", "optional_label_key_2"), false); // Create sink sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, optionalLabels); // Invoke updateMeasures - sink.updateMeasures(Arrays.asList(longCounterInstrument)); + sink.updateMeasures(Arrays.asList(longCounterInstrument, longUpDownCounterInstrument)); sink.addLongCounter(longCounterInstrument, 123L, ImmutableList.of("required_label_value_1", "required_label_value_2"), ImmutableList.of("optional_label_value_1", "optional_label_value_2")); + sink.addLongUpDownCounter(longUpDownCounterInstrument, -400L, + ImmutableList.of("required_label_value_1", "required_label_value_2"), + ImmutableList.of("optional_label_value_1", "optional_label_value_2")); io.opentelemetry.api.common.Attributes expectedAtrributes = io.opentelemetry.api.common.Attributes.of( @@ -417,6 +459,22 @@ public void recordLabels() { point -> point .hasAttributes(expectedAtrributes) - .hasValue(123L)))); + .hasValue(123L))), + metric -> + assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName("ghosts_in_the_wire") + .hasDescription("Number of Ghosts Haunting the Wire") + .hasUnit("{ghosts}") + .hasLongSumSatisfying( + longSum -> + longSum + .hasPointsSatisfying( + point -> + point + .hasAttributes(expectedAtrributes) + .hasValue(-400L)))); + } } diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 034cdee0815..fba66e2e8d7 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -305,7 +305,7 @@ private List withAdditionalAttributes( private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) { Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY); - String localityName = addressAttributes.get(XdsAttributes.ATTR_LOCALITY_NAME); + String localityName = addressAttributes.get(EquivalentAddressGroup.ATTR_LOCALITY_NAME); // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain // attributes with its locality, including endpoints in LOGICAL_DNS clusters. diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 080760303bf..7a1e9a36603 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -428,7 +428,7 @@ public void run() { Attributes attr = endpoint.eag().getAttributes().toBuilder() .set(XdsAttributes.ATTR_LOCALITY, locality) - .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName) .set(XdsAttributes.ATTR_LOCALITY_WEIGHT, localityLbInfo.localityWeight()) .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight) @@ -679,7 +679,7 @@ public Status onResult2(final ResolutionResult resolutionResult) { String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); Attributes attr = eag.getAttributes().toBuilder() .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) - .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName) .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) .build(); eag = new EquivalentAddressGroup(eag.getAddresses(), attr); diff --git a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java index ab1abb1da15..1a12412f923 100644 --- a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java @@ -74,7 +74,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { Map localityWeights = new HashMap<>(); for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) { Attributes eagAttrs = eag.getAttributes(); - String locality = eagAttrs.get(XdsAttributes.ATTR_LOCALITY_NAME); + String locality = eagAttrs.get(EquivalentAddressGroup.ATTR_LOCALITY_NAME); Integer localityWeight = eagAttrs.get(XdsAttributes.ATTR_LOCALITY_WEIGHT); if (locality == null) { diff --git a/xds/src/main/java/io/grpc/xds/XdsAttributes.java b/xds/src/main/java/io/grpc/xds/XdsAttributes.java index 4a64fdb1453..2e165201e5f 100644 --- a/xds/src/main/java/io/grpc/xds/XdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/XdsAttributes.java @@ -81,13 +81,6 @@ final class XdsAttributes { static final Attributes.Key ATTR_LOCALITY = Attributes.Key.create("io.grpc.xds.XdsAttributes.locality"); - /** - * The name of the locality that this EquivalentAddressGroup is in. - */ - @EquivalentAddressGroup.Attr - static final Attributes.Key ATTR_LOCALITY_NAME = - Attributes.Key.create("io.grpc.xds.XdsAttributes.localityName"); - /** * Endpoint weight for load balancing purposes. */ diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 7df0630b779..c5e3f80f170 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -1017,7 +1017,7 @@ public String toString() { Attributes.Builder attributes = Attributes.newBuilder() .set(XdsAttributes.ATTR_LOCALITY, locality) // Unique but arbitrary string - .set(XdsAttributes.ATTR_LOCALITY_NAME, locality.toString()); + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, locality.toString()); if (authorityHostname != null) { attributes.set(XdsAttributes.ATTR_ADDRESS_NAME, authorityHostname); } diff --git a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java index b6a5d8dbf73..584c32738c5 100644 --- a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java @@ -254,7 +254,7 @@ public String toString() { } Attributes.Builder attrBuilder = Attributes.newBuilder() - .set(XdsAttributes.ATTR_LOCALITY_NAME, locality); + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, locality); if (localityWeight != null) { attrBuilder.set(XdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight); }