diff --git a/grpc-gcp/build.gradle b/grpc-gcp/build.gradle index 08c5f93..d1e6224 100644 --- a/grpc-gcp/build.gradle +++ b/grpc-gcp/build.gradle @@ -21,11 +21,12 @@ description = 'GRPC-GCP-Extension Java' sourceCompatibility = '1.8' def title = 'gRPC extension library for Google Cloud Platform' -def grpcVersion = '1.36.3' +def grpcVersion = '1.59.1' def protobufVersion = '3.20.3' def protocVersion = '3.20.3' -def spannerVersion = '6.1.0' +def spannerVersion = '6.81.0' def opencensusVersion = '0.28.3' +def opentelemetryVersion = '1.41.0' dependencies { implementation "com.google.protobuf:protobuf-java:${protobufVersion}" @@ -35,6 +36,7 @@ dependencies { implementation "io.grpc:grpc-stub:${grpcVersion}" implementation "io.opencensus:opencensus-api:${opencensusVersion}" implementation "com.google.api:api-common:2.1.5" + implementation "io.opentelemetry:opentelemetry-api:${opentelemetryVersion}" compileOnly "org.apache.tomcat:annotations-api:6.0.53" // necessary for Java 9+ diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetrics.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetrics.java new file mode 100644 index 0000000..f3779fd --- /dev/null +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetrics.java @@ -0,0 +1,436 @@ +package com.google.cloud.grpc.metrics; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * The GcpMetrics class provides a structured way to define and configure metrics. It defines + * standard attributes, predefined metrics, metric types, units of measurement, and a flexible + * configuration system using builders. This framework helps ensure consistency and ease of use when + * instrumenting GCP API clients. + */ +public class GcpMetrics { + // Convenient map of all built-in attributes to their names. + public static final Map allBuiltInAttributes = + Collections.unmodifiableMap( + Stream.of(Attr.values()).collect(Collectors.toMap((Attr x) -> x, Objects::toString))); + + // All predefined metrics. + public static final Set allMetrics = + Collections.unmodifiableSet( + new HashSet<>( + Stream.of(Metric.values()) + .map(ConfigurableMetric::getDefaultConfig) + .collect(Collectors.toSet()))); + + /** + * The Attr enum defines a set of standard attributes that can be associated with metrics. These + * attributes provide additional context about the metric being recorded. For example, STATUS + * represents the gRPC status code, METHOD represents the gRPC method name, and so on. + */ + public enum Attr { + STATUS("grpc.status"), + METHOD("grpc.method"), + METHOD_TYPE("grpcgcp.method_type"), + TRANSPARENT_RETRY("grpcgcp.tr_retry"); + + private final String attr; + + Attr(String attr) { + this.attr = attr; + } + + @Override + public String toString() { + return attr; + } + } + + /** + * The Metric enum defines a set of predefined metrics that can be recorded. Each metric has a + * Kind (e.g., COUNTER, HISTOGRAM), an internal Unit (e.g., NANOSECONDS, OCCURENCES), and a + * default MetricConfig. + */ + public enum Metric implements ConfigurableMetric { + START_DELAY( + Kind.HISTOGRAM, + Unit.NANOSECONDS, + (new DoubleMetricConfig.Builder("start_delay") + .withDescription("Delay by invocating interceptors in the beginning of the call.") + .withUnit(Unit.SECONDS)) + .build()), + RESOLUTION_DELAY( + Kind.HISTOGRAM, + Unit.NANOSECONDS, + (new DoubleMetricConfig.Builder("name_resolution_delay") + .withDescription("Delay caused by name resolution of the target.") + .withUnit(Unit.SECONDS)) + .build()), + CONNECTION_DELAY( + Kind.HISTOGRAM, + Unit.NANOSECONDS, + (new DoubleMetricConfig.Builder("connection_delay") + .withDescription("Delay caused by establishing a connection.") + .withUnit(Unit.SECONDS)) + .build()), + SEND_DELAY( + Kind.HISTOGRAM, + Unit.NANOSECONDS, + (new DoubleMetricConfig.Builder("send_delay") + .withDescription( + "Time spent after establishing a connection (or invoking " + + "interceptors if the connection is already established) and till sending " + + "headers.") + .withUnit(Unit.SECONDS)) + .build()), + HEADERS_LATENCY( + Kind.HISTOGRAM, + Unit.NANOSECONDS, + (new DoubleMetricConfig.Builder("headers_latency") + .withDescription("Time passed between sending and receiving headers.") + .withUnit(Unit.SECONDS)) + .build()), + RESPONSE_LATENCY( + Kind.HISTOGRAM, + Unit.NANOSECONDS, + (new DoubleMetricConfig.Builder("response_latency") + .withDescription( + "Time passed between sending headers and receiving complete " + "response.") + .withUnit(Unit.SECONDS)) + .build()), + PROCESSING_DELAY( + Kind.HISTOGRAM, + Unit.NANOSECONDS, + (new DoubleMetricConfig.Builder("client_processing_latency") + .withDescription( + "Time spent after receiving complete response and till all " + + "interceptors processed onClose.") + .withUnit(Unit.SECONDS)) + .build()), + TOTAL_LATENCY( + Kind.HISTOGRAM, + Unit.NANOSECONDS, + (new DoubleMetricConfig.Builder("total_rpc_latency") + .withDescription("Total time ovserved by GcpMetricsInterceptor.") + .withUnit(Unit.SECONDS)) + .build()); + + private final Kind kind; + private final Unit internalUnit; + private final MetricConfig templateConfig; + private MetricConfig defaultConfig = null; + + Metric(Kind kind, Unit internalUnit, MetricConfig templateConfig) { + this.kind = kind; + this.internalUnit = internalUnit; + this.templateConfig = templateConfig; + } + + public MetricConfig getDefaultConfig() { + if (defaultConfig == null) { + defaultConfig = + templateConfig + .toBuilder() + .withMetric(this) + .withBuiltInAttributes(allBuiltInAttributes) + .build(); + } + return defaultConfig; + } + + public Kind getKind() { + return kind; + } + + public Unit getInternalUnit() { + return internalUnit; + } + } + + public interface ConfigurableMetric { + MetricConfig getDefaultConfig(); + + Kind getKind(); + + Unit getInternalUnit(); + } + + // Kind of a metric. + public enum Kind { + COUNTER, + // Has buckets. + HISTOGRAM, + // UPDOWNCOUNTER, + // GAUGE + } + + public enum Unit { + NANOSECONDS("ns", 1), + MICROSECONDS("us", 1_000), + MILLISECONDS("ms", 1_000_000), + SECONDS("s", 1_000_000_000), + OCCURENCES("1"); + + private final String unit; + private final long baseFactor; + + Unit(String unit, long baseFactor) { + this.unit = unit; + this.baseFactor = baseFactor; + } + + Unit(String unit) { + this.unit = unit; + this.baseFactor = 1; + } + + @Override + public String toString() { + return unit; + } + + public double convert(double value, Unit to) { + return value * ((double) this.baseFactor / to.baseFactor); + } + + public long convert(long value, Unit to) { + return Math.round(value * ((double) this.baseFactor / to.baseFactor)); + } + } + + /** + * The MetricConfig abstract class represents the configuration of a metric. There are two + * concrete subclasses: LongMetricConfig and DoubleMetricConfig, which represent configurations + * for metrics with long and double values, respectively. These subclasses also allow specifying + * buckets for histograms. + */ + public abstract static class MetricConfig { + protected final ConfigurableMetric metric; + protected final String name; + protected final String description; + protected final String unit; + protected final Unit reportedUnit; + protected final Map attributes; + protected final Map builtInAttributes; + + private MetricConfig(Builder builder) { + this.metric = builder.metric; + this.name = builder.name; + this.description = builder.description; + this.unit = builder.unit; + this.reportedUnit = builder.reportedUnit; + this.attributes = builder.attributes; + this.builtInAttributes = builder.builtInAttributes; + } + + protected abstract Builder toBuilder(); + + public abstract DoubleMetricConfig.Builder toDoubleBuilder(); + + public abstract LongMetricConfig.Builder toLongBuilder(); + + public abstract static class Builder> { + // These are the defaults for a metric config. + protected ConfigurableMetric metric; + protected String name; + protected String description = ""; + protected String unit = "1"; + protected Unit reportedUnit = Unit.OCCURENCES; + // Attributes allowed. A map from what internal attribute name is to how it should be + // reported. + protected Map attributes = Collections.unmodifiableMap(new HashMap<>()); + // Built-in attributes. + protected Map builtInAttributes = Collections.unmodifiableMap(new HashMap<>()); + + public abstract MetricConfig build(); + + protected abstract T self(); + + private Builder(String name) { + this.withName(name); + } + + protected T withMetric(ConfigurableMetric metric) { + this.metric = metric; + return self(); + } + + public T withName(String name) { + Preconditions.checkNotNull(name); + Preconditions.checkArgument(!name.isEmpty()); + this.name = name; + return self(); + } + + public T withDescription(String description) { + Preconditions.checkNotNull(description); + this.description = description; + return self(); + } + + public T withUnit(String unit) { + Preconditions.checkNotNull(unit); + this.unit = unit; + return self(); + } + + public T withUnit(Unit unit) { + Preconditions.checkNotNull(unit); + this.reportedUnit = unit; + this.unit = unit.toString(); + return self(); + } + + public T withAttributes(Map attributes) { + Preconditions.checkNotNull(attributes); + this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes)); + return self(); + } + + public T withBuiltInAttributes(Map builtInAttributes) { + Preconditions.checkNotNull(builtInAttributes); + this.builtInAttributes = Collections.unmodifiableMap(new HashMap<>(builtInAttributes)); + return self(); + } + } + } + + public static class LongMetricConfig extends MetricConfig { + private final List buckets; + + private LongMetricConfig(Builder builder) { + super(builder); + this.buckets = builder.buckets; + } + + @Override + protected Builder toBuilder() { + return new Builder(name) + .withMetric(metric) + .withDescription(description) + .withUnit(unit) + .withAttributes(attributes) + .withBuiltInAttributes(builtInAttributes) + .withBuckets(buckets); + } + + @Override + public Builder toLongBuilder() { + return toBuilder(); + } + + @Override + public DoubleMetricConfig.Builder toDoubleBuilder() { + return new DoubleMetricConfig.Builder(name) + .withMetric(metric) + .withDescription(description) + .withUnit(unit) + .withAttributes(attributes) + .withBuiltInAttributes(builtInAttributes) + .withBuckets(buckets.stream().map(Long::doubleValue).collect(Collectors.toList())); + } + + public static class Builder extends MetricConfig.Builder { + private List buckets = new ArrayList<>(); + + private Builder(String name) { + super(name); + } + + public Builder withBuckets(List buckets) { + Preconditions.checkNotNull(buckets); + if (!buckets.isEmpty() && metric != null) { + Preconditions.checkArgument(metric.getKind() == Kind.HISTOGRAM); + } + this.buckets = buckets; + return this; + } + + @Override + public LongMetricConfig build() { + return new LongMetricConfig(this); + } + + @Override + protected Builder self() { + return this; + } + } + } + + public static class DoubleMetricConfig extends MetricConfig { + private final List buckets; + + public List getBuckets() { + return buckets; + } + + private DoubleMetricConfig(Builder builder) { + super(builder); + this.buckets = builder.buckets; + } + + @Override + protected Builder toBuilder() { + return new Builder(name) + .withMetric(metric) + .withDescription(description) + .withUnit(unit) + .withAttributes(attributes) + .withBuiltInAttributes(builtInAttributes) + .withBuckets(buckets); + } + + @Override + public Builder toDoubleBuilder() { + return toBuilder(); + } + + @Override + public LongMetricConfig.Builder toLongBuilder() { + return new LongMetricConfig.Builder(name) + .withMetric(metric) + .withDescription(description) + .withUnit(unit) + .withAttributes(attributes) + .withBuiltInAttributes(builtInAttributes) + .withBuckets(buckets.stream().map(Double::longValue).collect(Collectors.toList())); + } + + public static class Builder extends MetricConfig.Builder { + private List buckets = new ArrayList<>(); + + private Builder(String name) { + super(name); + } + + public Builder withBuckets(List buckets) { + Preconditions.checkNotNull(buckets); + if (!buckets.isEmpty() && metric != null) { + Preconditions.checkArgument(metric.getKind() == Kind.HISTOGRAM); + } + this.buckets = buckets; + return this; + } + + @Override + public DoubleMetricConfig build() { + return new DoubleMetricConfig(this); + } + + @Override + protected Builder self() { + return this; + } + } + } +} diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsConfig.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsConfig.java new file mode 100644 index 0000000..a3a1ad2 --- /dev/null +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsConfig.java @@ -0,0 +1,120 @@ +package com.google.cloud.grpc.metrics; + +import com.google.cloud.grpc.metrics.GcpMetrics.Attr; +import com.google.cloud.grpc.metrics.GcpMetrics.MetricConfig; +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class GcpMetricsConfig { + private static final String DEFAULT_METER_NAME = "grpc-gcp"; + + private final String meterName; + private final String metricPrefix; + private final Map staticAttributes; + // Attrs allowed for all metrics. + private final Map commonAttrKeysMap; + private final Map commonBuiltInAttrKeysMap; + private final Set enabledMetrics; + + public static Builder newBuilder() { + return new Builder(); + } + + public static GcpMetricsConfig defaultConfig() { + return newBuilder().build(); + } + + private GcpMetricsConfig(Builder builder) { + meterName = builder.meterName; + metricPrefix = builder.metricPrefix; + staticAttributes = builder.staticAttributes; + commonAttrKeysMap = builder.commonAttrKeysMap; + commonBuiltInAttrKeysMap = builder.commonBuiltInAttrKeysMap; + enabledMetrics = builder.enabledMetrics; + } + + public String getMeterName() { + return meterName; + } + + public String getMetricPrefix() { + return metricPrefix; + } + + public Map getStaticAttributes() { + return staticAttributes; + } + + public Map getCommonAttrKeysMap() { + return commonAttrKeysMap; + } + + public Map getCommonBuiltInAttrKeysMap() { + return commonBuiltInAttrKeysMap; + } + + public Set getEnabledMetrics() { + return enabledMetrics; + } + + public static class Builder { + private String meterName = DEFAULT_METER_NAME; + private String metricPrefix = ""; + private Map staticAttributes = + Collections.unmodifiableMap(Collections.emptyMap()); + // Attrs allowed for all metrics. + Map commonAttrKeysMap = new HashMap<>(); + // All built-in attributes enabled by default. + Map commonBuiltInAttrKeysMap = GcpMetrics.allBuiltInAttributes; + // All metrics enabled by default. + Set enabledMetrics = GcpMetrics.allMetrics; + + public Builder() {} + + public Builder withMeterName(String meterName) { + Preconditions.checkNotNull(meterName); + Preconditions.checkArgument(!meterName.isEmpty()); + this.meterName = meterName; + return this; + } + + public Builder withMetricPrefix(String metricPrefix) { + Preconditions.checkNotNull(metricPrefix); + this.metricPrefix = metricPrefix; + return this; + } + + public Builder withStaticAttributes(Map staticAttributes) { + Preconditions.checkNotNull(staticAttributes); + this.staticAttributes = Collections.unmodifiableMap(new HashMap<>(staticAttributes)); + return this; + } + + public Builder withCommonAttrKeysMap(Map commonAttrKeysMap) { + Preconditions.checkNotNull(commonAttrKeysMap); + this.commonAttrKeysMap = Collections.unmodifiableMap(new HashMap<>(commonAttrKeysMap)); + return this; + } + + public Builder withCommonBuiltInAttrKeysMap(Map commonBuiltInAttrKeysMap) { + Preconditions.checkNotNull(commonBuiltInAttrKeysMap); + this.commonBuiltInAttrKeysMap = + Collections.unmodifiableMap(new HashMap<>(commonBuiltInAttrKeysMap)); + return this; + } + + public Builder withEnabledMetrics(Set enabledMetrics) { + Preconditions.checkNotNull(enabledMetrics); + this.enabledMetrics = Collections.unmodifiableSet(new HashSet<>(enabledMetrics)); + return this; + } + + public GcpMetricsConfig build() { + return new GcpMetricsConfig(this); + } + } +} diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsInterceptor.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsInterceptor.java new file mode 100644 index 0000000..59ed4f9 --- /dev/null +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsInterceptor.java @@ -0,0 +1,180 @@ +package com.google.cloud.grpc.metrics; + +import com.google.cloud.grpc.metrics.GcpMetrics.Attr; +import com.google.cloud.grpc.metrics.GcpMetrics.ConfigurableMetric; +import com.google.cloud.grpc.metrics.GcpMetrics.Metric; +import com.google.common.base.Preconditions; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientStreamTracer; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import java.util.HashMap; +import java.util.Map; + +public class GcpMetricsInterceptor implements ClientInterceptor { + public static final CallOptions.Key GCP_TIMEKEEPER_KEY = + CallOptions.Key.create("GCPCallTimekeeper"); + + private static final ClientStreamTracer.Factory metricStreamTracerFactory = + new GcpMetricsStreamTracer.Factory(); + + private final GcpMetricsRecorder recorder; + + public static class CallTimekeeper { + private final Map attributes = new HashMap<>(); + private long start = 0; + private long streamTracerCreated = 0; + private long nameDelayed = 0; + private long pendingStreamCreated = 0; + private long streamCreated = 0; + private long headersSent = 0; + private long headersReceived = 0; + private long streamClosed = 0; + private long finish = 0; + + private GcpMetricsRecorder recorder; + + public CallTimekeeper() { + start = System.nanoTime(); + } + + public CallTimekeeper(CallOptions callOptions, GcpMetricsRecorder recorder) { + start = System.nanoTime(); + this.recorder = recorder; + } + + public void onStreamTracer() { + streamTracerCreated = System.nanoTime(); + } + + public void nameResolutionDelayedBy(Long nanos) { + this.nameDelayed = nanos; + } + + public void pendingStreamCreated() { + pendingStreamCreated = System.nanoTime(); + } + + public void streamCreated() { + streamCreated = System.nanoTime(); + } + + public void outboundHeaders() { + headersSent = System.nanoTime(); + } + + public void inboundHeaders() { + headersReceived = System.nanoTime(); + } + + public void streamClosed() { + streamClosed = System.nanoTime(); + } + + public void onClose() { + finish = System.nanoTime(); + calcAndReport(); + } + + private void calcAndReport() { + long totalTime = finish - start; + + Map recs = new HashMap<>(); + + if (streamTracerCreated > 0) { + recs.put(Metric.START_DELAY, streamTracerCreated - start); + } + + if (pendingStreamCreated > 0) { + recs.put(Metric.RESOLUTION_DELAY, pendingStreamCreated - streamTracerCreated); + } + + if (streamCreated > 0) { + if (pendingStreamCreated > 0) { + recs.put(Metric.CONNECTION_DELAY, streamCreated - pendingStreamCreated); + } else { + recs.put(Metric.CONNECTION_DELAY, 0); + } + } + if (headersSent > 0) { + // May include marshalling. + if (pendingStreamCreated > 0) { + // When connection was establishing. + recs.put(Metric.SEND_DELAY, headersSent - streamCreated); + } else { + // When connection was ready. + recs.put(Metric.SEND_DELAY, headersSent - streamTracerCreated); + } + } + if (headersReceived > 0) { + recs.put(Metric.HEADERS_LATENCY, headersReceived - headersSent); + } + if (streamClosed > 0) { + if (headersSent > 0) { + recs.put(Metric.RESPONSE_LATENCY, streamClosed - headersSent); + } + // Includes unmarshalling and interceptors logic. I.e. does not include application + // processing after onClose in GcpMetricsInterceptor is called. + recs.put(Metric.PROCESSING_DELAY, finish - streamClosed); + } + + recs.put(Metric.TOTAL_LATENCY, totalTime); + + recorder.recordAll(recs, attributes); + } + + public void setAttribute(Attr attr, String value) { + setAttribute(attr.toString(), value); + } + + public void setAttribute(String name, String value) { + if (value == null) { + attributes.remove(name); + return; + } + attributes.put(name, value); + } + } + + public GcpMetricsInterceptor(GcpMetricsRecorder recorder) { + Preconditions.checkNotNull(recorder); + this.recorder = recorder; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + + CallTimekeeper tk = new CallTimekeeper(callOptions, recorder); + tk.setAttribute(GcpMetrics.Attr.METHOD, methodDescriptor.getFullMethodName()); + tk.setAttribute(GcpMetrics.Attr.METHOD_TYPE, methodDescriptor.getType().toString()); + + ClientCall call = + channel.newCall( + methodDescriptor, + callOptions + .withStreamTracerFactory(metricStreamTracerFactory) + .withOption(GCP_TIMEKEEPER_KEY, tk)); + + return new SimpleForwardingClientCall(call) { + public void start(Listener responseListener, Metadata headers) { + Listener newListener = + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onClose(Status status, Metadata trailers) { + super.onClose(status, trailers); + tk.setAttribute(GcpMetrics.Attr.STATUS, status.getCode().toString()); + tk.onClose(); + } + }; + delegate().start(newListener, headers); + } + }; + } +} diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsRecorder.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsRecorder.java new file mode 100644 index 0000000..5defafc --- /dev/null +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsRecorder.java @@ -0,0 +1,12 @@ +package com.google.cloud.grpc.metrics; + +import com.google.cloud.grpc.metrics.GcpMetrics.ConfigurableMetric; +import java.util.Map; + +public interface GcpMetricsRecorder { + void record(ConfigurableMetric metric, long value, Map attributes); + + void record(ConfigurableMetric metric, double value, Map attributes); + + void recordAll(Map recs, Map attributes); +} diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsStreamTracer.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsStreamTracer.java new file mode 100644 index 0000000..e27c162 --- /dev/null +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpMetricsStreamTracer.java @@ -0,0 +1,89 @@ +package com.google.cloud.grpc.metrics; + +import com.google.cloud.grpc.metrics.GcpMetricsInterceptor.CallTimekeeper; +import io.grpc.Attributes; +import io.grpc.ClientStreamTracer; +import io.grpc.Metadata; +import io.grpc.Status; +import java.util.logging.Logger; + +public class GcpMetricsStreamTracer extends ClientStreamTracer { + private static final Logger logger = Logger.getLogger(GcpMetricsStreamTracer.class.getName()); + private final long callStartedAt; + private volatile long nameResolvedIn = 0; + private volatile long connectionReadyIn = 0; + private volatile long sentIn = 0; + private long headersLatency = 0; + + private CallTimekeeper tk; + + public GcpMetricsStreamTracer(StreamInfo streamInfo) { + tk = streamInfo.getCallOptions().getOption(GcpMetricsInterceptor.GCP_TIMEKEEPER_KEY); + tk.onStreamTracer(); + tk.setAttribute( + GcpMetrics.Attr.TRANSPARENT_RETRY, String.valueOf(streamInfo.isTransparentRetry())); + + callStartedAt = System.nanoTime(); + Long nameDelayed = + streamInfo.getCallOptions().getOption(ClientStreamTracer.NAME_RESOLUTION_DELAYED); + + if (nameDelayed != null) { + tk.nameResolutionDelayedBy(nameDelayed); + } + } + + public void createPendingStream() { + // Name resolution is completed and the connection starts getting established. This method is + // only invoked on the streams that encounter such delay. + if (nameResolvedIn == 0) { + tk.pendingStreamCreated(); + nameResolvedIn = System.nanoTime() - callStartedAt; + } else { + logger.fine( + "createPendingStream called more than once. Already measures as " + nameResolvedIn); + } + } + + public void streamCreated(Attributes transportAttrs, Metadata headers) { + // The stream is being created on a ready transport. + if (connectionReadyIn == 0) { + tk.streamCreated(); + connectionReadyIn = System.nanoTime() - callStartedAt - nameResolvedIn; + } else { + logger.fine( + "OOOPS, streamCreated called more than once. Already measures as " + connectionReadyIn); + } + } + + public void outboundHeaders() { + // Headers has been sent to the socket. + if (sentIn == 0) { + tk.outboundHeaders(); + sentIn = System.nanoTime() - callStartedAt - nameResolvedIn - connectionReadyIn; + } else { + logger.fine("outboundHeaders called more than once. Already measures as " + sentIn); + } + } + + public void inboundHeaders() { + // Headers has been received from the server. + if (headersLatency == 0) { + tk.inboundHeaders(); + headersLatency = + System.nanoTime() - callStartedAt - nameResolvedIn - connectionReadyIn - sentIn; + } else { + logger.fine("inboundHeaders called more than once. Already measures as " + headersLatency); + } + } + + public void streamClosed(Status status) { + // Stream is closed. This will be called exactly once. + tk.streamClosed(); + } + + public static class Factory extends ClientStreamTracer.Factory { + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return new GcpMetricsStreamTracer(info); + } + } +} diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpOtMetricsRecorder.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpOtMetricsRecorder.java new file mode 100644 index 0000000..56a228b --- /dev/null +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/metrics/GcpOtMetricsRecorder.java @@ -0,0 +1,294 @@ +package com.google.cloud.grpc.metrics; + +import com.google.cloud.grpc.metrics.GcpMetrics.ConfigurableMetric; +import com.google.cloud.grpc.metrics.GcpMetrics.DoubleMetricConfig; +import com.google.cloud.grpc.metrics.GcpMetrics.MetricConfig; +import com.google.common.base.Preconditions; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Context; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class GcpOtMetricsRecorder implements GcpMetricsRecorder { + private final Map recordMap = new HashMap<>(); + private final Attributes staticAttributes; + private final Map> attrsKeyMap; + private final Map> oTelAttrKeyMap; + + private enum MType { + DOUBLE_HISTOGRAM, + LONG_HISTOGRAM, + DOUBLE_COUNTER, + LONG_COUNTER + } + + private class MetricWrapper { + private final MetricConfig metricConfig; + private final Map> attrMap; + + private DoubleHistogram dh; + private LongHistogram lh; + private DoubleCounter dc; + private LongCounter lc; + + private MType mType; + + public MetricWrapper(Meter meter, GcpMetrics.MetricConfig metricConfig) { + this.metricConfig = metricConfig; + + // Prepare allowed attribute keys map for the metric. + Map> aMap = new HashMap<>(); + // Include commonly allowed attributes. + aMap.putAll(attrsKeyMap); + + // Include built-in attributes specific to this metric. + metricConfig.builtInAttributes.forEach( + (attr, reportAs) -> { + aMap.put(attr.toString(), oTelAttrKeyMap.get(reportAs)); + }); + // Include attributes specific to this metric. + metricConfig.attributes.forEach( + (attr, reportAs) -> { + aMap.put(attr, oTelAttrKeyMap.get(reportAs)); + }); + this.attrMap = Collections.unmodifiableMap(aMap); + + if (metricConfig instanceof DoubleMetricConfig) { + switch (metricConfig.metric.getKind()) { + case HISTOGRAM: + mType = MType.DOUBLE_HISTOGRAM; + DoubleHistogramBuilder builder = + meter + .histogramBuilder(metricConfig.name) + .setDescription(metricConfig.description) + .setUnit(metricConfig.unit); + if (!((DoubleMetricConfig) metricConfig).getBuckets().isEmpty()) { + builder.setExplicitBucketBoundariesAdvice( + ((DoubleMetricConfig) metricConfig).getBuckets()); + } + dh = builder.build(); + + break; + case COUNTER: + mType = MType.DOUBLE_COUNTER; + } + } else { + switch (metricConfig.metric.getKind()) { + case HISTOGRAM: + mType = MType.LONG_HISTOGRAM; + break; + case COUNTER: + mType = MType.LONG_COUNTER; + lc = + meter + .counterBuilder(metricConfig.name) + .setDescription(metricConfig.description) + .setUnit(metricConfig.unit) + .build(); + } + } + } + + public void record(long value, Attributes attributes) { + switch (mType) { + case LONG_COUNTER: + lc.add( + metricConfig.metric.getInternalUnit().convert(value, metricConfig.reportedUnit), + attributes, + Context.current()); + break; + case DOUBLE_COUNTER: + dc.add( + metricConfig + .metric + .getInternalUnit() + .convert(((Long) value).doubleValue(), metricConfig.reportedUnit), + attributes, + Context.current()); + break; + case LONG_HISTOGRAM: + lh.record( + metricConfig.metric.getInternalUnit().convert(value, metricConfig.reportedUnit), + attributes, + Context.current()); + break; + case DOUBLE_HISTOGRAM: + dh.record( + metricConfig + .metric + .getInternalUnit() + .convert(((Long) value).doubleValue(), metricConfig.reportedUnit), + attributes, + Context.current()); + break; + } + } + + public void record(double value, Attributes attributes) { + switch (mType) { + case LONG_COUNTER: + lc.add( + ((Double) + metricConfig + .metric + .getInternalUnit() + .convert(value, metricConfig.reportedUnit)) + .longValue(), + attributes, + Context.current()); + break; + case DOUBLE_COUNTER: + dc.add( + metricConfig.metric.getInternalUnit().convert(value, metricConfig.reportedUnit), + attributes, + Context.current()); + break; + case LONG_HISTOGRAM: + lh.record( + ((Double) + metricConfig + .metric + .getInternalUnit() + .convert(value, metricConfig.reportedUnit)) + .longValue(), + attributes, + Context.current()); + break; + case DOUBLE_HISTOGRAM: + dh.record( + metricConfig.metric.getInternalUnit().convert(value, metricConfig.reportedUnit), + attributes, + Context.current()); + break; + } + } + } + + public GcpOtMetricsRecorder(OpenTelemetry openTelemetry, GcpMetricsConfig config) { + Meter meter = + openTelemetry + .meterBuilder(config.getMeterName()) + // TODO: move to config. + .setInstrumentationVersion( + GcpOtMetricsRecorder.class.getPackage().getImplementationVersion()) + .build(); + + Map> oMap = new HashMap<>(); + config + .getStaticAttributes() + .keySet() + .forEach((key) -> oMap.put(key, AttributeKey.stringKey(key))); + + config + .getEnabledMetrics() + .forEach( + metricConfig -> { + metricConfig + .builtInAttributes + .values() + .forEach((recordAs) -> oMap.put(recordAs, AttributeKey.stringKey(recordAs))); + metricConfig + .attributes + .values() + .forEach((recordAs) -> oMap.put(recordAs, AttributeKey.stringKey(recordAs))); + }); + + oTelAttrKeyMap = Collections.unmodifiableMap(oMap); + + AttributesBuilder staticAttributesBuilder = Attributes.builder(); + config.getStaticAttributes().forEach(staticAttributesBuilder::put); + staticAttributes = staticAttributesBuilder.build(); + + Map> aMap = new HashMap<>(); + config + .getCommonBuiltInAttrKeysMap() + .forEach( + (attr, reportAs) -> { + aMap.put(attr.toString(), oTelAttrKeyMap.get(reportAs)); + }); + config + .getCommonAttrKeysMap() + .forEach( + (attr, reportAs) -> { + aMap.put(attr, oTelAttrKeyMap.get(reportAs)); + }); + attrsKeyMap = Collections.unmodifiableMap(aMap); + + config + .getEnabledMetrics() + .forEach( + metricConfig -> { + recordMap.put(metricConfig.metric, new MetricWrapper(meter, metricConfig)); + }); + } + + @Override + public void record(ConfigurableMetric metric, long value, Map attributes) { + MetricWrapper wrapper = recordMap.get(metric); + if (wrapper == null) { + return; + } + wrapper.record(value, toOtelAttributes(attributes, wrapper)); + } + + @Override + public void record(ConfigurableMetric metric, double value, Map attributes) { + MetricWrapper wrapper = recordMap.get(metric); + if (wrapper == null) { + return; + } + wrapper.record(value, toOtelAttributes(attributes, wrapper)); + } + + @Override + public void recordAll(Map recs, Map attributes) { + // Cache for resulting attributes. + Map>, Attributes> attrCache = new HashMap<>(); + + recs.forEach( + (metric, number) -> { + MetricWrapper wrapper = recordMap.get(metric); + if (wrapper != null) { + attrCache.computeIfAbsent( + wrapper.attrMap, (attrMap) -> toOtelAttributes(attributes, attrMap)); + Attributes attrs = attrCache.get(wrapper.attrMap); + + if (number instanceof Long) { + wrapper.record((long) number, attrs); + } + if (number instanceof Double) { + wrapper.record((double) number, attrs); + } + } + }); + } + + private Attributes toOtelAttributes( + Map attributes, Map> attrMap) { + Preconditions.checkNotNull(attributes); + + AttributesBuilder attributesBuilder = staticAttributes.toBuilder(); + attributes.forEach( + (key, value) -> { + if (attrMap.containsKey(key)) { + attributesBuilder.put(attrMap.get(key), value); + } + }); + + return attributesBuilder.build(); + } + + private Attributes toOtelAttributes(Map attributes, MetricWrapper wrapper) { + return toOtelAttributes(attributes, wrapper.attrMap); + } +}