From 6497b7d8655080bea179e9f59fe71334cd00fee2 Mon Sep 17 00:00:00 2001 From: Ronak Date: Thu, 22 Jul 2021 22:12:37 +0530 Subject: [PATCH 01/10] adding otel steps --- .../span-normalizer-api/build.gradle.kts | 1 + .../core/spannormalizer/SpanNormalizer.java | 32 +++++++++++++ .../otel/OtelMetricProcessor.java | 34 ++++++++++++++ .../spannormalizer/otel/OtelMetricSerde.java | 47 +++++++++++++++++++ 4 files changed, 114 insertions(+) create mode 100644 span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricProcessor.java create mode 100644 span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricSerde.java diff --git a/span-normalizer/span-normalizer-api/build.gradle.kts b/span-normalizer/span-normalizer-api/build.gradle.kts index 81a6197b9..08b8d7f91 100644 --- a/span-normalizer/span-normalizer-api/build.gradle.kts +++ b/span-normalizer/span-normalizer-api/build.gradle.kts @@ -63,4 +63,5 @@ dependencies { because("Multiple vulnerabilities in avro-declared version") } } + api("io.opentelemetry:opentelemetry-proto:0.3.0") } diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java index 10ca32c34..bfa800472 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java @@ -7,6 +7,7 @@ import com.typesafe.config.Config; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import java.util.Collections; import java.util.List; import java.util.Map; @@ -21,6 +22,8 @@ import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToAvroRawSpanTransformer; import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToLogRecordsTransformer; import org.hypertrace.core.spannormalizer.jaeger.PreProcessedSpan; +import org.hypertrace.core.spannormalizer.otel.OtelMetricProcessor; +import org.hypertrace.core.spannormalizer.otel.OtelMetricSerde; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +58,38 @@ public StreamsBuilder buildTopology( inputStream.transform(JaegerSpanPreProcessor::new); preProcessedStream.transform(JaegerSpanToAvroRawSpanTransformer::new).to(outputTopic); preProcessedStream.transform(JaegerSpanToLogRecordsTransformer::new).to(outputTopicRawLogs); + + // add metrics processor + + KStream secondProcessor = + (KStream) inputStreams.get("otel-metrics"); + if (secondProcessor == null) { + secondProcessor = + streamsBuilder.stream( + "otel-metrics", Consumed.with(Serdes.ByteArray(), new OtelMetricSerde())); + inputStreams.put(inputTopic, secondProcessor); + } + + KStream otelMetricStream = + secondProcessor.transform(OtelMetricProcessor::new); + return streamsBuilder; } + private void addStreamIfNeeded( + StreamsBuilder streamsBuilder, + Map> inputStreams, + String inputTopicName, + String nodeName) { + KStream inputStream = inputStreams.get(inputTopicName); + if (inputStream == null) { + inputStream = + streamsBuilder.stream( + inputTopicName, Consumed.with(Serdes.String(), null).withName(nodeName)); + inputStreams.put(inputTopicName, inputStream); + } + } + @Override public String getJobConfigKey() { return SPAN_NORMALIZER_JOB_CONFIG; diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricProcessor.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricProcessor.java new file mode 100644 index 000000000..f0f6d8f5d --- /dev/null +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricProcessor.java @@ -0,0 +1,34 @@ +package org.hypertrace.core.spannormalizer.otel; + +import com.google.protobuf.util.JsonFormat; +import com.google.protobuf.util.JsonFormat.Printer; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OtelMetricProcessor + implements Transformer> { + private static final Logger LOG = LoggerFactory.getLogger(OtelMetricProcessor.class); + + private static Printer printer = JsonFormat.printer().omittingInsignificantWhitespace(); + + @Override + public void init(final ProcessorContext context) {} + + @Override + public void close() {} + + @Override + public KeyValue transform( + final byte[] key, final ResourceMetrics value) { + try { + LOG.info(printer.print(value)); + } catch (Exception e) { + LOG.error("parsing exception:", e); + } + return new KeyValue<>(key, value); + } +} diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricSerde.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricSerde.java new file mode 100644 index 000000000..90a89b2e8 --- /dev/null +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricSerde.java @@ -0,0 +1,47 @@ +package org.hypertrace.core.spannormalizer.otel; + +import com.google.protobuf.InvalidProtocolBufferException; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +public class OtelMetricSerde implements Serde { + + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return new OtelMetricSerde.Ser(); + } + + @Override + public Deserializer deserializer() { + return new OtelMetricSerde.De(); + } + + public static class Ser implements Serializer { + + @Override + public byte[] serialize(String topic, ResourceMetrics data) { + return data.toByteArray(); + } + } + + public static class De implements Deserializer { + + @Override + public ResourceMetrics deserialize(String topic, byte[] data) { + try { + return ResourceMetrics.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } +} From f9a94c4e03dc17c9bfc093ad7522ad90d45838c6 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 11 Aug 2021 12:32:43 +0530 Subject: [PATCH 02/10] adds support for creating topic --- .../org/hypertrace/core/spannormalizer/SpanNormalizer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java index bfa800472..95ed1e164 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java @@ -8,7 +8,6 @@ import com.typesafe.config.Config; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; @@ -103,7 +102,7 @@ public Logger getLogger() { @Override public List getInputTopics(Map properties) { Config jobConfig = getJobConfig(properties); - return Collections.singletonList(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY)); + return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY), "otel-metrics"); } @Override From d649fe7818b89190211b02853e448750dbdd53dc Mon Sep 17 00:00:00 2001 From: Ronak Date: Fri, 17 Sep 2021 09:27:52 +0530 Subject: [PATCH 03/10] update the input --- .../java/org/hypertrace/core/spannormalizer/SpanNormalizer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java index 95ed1e164..7a7ea3f57 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java @@ -66,7 +66,7 @@ public StreamsBuilder buildTopology( secondProcessor = streamsBuilder.stream( "otel-metrics", Consumed.with(Serdes.ByteArray(), new OtelMetricSerde())); - inputStreams.put(inputTopic, secondProcessor); + inputStreams.put("otel-metrics", secondProcessor); } KStream otelMetricStream = From bde5b5c2fb4a6e792d02297071e8d4409a35f268 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 22 Sep 2021 16:50:41 +0530 Subject: [PATCH 04/10] feat : basic framework layout for metrics generator and processor --- hypertrace-ingester/build.gradle.kts | 16 ++-- .../ingester/HypertraceIngester.java | 8 ++ .../hypertrace-ingester/application.conf | 8 +- .../src/main/resources/log4j2.properties | 29 +++++++ hypertrace-metrics-generator/build.gradle.kts | 3 + .../build.gradle.kts | 46 +++++++++++ .../metrics/generator/MetricsExtractor.java | 66 +++++++++++++++ .../metrics/generator/MetricsGenerator.java | 76 ++++++++++++++++++ .../metrics/generator/OtelMetricsSerde.java | 45 +++++++++++ .../resources/configs/common/application.conf | 34 ++++++++ .../src/main/resources/log4j2.properties | 23 ++++++ hypertrace-metrics-processor/build.gradle.kts | 3 + .../build.gradle.kts | 46 +++++++++++ .../metrics/processor/MetricsEnricher.java | 22 +++++ .../metrics/processor/MetricsExporter.java | 22 +++++ .../metrics/processor/MetricsNormalizer.java | 22 +++++ .../metrics/processor/MetricsProcessor.java | 80 +++++++++++++++++++ .../metrics/processor/OtlpMetricsSerde.java | 45 +++++++++++ .../resources/configs/common/application.conf | 34 ++++++++ .../src/main/resources/log4j2.properties | 23 ++++++ settings.gradle.kts | 4 + .../core/spannormalizer/SpanNormalizer.java | 25 +++--- 22 files changed, 660 insertions(+), 20 deletions(-) create mode 100644 hypertrace-ingester/src/main/resources/log4j2.properties create mode 100644 hypertrace-metrics-generator/build.gradle.kts create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsExtractor.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtelMetricsSerde.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties create mode 100644 hypertrace-metrics-processor/build.gradle.kts create mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts create mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java create mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsExporter.java create mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java create mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java create mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java create mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf create mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index 4cd0feebc..81d9ab593 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -40,6 +40,8 @@ dependencies { implementation(project(":raw-spans-grouper:raw-spans-grouper")) implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher")) implementation(project(":hypertrace-view-generator:hypertrace-view-generator")) + implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator")) + implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor")) testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") @@ -64,7 +66,9 @@ tasks.register("copyServiceConfigs") { createCopySpec("span-normalizer", "span-normalizer", "main", "common"), createCopySpec("raw-spans-grouper", "raw-spans-grouper", "main", "common"), createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "main", "common"), - createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common") + createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common"), + createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "main", "common"), + createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "main", "common") ).into("./build/resources/main/configs/") } @@ -101,10 +105,12 @@ tasks.test { tasks.register("copyServiceConfigsTest") { with( - createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"), - createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"), - createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"), - createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator") + createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"), + createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"), + createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"), + createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator"), + createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "test", "hypertrace-metrics-generator"), + createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "test", "hypertrace-metrics-processor") ).into("./build/resources/test/configs/") } diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java index 4ab59dedf..aa62a2b6a 100644 --- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java +++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java @@ -17,6 +17,8 @@ import org.hypertrace.core.serviceframework.config.ConfigUtils; import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher; +import org.hypertrace.metrics.generator.MetricsGenerator; +import org.hypertrace.metrics.processor.MetricsProcessor; import org.hypertrace.traceenricher.trace.enricher.TraceEnricher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +51,12 @@ private KafkaStreamsApp getSubTopologyInstance(String name) { case "all-views": kafkaStreamsApp = new MultiViewGeneratorLauncher(ConfigClientFactory.getClient()); break; + case "hypertrace-metrics-generator": + kafkaStreamsApp = new MetricsGenerator(ConfigClientFactory.getClient()); + break; + case "hypertrace-metrics-processor": + kafkaStreamsApp = new MetricsProcessor(ConfigClientFactory.getClient()); + break; default: throw new RuntimeException(String.format("Invalid configured sub-topology : [%s]", name)); } diff --git a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf index 5c9946df6..0a09e37b5 100644 --- a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf +++ b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf @@ -3,7 +3,13 @@ main.class = org.hypertrace.ingester.HypertraceIngester service.name = hypertrace-ingester service.admin.port = 8099 -sub.topology.names = ["span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", "all-views"] +sub.topology.names = [ + "span-normalizer", + "raw-spans-grouper", + "hypertrace-trace-enricher", + "all-views", + "hypertrace-metrics-generator", + "hypertrace-metrics-processor"] precreate.topics = false precreate.topics = ${?PRE_CREATE_TOPICS} diff --git a/hypertrace-ingester/src/main/resources/log4j2.properties b/hypertrace-ingester/src/main/resources/log4j2.properties new file mode 100644 index 000000000..bdcf9b332 --- /dev/null +++ b/hypertrace-ingester/src/main/resources/log4j2.properties @@ -0,0 +1,29 @@ +status = error +name = PropertiesConfig + +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n + +appender.rolling.type = RollingFile +appender.rolling.name = ROLLING_FILE +appender.rolling.fileName = ${sys:service.name:-hypertrace-ingester}.log +appender.rolling.filePattern = ${sys:service.name:-hypertrace-ingester}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type = PatternLayout +appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type = Policies +appender.rolling.policies.time.type = TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval = 3600 +appender.rolling.policies.time.modulate = true +appender.rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.rolling.policies.size.size = 20MB +appender.rolling.strategy.type = DefaultRolloverStrategy +appender.rolling.strategy.max = 5 + +rootLogger.level = INFO +rootLogger.appenderRef.stdout.ref = STDOUT +rootLogger.appenderRef.rolling.ref = ROLLING_FILE + + + diff --git a/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/build.gradle.kts new file mode 100644 index 000000000..b1720497c --- /dev/null +++ b/hypertrace-metrics-generator/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects { + group = "org.hypertrace.metrics.generator" +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts new file mode 100644 index 000000000..a51868e86 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts @@ -0,0 +1,46 @@ +plugins { + java + application + jacoco + id("org.hypertrace.docker-java-application-plugin") + id("org.hypertrace.docker-publish-plugin") + id("org.hypertrace.jacoco-report-plugin") +} + +application { + mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") +} + +hypertraceDocker { + defaultImage { + javaApplication { + serviceName.set("${project.name}") + adminPort.set(8099) + } + } +} + +tasks.test { + useJUnitPlatform() +} + +dependencies { + // common and framework + implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") + + // open telemetry + implementation("io.opentelemetry:opentelemetry-api:1.4.1") + implementation("io.opentelemetry:opentelemetry-api-metrics:1.4.1-alpha") + implementation("io.opentelemetry:opentelemetry-sdk:1.4.1") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1") + implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpah") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha") + + // test + testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") + testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("com.google.code.gson:gson:2.8.7") +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsExtractor.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsExtractor.java new file mode 100644 index 000000000..dd13c3392 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsExtractor.java @@ -0,0 +1,66 @@ +package org.hypertrace.metrics.generator; + +import io.opentelemetry.api.metrics.GlobalMeterProvider; +import io.opentelemetry.api.metrics.LongUpDownCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.exporter.otlp.internal.MetricAdapter; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.Collection; +import java.util.List; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.hypertrace.viewgenerator.api.RawServiceView; + +public class MetricsExtractor + implements Transformer> { + + private LongUpDownCounter longUpDownCounter; + private long resetCounter = 0; + private SdkMeterProvider sdkMeterProvider; + + @Override + public void init(ProcessorContext context) { + SdkMeterProviderBuilder sdkMeterProviderBuilder = SdkMeterProvider.builder(); + sdkMeterProvider = sdkMeterProviderBuilder.buildAndRegisterGlobal(); + resetCounter = 0; + + Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.example.metrics", "1.4.1"); + + this.longUpDownCounter = + meter + .longUpDownCounterBuilder("num_calls") + .setDescription("Measure the number of calls") + .setUnit("1") + .build(); + } + + @Override + public KeyValue transform(String key, RawServiceView value) { + Labels labels = + Labels.of( + "tenant_id", value.getTenantId(), + "consumer_id", "1", + "service_id", value.getServiceId(), + "service_name", value.getServiceName(), + "api_id", value.getApiId(), + "api_name", value.getApiName()); + longUpDownCounter.add(value.getNumCalls(), labels); + if (resetCounter % 10 == 0) { + resetCounter = 0; + Collection metricData = sdkMeterProvider.collectAllMetrics(); + List resourceMetrics = MetricAdapter.toProtoResourceMetrics(metricData); + if (resourceMetrics.size() > 0) { + return new KeyValue<>(null, resourceMetrics.get(0)); + } + } + return null; + } + + @Override + public void close() {} +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java new file mode 100644 index 000000000..a369e742e --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java @@ -0,0 +1,76 @@ +package org.hypertrace.metrics.generator; + +import com.typesafe.config.Config; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; +import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.hypertrace.viewgenerator.api.RawServiceView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsGenerator extends KafkaStreamsApp { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsGenerator.class); + private static final String INPUT_TOPIC_CONFIG_KEY = "input.topic"; + private static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic"; + private static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config"; + + public MetricsGenerator(ConfigClient configClient) { + super(configClient); + } + + @Override + public StreamsBuilder buildTopology( + Map streamsProperties, + StreamsBuilder streamsBuilder, + Map> inputStreams) { + + Config jobConfig = getJobConfig(streamsProperties); + String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY); + String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); + KStream inputStream = + (KStream) inputStreams.get(inputTopic); + if (inputStream == null) { + inputStream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), (Serde) null)); + inputStreams.put(inputTopic, inputStream); + } + + inputStream + .transform(MetricsExtractor::new) + .to(outputTopic, Produced.with(Serdes.ByteArray(), new OtelMetricsSerde())); + + return streamsBuilder; + } + + @Override + public String getJobConfigKey() { + return METRICS_GENERATOR_JOB_CONFIG; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public List getInputTopics(Map properties) { + Config jobConfig = getJobConfig(properties); + return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY)); + } + + @Override + public List getOutputTopics(Map properties) { + Config jobConfig = getJobConfig(properties); + return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY)); + } + + private Config getJobConfig(Map properties) { + return (Config) properties.get(getJobConfigKey()); + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtelMetricsSerde.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtelMetricsSerde.java new file mode 100644 index 000000000..16130e807 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtelMetricsSerde.java @@ -0,0 +1,45 @@ +package org.hypertrace.metrics.generator; + +import com.google.protobuf.InvalidProtocolBufferException; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +public class OtelMetricsSerde implements Serde { + + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return new OtelMetricsSerde.Ser(); + } + + @Override + public Deserializer deserializer() { + return new OtelMetricsSerde.De(); + } + + public static class Ser implements Serializer { + @Override + public byte[] serialize(String topic, ResourceMetrics data) { + return data.toByteArray(); + } + } + + public static class De implements Deserializer { + @Override + public ResourceMetrics deserialize(String topic, byte[] data) { + try { + return ResourceMetrics.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf new file mode 100644 index 000000000..af1d5ad52 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf @@ -0,0 +1,34 @@ +service.name = hypertrace-metrics-generator +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.generator.MetricsGenerator + +input.topic = "raw-service-view-events" +output.topic = "otlp-metrics" +input.class = org.hypertrace.viewgenerator.api.RawServiceView +precreate.topics = false +precreate.topics = ${?PRE_CREATE_TOPICS} + +kafka.streams.config = { + application.id = metrics-from-raw-service-view-events-job + num.stream.threads = 2 + num.stream.threads = ${?NUM_STREAM_THREADS} + + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} + + schema.registry.url = "http://localhost:8081" + schema.registry.url = ${?SCHEMA_REGISTRY_URL} + value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.generator.MetricsGenerator +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties new file mode 100644 index 000000000..d91bc7bfe --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.type=RollingFile +appender.rolling.name=ROLLING_FILE +appender.rolling.fileName=${sys:service.name:-service}.log +appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type=PatternLayout +appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type=Policies +appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval=3600 +appender.rolling.policies.time.modulate=true +appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=20MB +appender.rolling.strategy.type=DefaultRolloverStrategy +appender.rolling.strategy.max=5 +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/build.gradle.kts new file mode 100644 index 000000000..870768c84 --- /dev/null +++ b/hypertrace-metrics-processor/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects { + group = "org.hypertrace.metrics.processor" +} \ No newline at end of file diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts new file mode 100644 index 000000000..a51868e86 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts @@ -0,0 +1,46 @@ +plugins { + java + application + jacoco + id("org.hypertrace.docker-java-application-plugin") + id("org.hypertrace.docker-publish-plugin") + id("org.hypertrace.jacoco-report-plugin") +} + +application { + mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") +} + +hypertraceDocker { + defaultImage { + javaApplication { + serviceName.set("${project.name}") + adminPort.set(8099) + } + } +} + +tasks.test { + useJUnitPlatform() +} + +dependencies { + // common and framework + implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") + + // open telemetry + implementation("io.opentelemetry:opentelemetry-api:1.4.1") + implementation("io.opentelemetry:opentelemetry-api-metrics:1.4.1-alpha") + implementation("io.opentelemetry:opentelemetry-sdk:1.4.1") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1") + implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpah") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha") + + // test + testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") + testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("com.google.code.gson:gson:2.8.7") +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java new file mode 100644 index 000000000..32b2a4e69 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java @@ -0,0 +1,22 @@ +package org.hypertrace.metrics.processor; + +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; + +public class MetricsEnricher + implements Transformer> { + + @Override + public void init(ProcessorContext context) {} + + @Override + public KeyValue transform(byte[] key, ResourceMetrics value) { + // noop enricher for now + return new KeyValue<>(key, value); + } + + @Override + public void close() {} +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsExporter.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsExporter.java new file mode 100644 index 000000000..137674623 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsExporter.java @@ -0,0 +1,22 @@ +package org.hypertrace.metrics.processor; + +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; + +public class MetricsExporter + implements Transformer> { + + @Override + public void init(ProcessorContext context) {} + + @Override + public KeyValue transform(byte[] key, ResourceMetrics value) { + // noop normalizer for now + return new KeyValue<>(key, value); + } + + @Override + public void close() {} +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java new file mode 100644 index 000000000..e16da2518 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java @@ -0,0 +1,22 @@ +package org.hypertrace.metrics.processor; + +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; + +public class MetricsNormalizer + implements Transformer> { + + @Override + public void init(ProcessorContext context) {} + + @Override + public KeyValue transform(byte[] key, ResourceMetrics value) { + // noop normalizer for now + return new KeyValue<>(key, value); + } + + @Override + public void close() {} +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java new file mode 100644 index 000000000..81c89ac27 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java @@ -0,0 +1,80 @@ +package org.hypertrace.metrics.processor; + +import com.typesafe.config.Config; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; +import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsProcessor extends KafkaStreamsApp { + private static final Logger logger = LoggerFactory.getLogger(MetricsProcessor.class); + private static final String INPUT_TOPIC_CONFIG_KEY = "input.topic"; + private static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic"; + private static final String METRICS_PROCESSOR_JOB_CONFIG = "metrics-processor-job-config"; + + public MetricsProcessor(ConfigClient configClient) { + super(configClient); + } + + @Override + public StreamsBuilder buildTopology( + Map streamsProperties, + StreamsBuilder streamsBuilder, + Map> inputStreams) { + + Config jobConfig = getJobConfig(streamsProperties); + String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY); + String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); + + // input stream + KStream inputStream = + (KStream) inputStreams.get(inputTopic); + if (inputStream == null) { + inputStream = + streamsBuilder.stream( + inputTopic, Consumed.with(Serdes.ByteArray(), new OtlpMetricsSerde())); + inputStreams.put(inputTopic, inputStream); + } + + inputStream + .transform(MetricsNormalizer::new) + .transform(MetricsEnricher::new) + .to(outputTopic, Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde())); + + return streamsBuilder; + } + + @Override + public String getJobConfigKey() { + return METRICS_PROCESSOR_JOB_CONFIG; + } + + @Override + public Logger getLogger() { + return logger; + } + + @Override + public List getInputTopics(Map properties) { + Config jobConfig = getJobConfig(properties); + return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY)); + } + + @Override + public List getOutputTopics(Map properties) { + Config jobConfig = getJobConfig(properties); + return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY)); + } + + private Config getJobConfig(Map properties) { + return (Config) properties.get(getJobConfigKey()); + } +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java new file mode 100644 index 000000000..21c82cb75 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java @@ -0,0 +1,45 @@ +package org.hypertrace.metrics.processor; + +import com.google.protobuf.InvalidProtocolBufferException; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +public class OtlpMetricsSerde implements Serde { + + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return new OtlpMetricsSerde.Ser(); + } + + @Override + public Deserializer deserializer() { + return new OtlpMetricsSerde.De(); + } + + public static class Ser implements Serializer { + @Override + public byte[] serialize(String topic, ResourceMetrics data) { + return data.toByteArray(); + } + } + + public static class De implements Deserializer { + @Override + public ResourceMetrics deserialize(String topic, byte[] data) { + try { + return ResourceMetrics.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf new file mode 100644 index 000000000..a66eb4749 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf @@ -0,0 +1,34 @@ +service.name = hypertrace-metrics-processor +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.processor.MetricsProcessor + +input.topic = "otlp-metrics" +output.topic = "enriched-otlp-metrics" + +precreate.topics = false +precreate.topics = ${?PRE_CREATE_TOPICS} + +kafka.streams.config = { + application.id = metrics-processor-job + num.stream.threads = 2 + num.stream.threads = ${?NUM_STREAM_THREADS} + + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} + + schema.registry.url = "http://localhost:8081" + schema.registry.url = ${?SCHEMA_REGISTRY_URL} + value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-processor" + +metrics.reporter.prefix = org.hypertrace.metrics.processor.MetricsProcessor +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties new file mode 100644 index 000000000..d91bc7bfe --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.type=RollingFile +appender.rolling.name=ROLLING_FILE +appender.rolling.fileName=${sys:service.name:-service}.log +appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type=PatternLayout +appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type=Policies +appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval=3600 +appender.rolling.policies.time.modulate=true +appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=20MB +appender.rolling.strategy.type=DefaultRolloverStrategy +appender.rolling.strategy.max=5 +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/settings.gradle.kts b/settings.gradle.kts index 2ca1d9e44..72a3dc563 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -34,6 +34,10 @@ include("span-normalizer:span-normalizer") include("span-normalizer:raw-span-constants") include("span-normalizer:span-normalizer-constants") +// metrics pipeline +include("hypertrace-metrics-generator:hypertrace-metrics-generator") +include("hypertrace-metrics-processor:hypertrace-metrics-processor") + // e2e pipeline include("hypertrace-ingester") include("semantic-convention-utils") diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java index 7a7ea3f57..845508f4e 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java @@ -7,7 +7,6 @@ import com.typesafe.config.Config; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import java.util.List; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; @@ -21,8 +20,6 @@ import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToAvroRawSpanTransformer; import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToLogRecordsTransformer; import org.hypertrace.core.spannormalizer.jaeger.PreProcessedSpan; -import org.hypertrace.core.spannormalizer.otel.OtelMetricProcessor; -import org.hypertrace.core.spannormalizer.otel.OtelMetricSerde; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,17 +57,17 @@ public StreamsBuilder buildTopology( // add metrics processor - KStream secondProcessor = - (KStream) inputStreams.get("otel-metrics"); - if (secondProcessor == null) { - secondProcessor = - streamsBuilder.stream( - "otel-metrics", Consumed.with(Serdes.ByteArray(), new OtelMetricSerde())); - inputStreams.put("otel-metrics", secondProcessor); - } - - KStream otelMetricStream = - secondProcessor.transform(OtelMetricProcessor::new); + // KStream secondProcessor = + // (KStream) inputStreams.get("otlp-metrics"); + // if (secondProcessor == null) { + // secondProcessor = + // streamsBuilder.stream( + // "otlp-metrics", Consumed.with(Serdes.ByteArray(), new OtelMetricSerde())); + // inputStreams.put("otlp-metrics", secondProcessor); + // } + // + // KStream otelMetricStream = + // secondProcessor.transform(OtelMetricProcessor::new); return streamsBuilder; } From 7ed5ae6bc05a2ba6891756d510066fb522067795 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 22 Sep 2021 17:35:38 +0530 Subject: [PATCH 05/10] clean up the span normalizer --- .../core/spannormalizer/SpanNormalizer.java | 32 +------------ .../otel/OtelMetricProcessor.java | 34 -------------- .../spannormalizer/otel/OtelMetricSerde.java | 47 ------------------- 3 files changed, 2 insertions(+), 111 deletions(-) delete mode 100644 span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricProcessor.java delete mode 100644 span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricSerde.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java index 845508f4e..10ca32c34 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java @@ -7,6 +7,7 @@ import com.typesafe.config.Config; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; @@ -54,38 +55,9 @@ public StreamsBuilder buildTopology( inputStream.transform(JaegerSpanPreProcessor::new); preProcessedStream.transform(JaegerSpanToAvroRawSpanTransformer::new).to(outputTopic); preProcessedStream.transform(JaegerSpanToLogRecordsTransformer::new).to(outputTopicRawLogs); - - // add metrics processor - - // KStream secondProcessor = - // (KStream) inputStreams.get("otlp-metrics"); - // if (secondProcessor == null) { - // secondProcessor = - // streamsBuilder.stream( - // "otlp-metrics", Consumed.with(Serdes.ByteArray(), new OtelMetricSerde())); - // inputStreams.put("otlp-metrics", secondProcessor); - // } - // - // KStream otelMetricStream = - // secondProcessor.transform(OtelMetricProcessor::new); - return streamsBuilder; } - private void addStreamIfNeeded( - StreamsBuilder streamsBuilder, - Map> inputStreams, - String inputTopicName, - String nodeName) { - KStream inputStream = inputStreams.get(inputTopicName); - if (inputStream == null) { - inputStream = - streamsBuilder.stream( - inputTopicName, Consumed.with(Serdes.String(), null).withName(nodeName)); - inputStreams.put(inputTopicName, inputStream); - } - } - @Override public String getJobConfigKey() { return SPAN_NORMALIZER_JOB_CONFIG; @@ -99,7 +71,7 @@ public Logger getLogger() { @Override public List getInputTopics(Map properties) { Config jobConfig = getJobConfig(properties); - return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY), "otel-metrics"); + return Collections.singletonList(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY)); } @Override diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricProcessor.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricProcessor.java deleted file mode 100644 index f0f6d8f5d..000000000 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricProcessor.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.hypertrace.core.spannormalizer.otel; - -import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.Printer; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class OtelMetricProcessor - implements Transformer> { - private static final Logger LOG = LoggerFactory.getLogger(OtelMetricProcessor.class); - - private static Printer printer = JsonFormat.printer().omittingInsignificantWhitespace(); - - @Override - public void init(final ProcessorContext context) {} - - @Override - public void close() {} - - @Override - public KeyValue transform( - final byte[] key, final ResourceMetrics value) { - try { - LOG.info(printer.print(value)); - } catch (Exception e) { - LOG.error("parsing exception:", e); - } - return new KeyValue<>(key, value); - } -} diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricSerde.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricSerde.java deleted file mode 100644 index 90a89b2e8..000000000 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/otel/OtelMetricSerde.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.hypertrace.core.spannormalizer.otel; - -import com.google.protobuf.InvalidProtocolBufferException; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import java.util.Map; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serializer; - -public class OtelMetricSerde implements Serde { - - @Override - public void configure(Map configs, boolean isKey) {} - - @Override - public void close() {} - - @Override - public Serializer serializer() { - return new OtelMetricSerde.Ser(); - } - - @Override - public Deserializer deserializer() { - return new OtelMetricSerde.De(); - } - - public static class Ser implements Serializer { - - @Override - public byte[] serialize(String topic, ResourceMetrics data) { - return data.toByteArray(); - } - } - - public static class De implements Deserializer { - - @Override - public ResourceMetrics deserialize(String topic, byte[] data) { - try { - return ResourceMetrics.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - } - } -} From f0e329b408fda0067c5fb9ce97f033789a401bef Mon Sep 17 00:00:00 2001 From: Ronak Date: Thu, 23 Sep 2021 16:43:40 +0530 Subject: [PATCH 06/10] feat: adds metrics exporter --- hypertrace-ingester/build.gradle.kts | 7 +- .../ingester/HypertraceIngester.java | 25 +++++ .../hypertrace-ingester/application.conf | 3 +- hypertrace-metrics-exporter/build.gradle.kts | 3 + .../build.gradle.kts | 49 ++++++++++ .../metrics/exporter/MetricsConsumer.java | 89 ++++++++++++++++++ .../metrics/exporter/MetricsExporter.java | 64 +++++++++++++ .../metrics/exporter/OtlpGrpcExporter.java | 91 +++++++++++++++++++ .../resources/configs/common/application.conf | 24 +++++ .../src/main/resources/log4j2.properties | 23 +++++ .../metrics/processor/MetricsExporter.java | 22 ----- settings.gradle.kts | 1 + 12 files changed, 376 insertions(+), 25 deletions(-) create mode 100644 hypertrace-metrics-exporter/build.gradle.kts create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties delete mode 100644 hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsExporter.java diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index 81d9ab593..0c63f5398 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -42,6 +42,7 @@ dependencies { implementation(project(":hypertrace-view-generator:hypertrace-view-generator")) implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator")) implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor")) + implementation(project(":hypertrace-metrics-exporter:hypertrace-metrics-exporter")) testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") @@ -68,7 +69,8 @@ tasks.register("copyServiceConfigs") { createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "main", "common"), createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common"), createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "main", "common"), - createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "main", "common") + createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "main", "common"), + createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "main", "common") ).into("./build/resources/main/configs/") } @@ -110,7 +112,8 @@ tasks.register("copyServiceConfigsTest") { createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"), createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator"), createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "test", "hypertrace-metrics-generator"), - createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "test", "hypertrace-metrics-processor") + createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "test", "hypertrace-metrics-processor"), + createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "test", "hypertrace-metrics-exporter") ).into("./build/resources/test/configs/") } diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java index aa62a2b6a..747dffe0f 100644 --- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java +++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java @@ -17,6 +17,7 @@ import org.hypertrace.core.serviceframework.config.ConfigUtils; import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher; +import org.hypertrace.metrics.exporter.MetricsExporter; import org.hypertrace.metrics.generator.MetricsGenerator; import org.hypertrace.metrics.processor.MetricsProcessor; import org.hypertrace.traceenricher.trace.enricher.TraceEnricher; @@ -31,9 +32,33 @@ public class HypertraceIngester extends KafkaStreamsApp { private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config"; private Map> jobNameToSubTopology = new HashMap<>(); + private MetricsExporter metricsExporter; + private Thread metricsExporterThread; public HypertraceIngester(ConfigClient configClient) { super(configClient); + metricsExporter = + new MetricsExporter(configClient, getSubJobConfig("hypertrace-metrics-exporter")); + } + + @Override + protected void doInit() { + super.doInit(); + metricsExporter.doInit(); + } + + @Override + protected void doStart() { + super.doStart(); + metricsExporterThread = new Thread(() -> metricsExporter.doStart()); + metricsExporterThread.start(); + } + + @Override + protected void doStop() { + super.doStop(); + metricsExporter.doStop(); + metricsExporterThread.stop(); } private KafkaStreamsApp getSubTopologyInstance(String name) { diff --git a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf index 0a09e37b5..093475361 100644 --- a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf +++ b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf @@ -9,7 +9,8 @@ sub.topology.names = [ "hypertrace-trace-enricher", "all-views", "hypertrace-metrics-generator", - "hypertrace-metrics-processor"] + "hypertrace-metrics-processor" +] precreate.topics = false precreate.topics = ${?PRE_CREATE_TOPICS} diff --git a/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/build.gradle.kts new file mode 100644 index 000000000..612342977 --- /dev/null +++ b/hypertrace-metrics-exporter/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects { + group = "org.hypertrace.metrics.exporter" +} \ No newline at end of file diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts new file mode 100644 index 000000000..2da99545c --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts @@ -0,0 +1,49 @@ +plugins { + java + application + jacoco + id("org.hypertrace.docker-java-application-plugin") + id("org.hypertrace.docker-publish-plugin") + id("org.hypertrace.jacoco-report-plugin") +} + +application { + mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") +} + +hypertraceDocker { + defaultImage { + javaApplication { + serviceName.set("${project.name}") + adminPort.set(8099) + } + } +} + +tasks.test { + useJUnitPlatform() +} + +dependencies { + // common and framework + implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") + + // open telemetry + implementation("io.opentelemetry:opentelemetry-api:1.4.1") + implementation("io.opentelemetry:opentelemetry-api-metrics:1.4.1-alpha") + implementation("io.opentelemetry:opentelemetry-sdk:1.4.1") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1") + implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpah") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha") + + // kafka + implementation("org.apache.kafka:kafka-clients:2.6.0") + + // test + testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") + testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("com.google.code.gson:gson:2.8.7") +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java new file mode 100644 index 000000000..c3963d0ad --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java @@ -0,0 +1,89 @@ +package org.hypertrace.metrics.exporter; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.typesafe.config.Config; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsConsumer.class); + private static final int CONSUMER_POLL_TIMEOUT_MS = 100; + + private static final String KAFKA_CONFIG_KEY = "kafka.config"; + private static final String INPUT_TOPIC_KEY = "input.topic"; + + private final KafkaConsumer consumer; + + public MetricsConsumer(Config config) { + Properties props = new Properties(); + props.putAll( + mergeProperties(getBaseProperties(), getFlatMapConfig(config.getConfig(KAFKA_CONFIG_KEY)))); + consumer = new KafkaConsumer(props); + consumer.subscribe(Collections.singletonList(config.getString(INPUT_TOPIC_KEY))); + } + + public List consume() { + List resourceMetrics = new ArrayList<>(); + + ConsumerRecords records = + consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS)); + records.forEach( + record -> { + try { + resourceMetrics.add(ResourceMetrics.parseFrom(record.value())); + } catch (InvalidProtocolBufferException e) { + LOGGER.error("Invalid record with exception", e); + } + }); + + return resourceMetrics; + } + + public void close() { + consumer.close(); + } + + private Map getBaseProperties() { + Map baseProperties = new HashMap<>(); + baseProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "hypertrace-metrics-exporter"); + baseProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + baseProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + baseProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + baseProperties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + baseProperties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + return baseProperties; + } + + private Map getFlatMapConfig(Config config) { + Map propertiesMap = new HashMap(); + config.entrySet().stream() + .forEach( + (entry) -> { + propertiesMap.put((String) entry.getKey(), config.getString((String) entry.getKey())); + }); + return propertiesMap; + } + + private Map mergeProperties( + Map baseProps, Map props) { + Objects.requireNonNull(baseProps); + props.forEach(baseProps::put); + return baseProps; + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java new file mode 100644 index 000000000..2bf731032 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java @@ -0,0 +1,64 @@ +package org.hypertrace.metrics.exporter; + +import com.typesafe.config.Config; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.hypertrace.core.serviceframework.PlatformService; +import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsExporter extends PlatformService { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class); + private static final String OTLP_CONFIG_KEY = "otlp.collector.config"; + + private MetricsConsumer metricsConsumer; + private OtlpGrpcExporter otlpGrpcExporter; + private Config config; + + public MetricsExporter(ConfigClient configClient, Config config) { + super(configClient); + this.config = config; + } + + @Override + public void doInit() { + config = (config != null) ? config : getAppConfig(); + metricsConsumer = new MetricsConsumer(config); + otlpGrpcExporter = new OtlpGrpcExporter(config.getConfig(OTLP_CONFIG_KEY)); + } + + @Override + public void doStart() { + while (true) { + List resourceMetrics = metricsConsumer.consume(); + if (!resourceMetrics.isEmpty()) { + CompletableResultCode result = otlpGrpcExporter.export(resourceMetrics); + result.join(1, TimeUnit.MINUTES); + } + waitForSec(1); + } + } + + @Override + public void doStop() { + metricsConsumer.close(); + otlpGrpcExporter.close(); + } + + @Override + public boolean healthCheck() { + return true; + } + + private void waitForSec(int secs) { + try { + Thread.sleep(1000L * secs); + } catch (InterruptedException e) { + LOGGER.debug("waiting for pushing next records were intruppted"); + } + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java new file mode 100644 index 000000000..e175c52a5 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java @@ -0,0 +1,91 @@ +package org.hypertrace.metrics.exporter; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc.MetricsServiceFutureStub; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OtlpGrpcExporter { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpGrpcExporter.class); + + private MetricsServiceFutureStub metricsService; + private ManagedChannel managedChannel; + private long timeoutNanos; + + public OtlpGrpcExporter(Config config) { + String host = config.getString("host"); + int port = config.getInt("port"); + int timeOut = config.hasPath("timeout_nanos") ? config.getInt("timeout_nanos") : 0; + + ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); + + managedChannel = channel; + timeoutNanos = timeOut; + metricsService = MetricsServiceGrpc.newFutureStub(channel); + } + + public CompletableResultCode export(List metrics) { + ExportMetricsServiceRequest exportMetricsServiceRequest = + ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(metrics).build(); + + final CompletableResultCode result = new CompletableResultCode(); + + MetricsServiceFutureStub exporter; + if (timeoutNanos > 0) { + exporter = metricsService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); + } else { + exporter = metricsService; + } + + Futures.addCallback( + exporter.export(exportMetricsServiceRequest), + new FutureCallback() { + @Override + public void onSuccess(@Nullable ExportMetricsServiceResponse response) { + result.succeed(); + } + + @Override + public void onFailure(Throwable t) { + Status status = Status.fromThrowable(t); + switch (status.getCode()) { + case UNIMPLEMENTED: + LOGGER.error("Failed to export metrics. Server responded with UNIMPLEMENTED. ", t); + break; + case UNAVAILABLE: + LOGGER.error("Failed to export metrics. Server is UNAVAILABLE. ", t); + break; + default: + LOGGER.warn("Failed to export metrics. Error message: " + t.getMessage()); + break; + } + result.fail(); + } + }, + MoreExecutors.directExecutor()); + return result; + } + + public void close() { + try { + managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Failed to shutdown the gRPC channel", e); + } + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf new file mode 100644 index 000000000..481ef47fd --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf @@ -0,0 +1,24 @@ +service.name = hypertrace-metrics-exporter +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.exporter.MetricsExporter + +input.topic = "enriched-otlp-metrics" + +otlp.collector.config = { + host = localhost + port = 5555 +} + +kafka.config = { + application.id = metrics-from-enriched-otlp-metrics-job + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.exporter.MetricsExporter +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 \ No newline at end of file diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties new file mode 100644 index 000000000..d91bc7bfe --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.type=RollingFile +appender.rolling.name=ROLLING_FILE +appender.rolling.fileName=${sys:service.name:-service}.log +appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type=PatternLayout +appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type=Policies +appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval=3600 +appender.rolling.policies.time.modulate=true +appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=20MB +appender.rolling.strategy.type=DefaultRolloverStrategy +appender.rolling.strategy.max=5 +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsExporter.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsExporter.java deleted file mode 100644 index 137674623..000000000 --- a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsExporter.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.hypertrace.metrics.processor; - -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.ProcessorContext; - -public class MetricsExporter - implements Transformer> { - - @Override - public void init(ProcessorContext context) {} - - @Override - public KeyValue transform(byte[] key, ResourceMetrics value) { - // noop normalizer for now - return new KeyValue<>(key, value); - } - - @Override - public void close() {} -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 72a3dc563..352b5771b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -37,6 +37,7 @@ include("span-normalizer:span-normalizer-constants") // metrics pipeline include("hypertrace-metrics-generator:hypertrace-metrics-generator") include("hypertrace-metrics-processor:hypertrace-metrics-processor") +include("hypertrace-metrics-exporter:hypertrace-metrics-exporter") // e2e pipeline include("hypertrace-ingester") From 93edd27ca73469c2e8d03344e8e9b4322598adbf Mon Sep 17 00:00:00 2001 From: Ronak Date: Fri, 24 Sep 2021 09:29:57 +0530 Subject: [PATCH 07/10] feat : working demo e2e for ingestion side --- .../java/org/hypertrace/metrics/exporter/MetricsExporter.java | 3 ++- .../src/main/resources/configs/common/application.conf | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java index 2bf731032..50c4bca93 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java @@ -35,8 +35,9 @@ public void doInit() { public void doStart() { while (true) { List resourceMetrics = metricsConsumer.consume(); + CompletableResultCode result; if (!resourceMetrics.isEmpty()) { - CompletableResultCode result = otlpGrpcExporter.export(resourceMetrics); + result = otlpGrpcExporter.export(resourceMetrics); result.join(1, TimeUnit.MINUTES); } waitForSec(1); diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf index 481ef47fd..10d0f817d 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf @@ -7,7 +7,9 @@ input.topic = "enriched-otlp-metrics" otlp.collector.config = { host = localhost - port = 5555 + host = ${?OTLP_COLLECTOR_HOST} + port = 4317 + port = ${?OTLP_COLLECTOR_PORT} } kafka.config = { From a1220e80beb94cb41c161f696dcc1b8a85ce9b9c Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 29 Sep 2021 17:46:03 +0530 Subject: [PATCH 08/10] feat: adding exporter for direct sink from kafka --- .../metrics/exporter/MetricsProducerImpl.java | 29 ++++++ .../exporter/OtlpToObjectConverter.java | 90 +++++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java new file mode 100644 index 000000000..b104b8370 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java @@ -0,0 +1,29 @@ +package org.hypertrace.metrics.exporter; + +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricProducer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class MetricsProducerImpl implements MetricProducer { + private ConcurrentLinkedQueue metricDataQueue; + + public MetricsProducerImpl() { + this.metricDataQueue = new ConcurrentLinkedQueue(); + } + + public void addMetricData(List metricData) { + this.metricDataQueue.addAll(metricData); + } + + public Collection collectAllMetrics() { + List metricDataList = new ArrayList<>(); + int maxItems = 0; + while (this.metricDataQueue.peek() != null & maxItems < 1000) { + metricDataList.add(this.metricDataQueue.poll()); + } + return metricDataList; + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java new file mode 100644 index 000000000..711c6efa0 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java @@ -0,0 +1,90 @@ +package org.hypertrace.metrics.exporter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.Metric.DataCase; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class OtlpToObjectConverter { + + private Resource toResource(io.opentelemetry.proto.resource.v1.Resource otlpResource) { + return Resource.create(toAttributes(otlpResource.getAttributesList())); + } + + private InstrumentationLibraryInfo toInstrumentationLibraryInfo( + InstrumentationLibrary otlpInstrumentationLibraryInfo) { + return InstrumentationLibraryInfo.create( + otlpInstrumentationLibraryInfo.getName(), otlpInstrumentationLibraryInfo.getVersion()); + } + + private Attributes toAttributes(List keyValues) { + AttributesBuilder attributesBuilder = Attributes.builder(); + keyValues.forEach( + keyValue -> { + attributesBuilder.put(keyValue.getKey(), keyValue.getValue().getStringValue()); + }); + return attributesBuilder.build(); + } + + private List toDoublePointData(List numberDataPoints) { + return numberDataPoints.stream() + .map( + numberDataPoint -> + DoublePointData.create( + numberDataPoint.getStartTimeUnixNano(), + numberDataPoint.getTimeUnixNano(), + toAttributes(numberDataPoint.getAttributesList()), + numberDataPoint.getAsDouble())) + .collect(Collectors.toList()); + } + + public List toMetricData(ResourceMetrics resourceMetrics) { + List metricData = new ArrayList<>(); + Resource resource = toResource(resourceMetrics.getResource()); + resourceMetrics + .getInstrumentationLibraryMetricsList() + .forEach( + instrumentationLibraryMetrics -> { + InstrumentationLibraryInfo instrumentationLibraryInfo = + toInstrumentationLibraryInfo( + instrumentationLibraryMetrics.getInstrumentationLibrary()); + instrumentationLibraryMetrics + .getMetricsList() + .forEach( + metric -> { + // get type : for now only support gauge + if (metric.getDataCase().equals(DataCase.GAUGE)) { + Gauge gaugeMetric = metric.getGauge(); + String name = metric.getName(); + String description = metric.getDescription(); + String unit = metric.getUnit(); + DoubleGaugeData data = + DoubleGaugeData.create( + toDoublePointData(gaugeMetric.getDataPointsList())); + MetricData md = + MetricData.createDoubleGauge( + resource, + instrumentationLibraryInfo, + name, + description, + unit, + data); + metricData.add(md); + } + }); + }); + return List.of(); + } +} From 47a12e878786bc7f4c2afcab3fabf7ae904be21c Mon Sep 17 00:00:00 2001 From: Ronak Date: Thu, 30 Sep 2021 20:53:51 +0530 Subject: [PATCH 09/10] feat: adding promethues exporter --- .../build.gradle.kts | 2 ++ .../metrics/exporter/MetricsConsumer.java | 2 +- .../metrics/exporter/MetricsProducerImpl.java | 22 +++++++++++-------- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts index 2da99545c..8d9895ac0 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts @@ -38,6 +38,8 @@ dependencies { implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1") implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpah") implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha") + implementation("io.opentelemetry:opentelemetry-exporters-prometheus:0.9.1") + // kafka implementation("org.apache.kafka:kafka-clients:2.6.0") diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java index c3963d0ad..4818783d4 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java @@ -47,7 +47,7 @@ record -> { LOGGER.error("Invalid record with exception", e); } }); - + consumer.commitSync(); return resourceMetrics; } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java index b104b8370..c6f51b312 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java @@ -5,25 +5,29 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MetricsProducerImpl implements MetricProducer { - private ConcurrentLinkedQueue metricDataQueue; + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsProducerImpl.class); + private BlockingQueue metricDataQueue; - public MetricsProducerImpl() { - this.metricDataQueue = new ConcurrentLinkedQueue(); + public MetricsProducerImpl(int maxQueueSize) { + this.metricDataQueue = new ArrayBlockingQueue(maxQueueSize); } - public void addMetricData(List metricData) { + public void addMetricData(List metricData) throws InterruptedException { this.metricDataQueue.addAll(metricData); + for (MetricData md : metricData) { + this.metricDataQueue.put(md); + } } public Collection collectAllMetrics() { List metricDataList = new ArrayList<>(); - int maxItems = 0; - while (this.metricDataQueue.peek() != null & maxItems < 1000) { - metricDataList.add(this.metricDataQueue.poll()); - } + this.metricDataQueue.drainTo(metricDataList); return metricDataList; } } From 471c08451c3f9e2cb63aa0120839623465576678 Mon Sep 17 00:00:00 2001 From: Ronak Date: Fri, 1 Oct 2021 23:17:06 +0530 Subject: [PATCH 10/10] feat: working copy of pull based exporter --- hypertrace-ingester/build.gradle.kts | 4 +- .../ingester/HypertraceIngester.java | 7 +- .../build.gradle.kts | 12 ++- .../exporter/InMemoryMetricsProducer.java | 58 +++++++++++++ .../metrics/exporter/MetricsConsumer.java | 40 ++++++++- .../metrics/exporter/MetricsExporter.java | 65 --------------- .../exporter/MetricsExporterEntryService.java | 83 +++++++++++++++++++ .../exporter/MetricsExporterServlet.java | 36 ++++++++ .../metrics/exporter/MetricsProducerImpl.java | 33 -------- .../metrics/exporter/MetricsServer.java | 43 ++++++++++ .../metrics/exporter/OtlpGrpcExporter.java | 4 +- .../exporter/OtlpToObjectConverter.java | 51 ++++++++++-- .../resources/configs/common/application.conf | 7 +- .../build.gradle.kts | 4 +- .../build.gradle.kts | 4 +- .../build.gradle.kts | 4 +- .../raw-spans-grouper/build.gradle.kts | 4 +- .../span-normalizer/build.gradle.kts | 4 +- 18 files changed, 334 insertions(+), 129 deletions(-) create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/InMemoryMetricsProducer.java delete mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterEntryService.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterServlet.java delete mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsServer.java diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index 0c63f5398..b154f5a2e 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -26,8 +26,8 @@ hypertraceDocker { dependencies { implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.datamodel:data-model:0.1.18") implementation("org.hypertrace.core.viewgenerator:view-generator-framework:0.3.1") implementation("com.typesafe:config:1.4.1") diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java index 747dffe0f..a5de7f1e1 100644 --- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java +++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java @@ -17,7 +17,7 @@ import org.hypertrace.core.serviceframework.config.ConfigUtils; import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher; -import org.hypertrace.metrics.exporter.MetricsExporter; +import org.hypertrace.metrics.exporter.MetricsExporterEntryService; import org.hypertrace.metrics.generator.MetricsGenerator; import org.hypertrace.metrics.processor.MetricsProcessor; import org.hypertrace.traceenricher.trace.enricher.TraceEnricher; @@ -32,13 +32,14 @@ public class HypertraceIngester extends KafkaStreamsApp { private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config"; private Map> jobNameToSubTopology = new HashMap<>(); - private MetricsExporter metricsExporter; + private MetricsExporterEntryService metricsExporter; private Thread metricsExporterThread; public HypertraceIngester(ConfigClient configClient) { super(configClient); metricsExporter = - new MetricsExporter(configClient, getSubJobConfig("hypertrace-metrics-exporter")); + new MetricsExporterEntryService( + configClient, getSubJobConfig("hypertrace-metrics-exporter")); } @Override diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts index 8d9895ac0..a784fd8f0 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts @@ -27,8 +27,8 @@ tasks.test { dependencies { // common and framework implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") // open telemetry @@ -38,8 +38,14 @@ dependencies { implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.4.1") implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.4.1-alpah") implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.4.1-alpha") - implementation("io.opentelemetry:opentelemetry-exporters-prometheus:0.9.1") + implementation("io.opentelemetry:opentelemetry-exporter-prometheus:1.4.1-alpha") + // jetty server + implementation("org.eclipse.jetty:jetty-server:9.4.42.v20210604") + implementation("org.eclipse.jetty:jetty-servlet:9.4.42.v20210604") + + // prometheus metrics servelet + implementation("io.prometheus:simpleclient_servlet:0.6.0") // kafka implementation("org.apache.kafka:kafka-clients:2.6.0") diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/InMemoryMetricsProducer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/InMemoryMetricsProducer.java new file mode 100644 index 000000000..57e19142d --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/InMemoryMetricsProducer.java @@ -0,0 +1,58 @@ +package org.hypertrace.metrics.exporter; + +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricProducer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InMemoryMetricsProducer implements MetricProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryMetricsProducer.class); + private BlockingQueue metricDataQueue; + private final AtomicBoolean commitOffset = new AtomicBoolean(false); + + public InMemoryMetricsProducer(int maxQueueSize) { + this.metricDataQueue = new ArrayBlockingQueue(maxQueueSize); + } + + public void addMetricData(List resourceMetrics) { + try { + for (ResourceMetrics rm : resourceMetrics) { + List metricData = OtlpToObjectConverter.toMetricData(rm); + for (MetricData md : metricData) { + this.metricDataQueue.put(md); + } + } + } catch (InterruptedException exception) { + LOGGER.info("This thread was intruppted, so we might loose copying some metrics "); + } + } + + public Collection collectAllMetrics() { + List metricDataList = new ArrayList<>(); + int max = 0; + while (max < 100 && this.metricDataQueue.peek() != null) { + metricDataList.add(this.metricDataQueue.poll()); + max++; + } + return metricDataList; + } + + public void setCommitOffset() { + commitOffset.set(true); + } + + public void clearCommitOffset() { + commitOffset.set(false); + } + + public boolean isCommitOffset() { + return commitOffset.get(); + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java index 4818783d4..c0eb194ed 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsConsumer.java @@ -11,13 +11,14 @@ import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MetricsConsumer { +public class MetricsConsumer implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(MetricsConsumer.class); private static final int CONSUMER_POLL_TIMEOUT_MS = 100; @@ -25,13 +26,38 @@ public class MetricsConsumer { private static final String INPUT_TOPIC_KEY = "input.topic"; private final KafkaConsumer consumer; + private final InMemoryMetricsProducer inMemoryMetricsProducer; + private final AtomicBoolean running = new AtomicBoolean(false); - public MetricsConsumer(Config config) { + public MetricsConsumer(Config config, InMemoryMetricsProducer inMemoryMetricsProducer) { Properties props = new Properties(); props.putAll( mergeProperties(getBaseProperties(), getFlatMapConfig(config.getConfig(KAFKA_CONFIG_KEY)))); consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(config.getString(INPUT_TOPIC_KEY))); + this.inMemoryMetricsProducer = inMemoryMetricsProducer; + } + + public void run() { + running.set(true); + while (running.get()) { + // check if any message to commit + if (inMemoryMetricsProducer.isCommitOffset()) { + // consumer.commitSync(); + inMemoryMetricsProducer.clearCommitOffset(); + } + + // read new data + List resourceMetrics = consume(); + if (!resourceMetrics.isEmpty()) { + inMemoryMetricsProducer.addMetricData(resourceMetrics); + } + waitForSec((long) (1000L * 0.1)); + } + } + + public void stop() { + running.set(false); } public List consume() { @@ -47,7 +73,7 @@ record -> { LOGGER.error("Invalid record with exception", e); } }); - consumer.commitSync(); + return resourceMetrics; } @@ -86,4 +112,12 @@ private Map mergeProperties( props.forEach(baseProps::put); return baseProps; } + + private void waitForSec(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + LOGGER.debug("waiting for pushing next records were intruppted"); + } + } } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java deleted file mode 100644 index 50c4bca93..000000000 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporter.java +++ /dev/null @@ -1,65 +0,0 @@ -package org.hypertrace.metrics.exporter; - -import com.typesafe.config.Config; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import io.opentelemetry.sdk.common.CompletableResultCode; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.hypertrace.core.serviceframework.PlatformService; -import org.hypertrace.core.serviceframework.config.ConfigClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MetricsExporter extends PlatformService { - - private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class); - private static final String OTLP_CONFIG_KEY = "otlp.collector.config"; - - private MetricsConsumer metricsConsumer; - private OtlpGrpcExporter otlpGrpcExporter; - private Config config; - - public MetricsExporter(ConfigClient configClient, Config config) { - super(configClient); - this.config = config; - } - - @Override - public void doInit() { - config = (config != null) ? config : getAppConfig(); - metricsConsumer = new MetricsConsumer(config); - otlpGrpcExporter = new OtlpGrpcExporter(config.getConfig(OTLP_CONFIG_KEY)); - } - - @Override - public void doStart() { - while (true) { - List resourceMetrics = metricsConsumer.consume(); - CompletableResultCode result; - if (!resourceMetrics.isEmpty()) { - result = otlpGrpcExporter.export(resourceMetrics); - result.join(1, TimeUnit.MINUTES); - } - waitForSec(1); - } - } - - @Override - public void doStop() { - metricsConsumer.close(); - otlpGrpcExporter.close(); - } - - @Override - public boolean healthCheck() { - return true; - } - - private void waitForSec(int secs) { - try { - Thread.sleep(1000L * secs); - } catch (InterruptedException e) { - LOGGER.debug("waiting for pushing next records were intruppted"); - } - } -} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterEntryService.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterEntryService.java new file mode 100644 index 000000000..70e067acc --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterEntryService.java @@ -0,0 +1,83 @@ +package org.hypertrace.metrics.exporter; + +import com.typesafe.config.Config; +import org.hypertrace.core.serviceframework.PlatformService; +import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsExporterEntryService extends PlatformService { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporterEntryService.class); + private static final String OTLP_CONFIG_KEY = "otlp.collector.config"; + private static final String PULL_EXPORTER_COFING_KEY = "pull.exporter"; + + private MetricsConsumer metricsConsumer; + private OtlpGrpcExporter otlpGrpcExporter; + private Config config; + private InMemoryMetricsProducer inMemoryMetricsProducer; + private Boolean isPullExporter; + private MetricsServer metricsServer; + + public MetricsExporterEntryService(ConfigClient configClient, Config config) { + super(configClient); + this.config = config; + } + + @Override + public void doInit() { + config = (config != null) ? config : getAppConfig(); + inMemoryMetricsProducer = new InMemoryMetricsProducer(5000); + metricsConsumer = new MetricsConsumer(config, inMemoryMetricsProducer); + isPullExporter = config.getBoolean(PULL_EXPORTER_COFING_KEY); + if (!isPullExporter) { + otlpGrpcExporter = new OtlpGrpcExporter(config.getConfig(OTLP_CONFIG_KEY)); + } else { + metricsServer = new MetricsServer(config, inMemoryMetricsProducer); + } + } + + @Override + public void doStart() { + + Thread metricsConsumerThread = new Thread(metricsConsumer); + Thread metricsExporterThread = null; + if (isPullExporter) { + metricsExporterThread = new Thread(() -> metricsServer.start()); + } else { + metricsConsumerThread = new Thread(otlpGrpcExporter); + } + + metricsExporterThread.start(); + metricsConsumerThread.start(); + + try { + metricsExporterThread.join(); + } catch (InterruptedException exception) { + exception.printStackTrace(); + } + + // stop consuming if metric server thread has stopped + metricsConsumer.stop(); + try { + metricsConsumerThread.join(); + } catch (InterruptedException exception) { + exception.printStackTrace(); + } + } + + @Override + public void doStop() { + metricsConsumer.close(); + if (!isPullExporter) { + otlpGrpcExporter.close(); + } else { + metricsServer.stop(); + } + } + + @Override + public boolean healthCheck() { + return true; + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterServlet.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterServlet.java new file mode 100644 index 000000000..a273f8c18 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterServlet.java @@ -0,0 +1,36 @@ +package org.hypertrace.metrics.exporter; + +import io.opentelemetry.exporter.prometheus.PrometheusCollector; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.MetricsServlet; +import java.io.IOException; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public class MetricsExporterServlet extends MetricsServlet { + private PrometheusCollector prometheusCollector; + private static final CollectorRegistry collectorRegistry = new CollectorRegistry(false); + private InMemoryMetricsProducer inMemoryMetricsProducer; + + public MetricsExporterServlet(InMemoryMetricsProducer producer) { + super(collectorRegistry); + prometheusCollector = PrometheusCollector.builder().setMetricProducer(producer).build(); + collectorRegistry.register(prometheusCollector); + inMemoryMetricsProducer = producer; + } + + @Override + protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) + throws ServletException, IOException { + try { + // List samples = prometheusCollector.collect(); + super.doGet(req, resp); + inMemoryMetricsProducer.setCommitOffset(); + } catch (ServletException e) { + throw e; + } catch (IOException e) { + throw e; + } + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java deleted file mode 100644 index c6f51b312..000000000 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsProducerImpl.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.hypertrace.metrics.exporter; - -import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.export.MetricProducer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MetricsProducerImpl implements MetricProducer { - private static final Logger LOGGER = LoggerFactory.getLogger(MetricsProducerImpl.class); - private BlockingQueue metricDataQueue; - - public MetricsProducerImpl(int maxQueueSize) { - this.metricDataQueue = new ArrayBlockingQueue(maxQueueSize); - } - - public void addMetricData(List metricData) throws InterruptedException { - this.metricDataQueue.addAll(metricData); - for (MetricData md : metricData) { - this.metricDataQueue.put(md); - } - } - - public Collection collectAllMetrics() { - List metricDataList = new ArrayList<>(); - this.metricDataQueue.drainTo(metricDataList); - return metricDataList; - } -} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsServer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsServer.java new file mode 100644 index 000000000..d0147229b --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsServer.java @@ -0,0 +1,43 @@ +package org.hypertrace.metrics.exporter; + +import com.typesafe.config.Config; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsServer { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsServer.class); + + private Server server; + + public MetricsServer(Config config, InMemoryMetricsProducer producer) { + server = new Server(8098); + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + this.server.setHandler(context); + this.server.setStopAtShutdown(true); + this.server.setStopTimeout(2000L); + context.addServlet( + new ServletHolder(new MetricsExporterServlet(producer)), "/ingestion/metrics"); + } + + public void start() { + try { + this.server.start(); + LOGGER.info("Started metrics service on port: {}.", 8098); + this.server.join(); + } catch (Exception var4) { + LOGGER.error("Failed to start metrics servlet."); + } + } + + public void stop() { + try { + server.stop(); + } catch (Exception e) { + LOGGER.error("Error stopping metrics server"); + } + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java index e175c52a5..ec4afe23c 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpGrpcExporter.java @@ -19,7 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class OtlpGrpcExporter { +public class OtlpGrpcExporter implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(OtlpGrpcExporter.class); @@ -39,6 +39,8 @@ public OtlpGrpcExporter(Config config) { metricsService = MetricsServiceGrpc.newFutureStub(channel); } + public void run() {} + public CompletableResultCode export(List metrics) { ExportMetricsServiceRequest exportMetricsServiceRequest = ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(metrics).build(); diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java index 711c6efa0..9ceba19e0 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/OtlpToObjectConverter.java @@ -8,9 +8,12 @@ import io.opentelemetry.proto.metrics.v1.Metric.DataCase; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.Sum; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; @@ -19,17 +22,17 @@ public class OtlpToObjectConverter { - private Resource toResource(io.opentelemetry.proto.resource.v1.Resource otlpResource) { + public static Resource toResource(io.opentelemetry.proto.resource.v1.Resource otlpResource) { return Resource.create(toAttributes(otlpResource.getAttributesList())); } - private InstrumentationLibraryInfo toInstrumentationLibraryInfo( + public static InstrumentationLibraryInfo toInstrumentationLibraryInfo( InstrumentationLibrary otlpInstrumentationLibraryInfo) { return InstrumentationLibraryInfo.create( otlpInstrumentationLibraryInfo.getName(), otlpInstrumentationLibraryInfo.getVersion()); } - private Attributes toAttributes(List keyValues) { + public static Attributes toAttributes(List keyValues) { AttributesBuilder attributesBuilder = Attributes.builder(); keyValues.forEach( keyValue -> { @@ -38,7 +41,7 @@ private Attributes toAttributes(List keyValues) { return attributesBuilder.build(); } - private List toDoublePointData(List numberDataPoints) { + public static List toDoublePointData(List numberDataPoints) { return numberDataPoints.stream() .map( numberDataPoint -> @@ -46,11 +49,11 @@ private List toDoublePointData(List numberData numberDataPoint.getStartTimeUnixNano(), numberDataPoint.getTimeUnixNano(), toAttributes(numberDataPoint.getAttributesList()), - numberDataPoint.getAsDouble())) + numberDataPoint.getAsInt())) .collect(Collectors.toList()); } - public List toMetricData(ResourceMetrics resourceMetrics) { + public static List toMetricData(ResourceMetrics resourceMetrics) { List metricData = new ArrayList<>(); Resource resource = toResource(resourceMetrics.getResource()); resourceMetrics @@ -82,9 +85,43 @@ public List toMetricData(ResourceMetrics resourceMetrics) { unit, data); metricData.add(md); + } else if (metric.getDataCase().equals(DataCase.SUM)) { + Sum sumMetric = metric.getSum(); + boolean isMonotonic = sumMetric.getIsMonotonic(); + AggregationTemporality temporality; + if (sumMetric + .getAggregationTemporality() + .equals( + io.opentelemetry.proto.metrics.v1.AggregationTemporality + .AGGREGATION_TEMPORALITY_CUMULATIVE)) { + temporality = AggregationTemporality.CUMULATIVE; + } else if (sumMetric + .getAggregationTemporality() + .equals( + io.opentelemetry.proto.metrics.v1.AggregationTemporality + .AGGREGATION_TEMPORALITY_DELTA)) { + temporality = AggregationTemporality.DELTA; + } else { + temporality = AggregationTemporality.CUMULATIVE; + } + + DoubleSumData doubleSumData = + DoubleSumData.create( + isMonotonic, + temporality, + toDoublePointData(sumMetric.getDataPointsList())); + MetricData md = + MetricData.createDoubleSum( + resource, + instrumentationLibraryInfo, + metric.getName(), + metric.getDescription(), + metric.getUnit(), + doubleSumData); + metricData.add(md); } }); }); - return List.of(); + return metricData; } } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf index 10d0f817d..69694c89b 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf @@ -1,10 +1,13 @@ service.name = hypertrace-metrics-exporter service.admin.port = 8099 -main.class = org.hypertrace.metrics.exporter.MetricsExporter +main.class = org.hypertrace.metrics.exporter.MetricsExporterEntryService input.topic = "enriched-otlp-metrics" + +pull.exporter = true + otlp.collector.config = { host = localhost host = ${?OTLP_COLLECTOR_HOST} @@ -21,6 +24,6 @@ kafka.config = { logger.names = ["file"] logger.file.dir = "/var/logs/metrics-generator" -metrics.reporter.prefix = org.hypertrace.metrics.exporter.MetricsExporter +metrics.reporter.prefix = org.hypertrace.metrics.exporter.MetricsExporterEntryService metrics.reporter.names = ["prometheus"] metrics.reportInterval = 60 \ No newline at end of file diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts index a51868e86..8afca34c7 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts @@ -27,8 +27,8 @@ tasks.test { dependencies { // common and framework implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") // open telemetry diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts index a51868e86..8afca34c7 100644 --- a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts @@ -27,8 +27,8 @@ tasks.test { dependencies { // common and framework implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") // open telemetry diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts index 818180c41..4fb9c8bcf 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts @@ -36,8 +36,8 @@ tasks.test { dependencies { implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher-impl")) implementation("org.hypertrace.core.datamodel:data-model:0.1.18") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.entity.service:entity-service-client:0.8.0") implementation("com.typesafe:config:1.4.1") diff --git a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts index 85b3c7431..a473a8f87 100644 --- a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts +++ b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts @@ -39,8 +39,8 @@ dependencies { } implementation(project(":span-normalizer:span-normalizer-api")) implementation("org.hypertrace.core.datamodel:data-model:0.1.18") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") implementation("com.typesafe:config:1.4.1") diff --git a/span-normalizer/span-normalizer/build.gradle.kts b/span-normalizer/span-normalizer/build.gradle.kts index 2b2c92b3f..27652e8ea 100644 --- a/span-normalizer/span-normalizer/build.gradle.kts +++ b/span-normalizer/span-normalizer/build.gradle.kts @@ -35,8 +35,8 @@ dependencies { implementation(project(":semantic-convention-utils")) implementation("org.hypertrace.core.datamodel:data-model:0.1.18") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") // Required for the GRPC clients.