From 1dee97ecf0be0738c895924d8155ef3c004a7901 Mon Sep 17 00:00:00 2001 From: lu-xiaoshuang <121755080+lu-xiaoshuang@users.noreply.github.com> Date: Mon, 22 Jan 2024 08:00:08 +0800 Subject: [PATCH] add Logs Kafka Exporter and Metrics Kafka Exporter --- dependencyManagement/build.gradle.kts | 1 + kafka-exporter/build.gradle.kts | 1 + .../contrib/kafka/KafkaLogRecordExporter.java | 96 +++++++++ .../kafka/KafkaLogRecordExporterBuilder.java | 97 +++++++++ .../contrib/kafka/KafkaMetricExporter.java | 117 +++++++++++ .../kafka/KafkaMetricExporterBuilder.java | 122 +++++++++++ .../contrib/kafka/impl/KafkaExporter.java | 83 ++++++++ .../kafka/impl/KafkaExporterBuilder.java | 139 +++++++++++++ .../contrib/kafka/impl/KafkaSender.java | 97 +++++++++ .../kafka/impl/KafkaSenderProvider.java | 25 +++ .../KafkaLogRecordExporterProvider.java | 40 ++++ .../internal/KafkaMetricExporterProvider.java | 40 ++++ .../internal/KafkaSpanExporterProvider.java | 41 ++++ ...logs.ConfigurableLogRecordExporterProvider | 1 + ...metrics.ConfigurableMetricExporterProvider | 1 + ...pi.traces.ConfigurableSpanExporterProvider | 1 + .../kafka/KafkaLogRecordExporterTest.java | 195 ++++++++++++++++++ .../kafka/KafkaMetricExporterTest.java | 194 +++++++++++++++++ .../internal/KafkaExporterProviderTest.java | 45 ++++ 19 files changed, 1336 insertions(+) create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporter.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporterBuilder.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaMetricExporter.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaMetricExporterBuilder.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaExporter.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaExporterBuilder.java create mode 
100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaSender.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaSenderProvider.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaLogRecordExporterProvider.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaMetricExporterProvider.java create mode 100644 kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaSpanExporterProvider.java create mode 100644 kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider create mode 100644 kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider create mode 100644 kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider create mode 100644 kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporterTest.java create mode 100644 kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaMetricExporterTest.java create mode 100644 kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/internal/KafkaExporterProviderTest.java diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index 56b109817..95667ef65 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -33,6 +33,7 @@ val CORE_DEPENDENCIES = listOf( "com.google.errorprone:error_prone_core:${errorProneVersion}", "io.github.netmikey.logunit:logunit-jul:2.0.0", "io.opentelemetry.proto:opentelemetry-proto:1.0.0-alpha", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:1.33.0", "io.prometheus:simpleclient:${prometheusVersion}", "io.prometheus:simpleclient_common:${prometheusVersion}", 
"io.prometheus:simpleclient_httpserver:${prometheusVersion}", diff --git a/kafka-exporter/build.gradle.kts b/kafka-exporter/build.gradle.kts index db99896bb..9b7bef287 100644 --- a/kafka-exporter/build.gradle.kts +++ b/kafka-exporter/build.gradle.kts @@ -24,6 +24,7 @@ dependencies { implementation("com.google.protobuf:protobuf-java") testImplementation("io.opentelemetry:opentelemetry-api") + testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") testImplementation("com.google.guava:guava") testImplementation("org.testcontainers:junit-jupiter") diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporter.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporter.java new file mode 100644 index 000000000..e21b1d1d2 --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporter.java @@ -0,0 +1,96 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka; + +import io.opentelemetry.contrib.kafka.impl.KafkaExporter; +import io.opentelemetry.contrib.kafka.impl.KafkaExporterBuilder; +import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A {@link LogRecordExporter} which writes {@linkplain LogRecordData logs} to Kafka in OLTP format. 
+ */ +public final class KafkaLogRecordExporter implements LogRecordExporter { + + private static final Logger logger = Logger.getLogger(KafkaLogRecordExporter.class.getName()); + + private final AtomicBoolean isShutdown = new AtomicBoolean(); + + private final KafkaExporterBuilder builder; + + private final KafkaExporter delegate; + + /** + * Returns a new {@link KafkaLogRecordExporter} using the default values. + * + *

To load configuration values from environment variables and system properties, use opentelemetry-sdk-extension-autoconfigure. + * + * @return a new {@link KafkaLogRecordExporter} instance. + */ + public static KafkaLogRecordExporter getDefault() { + return builder().build(); + } + + /** + * Returns a new builder instance for this exporter. + * + * @return a new builder instance for this exporter. + */ + public static KafkaLogRecordExporterBuilder builder() { + return new KafkaLogRecordExporterBuilder(); + } + + KafkaLogRecordExporter( + KafkaExporterBuilder builder, + KafkaExporter delegate) { + this.builder = builder; + this.delegate = delegate; + } + + public KafkaLogRecordExporterBuilder toBuilder() { + return new KafkaLogRecordExporterBuilder(builder.copy()); + } + + @Override + public CompletableResultCode export(Collection logs) { + if (isShutdown.get()) { + return CompletableResultCode.ofFailure(); + } + try { + LogsRequestMarshaler exportRequest = LogsRequestMarshaler.create(logs); + return delegate.export(exportRequest, logs.size()); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "send log records to kafka failed.", e); + return CompletableResultCode.ofFailure(); + } + } + + @Override + public CompletableResultCode flush() { + // TODO + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + if (!isShutdown.compareAndSet(false, true)) { + logger.log(Level.INFO, "Calling shutdown() multiple times."); + } + return CompletableResultCode.ofSuccess(); + } + + @Override + public String toString() { + return "KafkaLogRecordExporter{" + builder.toString(false) + "}"; + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporterBuilder.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporterBuilder.java new file mode 100644 index 000000000..41e035f30 --- /dev/null +++ 
b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporterBuilder.java @@ -0,0 +1,97 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka; + +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.Objects.requireNonNull; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.contrib.kafka.impl.KafkaExporterBuilder; +import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * Builder utility for {@link KafkaLogRecordExporter}. + * + * @since 1.27.0 + */ +public final class KafkaLogRecordExporterBuilder { + + private final KafkaExporterBuilder delegate; + + KafkaLogRecordExporterBuilder(KafkaExporterBuilder delegate) { + this.delegate = delegate; + } + + KafkaLogRecordExporterBuilder() { + this(new KafkaExporterBuilder<>("kafka", "log")); + } + + @CanIgnoreReturnValue + public KafkaLogRecordExporterBuilder setProtocolVersion(String protocolVersion) { + requireNonNull(protocolVersion, "protocolVersion"); + delegate.setProtocolVersion(protocolVersion); + return this; + } + + @CanIgnoreReturnValue + public KafkaLogRecordExporterBuilder setBrokers(String brokers) { + requireNonNull(brokers, "brokers"); + delegate.setBrokers(brokers); + return this; + } + + @CanIgnoreReturnValue + public KafkaLogRecordExporterBuilder setTopic(String topic) { + requireNonNull(topic, "topic"); + delegate.setTopic(topic); + return this; + } + + @CanIgnoreReturnValue + public KafkaLogRecordExporterBuilder setTimeout(long timeout, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + delegate.setTimeout(timeout, unit); + return 
this; + } + + @CanIgnoreReturnValue + public KafkaLogRecordExporterBuilder setTimeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + + @CanIgnoreReturnValue + public KafkaLogRecordExporterBuilder setRetryPolicy(RetryPolicy retryPolicy) { + requireNonNull(retryPolicy, "retryPolicy"); + delegate.setRetryPolicy(retryPolicy); + return this; + } + + @CanIgnoreReturnValue + public KafkaLogRecordExporterBuilder setMeterProvider(MeterProvider meterProvider) { + requireNonNull(meterProvider, "meterProvider"); + setMeterProvider(() -> meterProvider); + return this; + } + + @CanIgnoreReturnValue + public KafkaLogRecordExporterBuilder setMeterProvider( + Supplier meterProviderSupplier) { + requireNonNull(meterProviderSupplier, "meterProviderSupplier"); + delegate.setMeterProvider(meterProviderSupplier); + return this; + } + + public KafkaLogRecordExporter build() { + return new KafkaLogRecordExporter(delegate, delegate.build()); + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaMetricExporter.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaMetricExporter.java new file mode 100644 index 000000000..d0d57e7dd --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaMetricExporter.java @@ -0,0 +1,117 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka; + +import io.opentelemetry.contrib.kafka.impl.KafkaExporter; +import io.opentelemetry.contrib.kafka.impl.KafkaExporterBuilder; +import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; 
+import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; +import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** A {@link MetricExporter} which writes {@linkplain MetricData spans} to Kafka in OTLP format. */ +public final class KafkaMetricExporter implements MetricExporter { + + private static final Logger logger = Logger.getLogger(KafkaMetricExporter.class.getName()); + + private final AtomicBoolean isShutdown = new AtomicBoolean(); + + private final KafkaExporterBuilder builder; + + private final KafkaExporter delegate; + + private final AggregationTemporalitySelector aggregationTemporalitySelector; + + private final DefaultAggregationSelector defaultAggregationSelector; + + /** + * Returns a new {@link KafkaMetricExporter} using the default values. + * + *

To load configuration values from environment variables and system properties, use opentelemetry-sdk-extension-autoconfigure. + * + * @return a new {@link KafkaMetricExporter} instance. + */ + public static KafkaMetricExporter getDefault() { + return builder().build(); + } + + /** + * Returns a new builder instance for this exporter. + * + * @return a new builder instance for this exporter. + */ + public static KafkaMetricExporterBuilder builder() { + return new KafkaMetricExporterBuilder(); + } + + KafkaMetricExporter( + KafkaExporterBuilder builder, + KafkaExporter delegate, + AggregationTemporalitySelector aggregationTemporalitySelector, + DefaultAggregationSelector defaultAggregationSelector) { + this.builder = builder; + this.delegate = delegate; + this.aggregationTemporalitySelector = aggregationTemporalitySelector; + this.defaultAggregationSelector = defaultAggregationSelector; + } + + public KafkaMetricExporterBuilder toBuilder() { + return new KafkaMetricExporterBuilder(builder.copy()); + } + + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + return aggregationTemporalitySelector.getAggregationTemporality(instrumentType); + } + + @Override + public Aggregation getDefaultAggregation(InstrumentType instrumentType) { + return defaultAggregationSelector.getDefaultAggregation(instrumentType); + } + + @Override + public CompletableResultCode export(Collection metrics) { + if (isShutdown.get()) { + return CompletableResultCode.ofFailure(); + } + try { + MetricsRequestMarshaler exportRequest = MetricsRequestMarshaler.create(metrics); + return delegate.export(exportRequest, metrics.size()); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "send metrics to kafka failed.", e); + return CompletableResultCode.ofFailure(); + } + } + + @Override + public CompletableResultCode flush() { + // TODO + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + if 
(!isShutdown.compareAndSet(false, true)) { + logger.log(Level.INFO, "Calling shutdown() multiple times."); + } + return CompletableResultCode.ofSuccess(); + } + + @Override + public String toString() { + return "KafkaMetricExporter{" + builder.toString(false) + "}"; + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaMetricExporterBuilder.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaMetricExporterBuilder.java new file mode 100644 index 000000000..3ae7c6947 --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaMetricExporterBuilder.java @@ -0,0 +1,122 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka; + +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.Objects.requireNonNull; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.contrib.kafka.impl.KafkaExporterBuilder; +import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; +import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * Builder utility for {@link KafkaMetricExporter}. 
+ * + * @since 1.27.0 + */ +public final class KafkaMetricExporterBuilder { + + private final KafkaExporterBuilder delegate; + + private AggregationTemporalitySelector aggregationTemporalitySelector = + AggregationTemporalitySelector.alwaysCumulative(); + + private DefaultAggregationSelector defaultAggregationSelector = + DefaultAggregationSelector.getDefault(); + + KafkaMetricExporterBuilder(KafkaExporterBuilder delegate) { + this.delegate = delegate; + } + + KafkaMetricExporterBuilder() { + this(new KafkaExporterBuilder<>("kafka", "metric")); + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setProtocolVersion(String protocolVersion) { + requireNonNull(protocolVersion, "protocolVersion"); + delegate.setProtocolVersion(protocolVersion); + return this; + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setBrokers(String brokers) { + requireNonNull(brokers, "brokers"); + delegate.setBrokers(brokers); + return this; + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setTopic(String topic) { + requireNonNull(topic, "topic"); + delegate.setTopic(topic); + return this; + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setTimeout(long timeout, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + delegate.setTimeout(timeout, unit); + return this; + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setTimeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setRetryPolicy(RetryPolicy retryPolicy) { + requireNonNull(retryPolicy, "retryPolicy"); + delegate.setRetryPolicy(retryPolicy); + return this; + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setMeterProvider(MeterProvider meterProvider) { + requireNonNull(meterProvider, "meterProvider"); + setMeterProvider(() -> meterProvider); + 
return this; + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setMeterProvider( + Supplier meterProviderSupplier) { + requireNonNull(meterProviderSupplier, "meterProviderSupplier"); + delegate.setMeterProvider(meterProviderSupplier); + return this; + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setAggregationTemporalitySelector( + AggregationTemporalitySelector aggregationTemporalitySelector) { + requireNonNull(aggregationTemporalitySelector, "aggregationTemporalitySelector"); + this.aggregationTemporalitySelector = aggregationTemporalitySelector; + return this; + } + + @CanIgnoreReturnValue + public KafkaMetricExporterBuilder setDefaultAggregationSelector( + DefaultAggregationSelector defaultAggregationSelector) { + requireNonNull(defaultAggregationSelector, "defaultAggregationSelector"); + this.defaultAggregationSelector = defaultAggregationSelector; + return this; + } + + public KafkaMetricExporter build() { + return new KafkaMetricExporter( + delegate, delegate.build(), aggregationTemporalitySelector, defaultAggregationSelector); + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaExporter.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaExporter.java new file mode 100644 index 000000000..662725e07 --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaExporter.java @@ -0,0 +1,83 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka.impl; + +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.ExporterMetrics; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import java.util.logging.Level; 
+import java.util.logging.Logger; + +/** + * Generic Kafka exporter. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +@SuppressWarnings("checkstyle:JavadocMethod") +public final class KafkaExporter { + + private static final Logger internalLogger = Logger.getLogger(KafkaExporter.class.getName()); + + private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); + + private final AtomicBoolean isShutdown = new AtomicBoolean(); + + private final String type; + + private final KafkaSender kafkaSender; + + private final ExporterMetrics exporterMetrics; + + public KafkaExporter( + String exporterName, + String type, + KafkaSender kafkaSender, + Supplier meterProviderSupplier) { + this.type = type; + this.kafkaSender = kafkaSender; + this.exporterMetrics = ExporterMetrics.createGrpc(exporterName, type, meterProviderSupplier); + } + + public CompletableResultCode export(T exportRequest, int numItems) { + if (isShutdown.get()) { + return CompletableResultCode.ofFailure(); + } + + exporterMetrics.addSeen(numItems); + + CompletableResultCode result = new CompletableResultCode(); + + kafkaSender.send( + exportRequest, + () -> { + exporterMetrics.addSuccess(numItems); + result.succeed(); + }, + (response, throwable) -> { + exporterMetrics.addFailed(numItems); + if (logger.isLoggable(Level.FINEST)) { + logger.log( + Level.FINEST, "Failed to export " + type + "s. 
Details follow: " + throwable); + } + result.fail(); + }); + + return result; + } + + public CompletableResultCode shutdown() { + if (!isShutdown.compareAndSet(false, true)) { + logger.log(Level.INFO, "Calling shutdown() multiple times."); + return CompletableResultCode.ofSuccess(); + } + return kafkaSender.shutdown(); + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaExporterBuilder.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaExporterBuilder.java new file mode 100644 index 000000000..ae2ff1b85 --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaExporterBuilder.java @@ -0,0 +1,139 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka.impl; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.time.Duration; +import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * A builder for. {@link KafkaExporter}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class KafkaExporterBuilder { + + private static final Logger LOGGER = Logger.getLogger(KafkaExporterBuilder.class.getName()); + + private final String exporterName; + + private final String type; + + private String protocolVersion = "2.0.0"; + + private String brokers = "localhost:9092"; + + /** otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs */ + private String topic = "topic"; + + /** in milliseconds */ + private Long timeout = 1000L; + + @Nullable private RetryPolicy retryPolicy; + + private Supplier meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider; + + public KafkaExporterBuilder(String exporterName, String type) { + this.exporterName = exporterName; + this.type = type; + } + + @CanIgnoreReturnValue + public KafkaExporterBuilder setProtocolVersion(String protocolVersion) { + this.protocolVersion = protocolVersion; + return this; + } + + @CanIgnoreReturnValue + public KafkaExporterBuilder setBrokers(String brokers) { + this.brokers = brokers; + return this; + } + + @CanIgnoreReturnValue + public KafkaExporterBuilder setTopic(String topic) { + this.topic = topic; + return this; + } + + @CanIgnoreReturnValue + public KafkaExporterBuilder setTimeout(long timeout, TimeUnit unit) { + this.timeout = unit.toMillis(timeout); + return this; + } + + @CanIgnoreReturnValue + public KafkaExporterBuilder setTimeout(Duration timeout) { + return setTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + + @CanIgnoreReturnValue + public KafkaExporterBuilder setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + @CanIgnoreReturnValue + public KafkaExporterBuilder setMeterProvider(Supplier meterProviderSupplier) { + this.meterProviderSupplier = meterProviderSupplier; + return this; + } + + @SuppressWarnings("BuilderReturnThis") + public KafkaExporterBuilder copy() { + 
KafkaExporterBuilder copy = new KafkaExporterBuilder<>(exporterName, type); + + copy.protocolVersion = protocolVersion; + copy.brokers = brokers; + copy.topic = topic; + copy.timeout = timeout; + if (retryPolicy != null) { + copy.retryPolicy = retryPolicy.toBuilder().build(); + } + copy.meterProviderSupplier = meterProviderSupplier; + return copy; + } + + public KafkaExporter build() { + KafkaSenderProvider kafkaSenderProvider = new KafkaSenderProvider(); + KafkaSender kafkaSender = kafkaSenderProvider.createSender(brokers, topic, timeout); + LOGGER.log(Level.FINE, "Using KafkaSender: " + kafkaSender.getClass().getName()); + return new KafkaExporter<>(exporterName, type, kafkaSender, meterProviderSupplier); + } + + public String toString(boolean includePrefixAndSuffix) { + StringJoiner joiner = + includePrefixAndSuffix + ? new StringJoiner(", ", "KafkaExporterBuilder{", "}") + : new StringJoiner(", "); + joiner.add("exporterName=" + exporterName); + joiner.add("type=" + type); + joiner.add("brokers=" + brokers); + joiner.add("topic=" + topic); + joiner.add("timeout=" + timeout); + if (retryPolicy != null) { + joiner.add("retryPolicy=" + retryPolicy); + } + // Note: omit tlsConfigHelper because we can't log the configuration in any readable way + // Note: omit meterProviderSupplier because we can't log the configuration in any readable way + return joiner.toString(); + } + + @Override + public String toString() { + return toString(true); + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaSender.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaSender.java new file mode 100644 index 000000000..c15356990 --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaSender.java @@ -0,0 +1,97 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka.impl; + +import 
io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.Bytes; + +/** + * An exporter of a messages encoded by {@link Marshaler} using the gRPC wire format. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class KafkaSender { + + private static final Logger logger = Logger.getLogger(KafkaSender.class.getName()); + + private final String topic; + + private final long timeout; + + private final KafkaProducer kafkaProducer; + + public KafkaSender(String brokers, String topic, long timeout) { + this.topic = topic; + this.timeout = timeout; + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", brokers); + properties.setProperty("request.timeout.ms", "" + this.timeout); + this.kafkaProducer = + new KafkaProducer(properties, new StringSerializer(), new BytesSerializer()); + } + + public void send(T request, Runnable onSuccess, BiConsumer onFailure) { + try { + byte[] byteArray = new byte[request.getBinarySerializedSize()]; + ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray); + OutputStream outputStream = new ByteBufferOutputStream(byteBuffer); + request.writeBinaryTo(outputStream); + // TODO + ProducerRecord producerRecord = + new ProducerRecord(topic, null, Bytes.wrap(byteArray)); + Callback callback = + new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception exception) { + if (exception != null) { + if (onFailure != null) { + try { + onFailure.accept(recordMetadata, exception); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "onFailure failed.", e); + } + } + } else { + if (onSuccess != null) { + try { + onSuccess.run(); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "onSuccess failed.", e); + } + } + } + } + }; + Future future = kafkaProducer.send(producerRecord, callback); + if (future == null) { + logger.log(Level.WARNING, "send to kafka failed."); + } + } catch (Exception e) { + logger.log(Level.WARNING, "send to kafka failed.", e); + } + } + + /** Shutdown the sender. 
*/ + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaSenderProvider.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaSenderProvider.java new file mode 100644 index 000000000..4b20cb745 --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/impl/KafkaSenderProvider.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka.impl; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; + +/** + * A service provider interface (SPI) for providing {@link KafkaSender}s backed by different client + * libraries. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class KafkaSenderProvider { + + /** Returns a {@link KafkaSender} configured with the provided parameters. */ + @SuppressWarnings("TooManyParameters") + public KafkaSender createSender( + String brokers, String topic, long timeout) { + return new KafkaSender(brokers, topic, timeout); + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaLogRecordExporterProvider.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaLogRecordExporterProvider.java new file mode 100644 index 000000000..f3f848554 --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaLogRecordExporterProvider.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka.internal; + +import io.opentelemetry.contrib.kafka.KafkaLogRecordExporter; +import io.opentelemetry.contrib.kafka.KafkaLogRecordExporterBuilder; +import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; +import io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import java.time.Duration; + +/** + * {@link LogRecordExporter} SPI implementation for {@link KafkaLogRecordExporter}. + * + *
<p>
This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class KafkaLogRecordExporterProvider implements ConfigurableLogRecordExporterProvider { + @Override + public LogRecordExporter createExporter(ConfigProperties config) { + KafkaLogRecordExporterBuilder kafkaLogRecordExporterBuilder = KafkaLogRecordExporter.builder(); + String protocolVersion = config.getString("exporter.kafka.logs.protocolversion", "2.0.0"); + String brokers = config.getString("exporter.kafka.logs.brokers", "localhost:9092"); + String topic = config.getString("exporter.kafka.logs.topic", "otlp_logs"); + long timeout = config.getLong("exporter.kafka.logs.timeout", 1000L); + kafkaLogRecordExporterBuilder.setProtocolVersion(protocolVersion); + kafkaLogRecordExporterBuilder.setBrokers(brokers); + kafkaLogRecordExporterBuilder.setTopic(topic); + kafkaLogRecordExporterBuilder.setTimeout(Duration.ofMillis(timeout)); + return kafkaLogRecordExporterBuilder.build(); + } + + @Override + public String getName() { + return "kafka"; + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaMetricExporterProvider.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaMetricExporterProvider.java new file mode 100644 index 000000000..6cb90a11b --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaMetricExporterProvider.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka.internal; + +import io.opentelemetry.contrib.kafka.KafkaMetricExporter; +import io.opentelemetry.contrib.kafka.KafkaMetricExporterBuilder; +import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; +import io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.time.Duration; 
+ +/** + * {@link MetricExporter} SPI implementation for {@link KafkaMetricExporter}. + * + *
<p>
This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class KafkaMetricExporterProvider implements ConfigurableMetricExporterProvider { + @Override + public MetricExporter createExporter(ConfigProperties config) { + KafkaMetricExporterBuilder kafkaMetricExporterBuilder = KafkaMetricExporter.builder(); + String protocolVersion = config.getString("exporter.kafka.metrics.protocolversion", "2.0.0"); + String brokers = config.getString("exporter.kafka.metrics.brokers", "localhost:9092"); + String topic = config.getString("exporter.kafka.metrics.topic", "otlp_metrics"); + long timeout = config.getLong("exporter.kafka.metrics.timeout", 1000L); + kafkaMetricExporterBuilder.setProtocolVersion(protocolVersion); + kafkaMetricExporterBuilder.setBrokers(brokers); + kafkaMetricExporterBuilder.setTopic(topic); + kafkaMetricExporterBuilder.setTimeout(Duration.ofMillis(timeout)); + return kafkaMetricExporterBuilder.build(); + } + + @Override + public String getName() { + return "kafka"; + } +} diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaSpanExporterProvider.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaSpanExporterProvider.java new file mode 100644 index 000000000..f50deab0d --- /dev/null +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/internal/KafkaSpanExporterProvider.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka.internal; + +import io.opentelemetry.contrib.kafka.KafkaSpanExporter; +import io.opentelemetry.contrib.kafka.KafkaSpanExporterBuilder; +import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; +import io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider; +import io.opentelemetry.sdk.trace.export.SpanExporter; + +// import java.time.Duration; + +/** + * {@link 
SpanExporter} SPI implementation for {@link KafkaSpanExporter}. + * + *
<p>
This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class KafkaSpanExporterProvider implements ConfigurableSpanExporterProvider { + @Override + public SpanExporter createExporter(ConfigProperties config) { + KafkaSpanExporterBuilder kafkaSpanExporterBuilder = KafkaSpanExporter.newBuilder(); + // String protocolVersion = config.getString("exporter.kafka.spans.protocolversion", "2.0.0"); + // String brokers = config.getString("exporter.kafka.spans.brokers", "localhost:9092"); + // String topic = config.getString("exporter.kafka.spans.topic", "otlp_spans"); + // long timeout = config.getLong("exporter.kafka.spans.timeout", 1000L); + // kafkaSpanExporterBuilder.setProtocolVersion(protocolVersion); + // kafkaSpanExporterBuilder.setBrokers(brokers); + // kafkaSpanExporterBuilder.setTopic(topic); + // kafkaSpanExporterBuilder.setTimeout(Duration.ofMillis(timeout)); + return kafkaSpanExporterBuilder.build(); + } + + @Override + public String getName() { + return "kafka"; + } +} diff --git a/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider b/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider new file mode 100644 index 000000000..a685aff45 --- /dev/null +++ b/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider @@ -0,0 +1 @@ +io.opentelemetry.contrib.kafka.internal.KafkaLogRecordExporterProvider diff --git a/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider b/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider new file mode 100644 index 000000000..f13bf12b3 --- /dev/null +++ 
b/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider @@ -0,0 +1 @@ +io.opentelemetry.contrib.kafka.internal.KafkaMetricExporterProvider diff --git a/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider b/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider new file mode 100644 index 000000000..6f8327211 --- /dev/null +++ b/kafka-exporter/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider @@ -0,0 +1 @@ +io.opentelemetry.contrib.kafka.internal.KafkaSpanExporterProvider diff --git a/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporterTest.java b/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporterTest.java new file mode 100644 index 000000000..cc8dc1b98 --- /dev/null +++ b/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaLogRecordExporterTest.java @@ -0,0 +1,195 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka; + +import static io.opentelemetry.api.common.AttributeKey.booleanKey; +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; + +import com.google.common.collect.ImmutableMap; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import 
io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.logs.TestLogRecordData; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.Bytes; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers(disabledWithoutDocker = true) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class KafkaLogRecordExporterTest { + + private static final Resource RESOURCE = + Resource.create(Attributes.builder().put("key", "value").build()); + + private static final LogRecordData LOG1 = + TestLogRecordData.builder() + .setResource(RESOURCE) + .setInstrumentationScopeInfo( + InstrumentationScopeInfo.builder("instrumentation") + .setVersion("1") + 
.setAttributes(Attributes.builder().put("key", "value").build()) + .build()) + .setBody("body1") + .setSeverity(Severity.INFO) + .setSeverityText("INFO") + .setTimestamp(100L, TimeUnit.NANOSECONDS) + .setObservedTimestamp(200L, TimeUnit.NANOSECONDS) + .setAttributes(Attributes.of(stringKey("animal"), "cat", longKey("lives"), 9L)) + .setSpanContext( + SpanContext.create( + "12345678876543211234567887654322", + "8765432112345876", + TraceFlags.getDefault(), + TraceState.getDefault())) + .build(); + + private static final LogRecordData LOG2 = + TestLogRecordData.builder() + .setResource(RESOURCE) + .setInstrumentationScopeInfo( + InstrumentationScopeInfo.builder("instrumentation2").setVersion("2").build()) + .setBody("body2") + .setSeverity(Severity.INFO) + .setSeverityText("INFO") + .setTimestamp(100L, TimeUnit.NANOSECONDS) + .setObservedTimestamp(200L, TimeUnit.NANOSECONDS) + .setAttributes(Attributes.of(booleanKey("important"), true)) + .setSpanContext( + SpanContext.create( + "12345678876543211234567887654322", + "8765432112345875", + TraceFlags.getDefault(), + TraceState.getDefault())) + .build(); + + private static final DockerImageName KAFKA_TEST_IMAGE = + DockerImageName.parse("confluentinc/cp-kafka:6.2.1"); + + private KafkaContainer kafka; + + private KafkaConsumer consumer; + + private LogRecordExporter exporter; + + @BeforeAll + void initialize() throws Exception { + kafka = new KafkaContainer(KAFKA_TEST_IMAGE); + kafka.start(); + String bootstrapServers = kafka.getBootstrapServers(); + KafkaLogRecordExporterBuilder kafkaLogRecordExporterBuilder = KafkaLogRecordExporter.builder(); + kafkaLogRecordExporterBuilder.setProtocolVersion("2.0.0"); + kafkaLogRecordExporterBuilder.setBrokers(bootstrapServers); + kafkaLogRecordExporterBuilder.setTopic("otlp_logs"); + kafkaLogRecordExporterBuilder.setTimeout(Duration.ofSeconds(1L)); + exporter = kafkaLogRecordExporterBuilder.build(); + + try (AdminClient adminClient = + AdminClient.create( + 
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) { + consumer = + new KafkaConsumer( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG, + "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"), + new StringDeserializer(), + new BytesDeserializer()); + + List topics = Collections.singletonList(new NewTopic("otlp_logs", 1, (short) 1)); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + + consumer.subscribe(Collections.singletonList("otlp_logs")); + } + } + + @AfterAll + void cleanup() { + Awaitility.await() + .untilAsserted(() -> Assertions.assertThat(exporter.shutdown().isSuccess()).isTrue()); + Awaitility.await() + .untilAsserted( + () -> + Assertions.assertThat(exporter.export(Collections.emptyList()).isSuccess()) + .isFalse()); + consumer.unsubscribe(); + kafka.close(); + } + + @Test + void export() throws Exception { + CompletableResultCode actual = exporter.export(Arrays.asList(LOG1, LOG2)); + + Awaitility.await() + .untilAsserted( + () -> { + Assertions.assertThat(actual.isSuccess()).isTrue(); + Assertions.assertThat(actual.isDone()).isTrue(); + }); + Unreliables.retryUntilTrue( + 10, + TimeUnit.SECONDS, + () -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + if (records.isEmpty()) { + return false; + } + + LogsRequestMarshaler logsRequestMarshaler = + LogsRequestMarshaler.create(Arrays.asList(LOG1, LOG2)); + byte[] byteArray = new byte[logsRequestMarshaler.getBinarySerializedSize()]; + ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray); + OutputStream outputStream = new ByteBufferOutputStream(byteBuffer); + logsRequestMarshaler.writeBinaryTo(outputStream); + Bytes expected = new Bytes(byteArray); + + Assertions.assertThat(records) + .hasSize(1) + .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) + .containsExactly(Assertions.tuple("otlp_logs", null, expected)); + + 
return true; + }); + } +} diff --git a/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaMetricExporterTest.java b/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaMetricExporterTest.java new file mode 100644 index 000000000..bed4d8839 --- /dev/null +++ b/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaMetricExporterTest.java @@ -0,0 +1,194 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; + +import com.google.common.collect.ImmutableMap; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.resources.Resource; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import 
org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.Bytes; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers(disabledWithoutDocker = true) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class KafkaMetricExporterTest { + + private static final Resource RESOURCE = + Resource.create(Attributes.builder().put("key", "value").build()); + + private static final MetricData METRIC1 = + ImmutableMetricData.createDoubleSum( + RESOURCE, + InstrumentationScopeInfo.builder("instrumentation") + .setVersion("1") + .setAttributes(Attributes.builder().put("key", "value").build()) + .build(), + "metric1", + "metric1 description", + "m", + ImmutableSumData.create( + true, + AggregationTemporality.CUMULATIVE, + Arrays.asList( + ImmutableDoublePointData.create( + 1, 2, Attributes.of(stringKey("cat"), "meow"), 4)))); + + private static final MetricData METRIC2 = + ImmutableMetricData.createDoubleSum( + RESOURCE, + InstrumentationScopeInfo.builder("instrumentation2").setVersion("2").build(), + "metric2", + "metric2 description", + "s", + ImmutableSumData.create( + true, + AggregationTemporality.CUMULATIVE, + Arrays.asList( + ImmutableDoublePointData.create( + 1, 2, Attributes.of(stringKey("cat"), "meow"), 4)))); + + private static final DockerImageName KAFKA_TEST_IMAGE = + DockerImageName.parse("confluentinc/cp-kafka:6.2.1"); + + private 
KafkaContainer kafka; + + private KafkaConsumer consumer; + + private MetricExporter exporter; + + @BeforeAll + void initialize() throws Exception { + kafka = new KafkaContainer(KAFKA_TEST_IMAGE); + kafka.start(); + String bootstrapServers = kafka.getBootstrapServers(); + KafkaMetricExporterBuilder kafkaMetricExporterBuilder = KafkaMetricExporter.builder(); + kafkaMetricExporterBuilder.setProtocolVersion("2.0.0"); + kafkaMetricExporterBuilder.setBrokers(bootstrapServers); + kafkaMetricExporterBuilder.setTopic("otlp_metrics"); + kafkaMetricExporterBuilder.setTimeout(Duration.ofSeconds(1L)); + exporter = kafkaMetricExporterBuilder.build(); + + try (AdminClient adminClient = + AdminClient.create( + ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) { + consumer = + new KafkaConsumer( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG, + "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"), + new StringDeserializer(), + new BytesDeserializer()); + + List topics = Collections.singletonList(new NewTopic("otlp_metrics", 1, (short) 1)); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + + consumer.subscribe(Collections.singletonList("otlp_metrics")); + } + } + + @AfterAll + void cleanup() { + Awaitility.await() + .untilAsserted(() -> Assertions.assertThat(exporter.shutdown().isSuccess()).isTrue()); + Awaitility.await() + .untilAsserted( + () -> + Assertions.assertThat(exporter.export(Collections.emptyList()).isSuccess()) + .isFalse()); + consumer.unsubscribe(); + kafka.close(); + } + + @Test + void getAggregationTemporality() { + Assertions.assertThat(exporter.getAggregationTemporality(InstrumentType.COUNTER)) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + + @Test + void export() throws Exception { + CompletableResultCode actual = exporter.export(Arrays.asList(METRIC1, METRIC2)); + + Awaitility.await() + .untilAsserted( + () 
-> { + Assertions.assertThat(actual.isSuccess()).isTrue(); + Assertions.assertThat(actual.isDone()).isTrue(); + }); + Unreliables.retryUntilTrue( + 10, + TimeUnit.SECONDS, + () -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + if (records.isEmpty()) { + return false; + } + + MetricsRequestMarshaler metricsRequestMarshaler = + MetricsRequestMarshaler.create(Arrays.asList(METRIC1, METRIC2)); + byte[] byteArray = new byte[metricsRequestMarshaler.getBinarySerializedSize()]; + ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray); + OutputStream outputStream = new ByteBufferOutputStream(byteBuffer); + metricsRequestMarshaler.writeBinaryTo(outputStream); + Bytes expected = new Bytes(byteArray); + + Assertions.assertThat(records) + .hasSize(1) + .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) + .containsExactly(Assertions.tuple("otlp_metrics", null, expected)); + + return true; + }); + } + + @Test + void flush() { + Assertions.assertThat(exporter.flush().isSuccess()).isTrue(); + } +} diff --git a/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/internal/KafkaExporterProviderTest.java b/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/internal/KafkaExporterProviderTest.java new file mode 100644 index 000000000..2dd1bd074 --- /dev/null +++ b/kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/internal/KafkaExporterProviderTest.java @@ -0,0 +1,45 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.kafka.internal; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; + +import io.opentelemetry.contrib.kafka.KafkaLogRecordExporter; +import io.opentelemetry.contrib.kafka.KafkaMetricExporter; +// import io.opentelemetry.contrib.kafka.KafkaSpanExporter; +import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties; +import java.util.Collections; +import 
org.junit.jupiter.api.Test; + +class KafkaExporterProviderTest { + + @Test + void logRecordExporterProvider() { + KafkaLogRecordExporterProvider provider = new KafkaLogRecordExporterProvider(); + assertThat(provider.getName()).isEqualTo("kafka"); + assertThat( + provider.createExporter(DefaultConfigProperties.createFromMap(Collections.emptyMap()))) + .isInstanceOf(KafkaLogRecordExporter.class); + } + + @Test + void metricExporterProvider() { + KafkaMetricExporterProvider provider = new KafkaMetricExporterProvider(); + assertThat(provider.getName()).isEqualTo("kafka"); + assertThat( + provider.createExporter(DefaultConfigProperties.createFromMap(Collections.emptyMap()))) + .isInstanceOf(KafkaMetricExporter.class); + } + + // @Test + // void spanExporterProvider() { + // KafkaSpanExporterProvider provider = new KafkaSpanExporterProvider(); + // assertThat(provider.getName()).isEqualTo("kafka"); + // assertThat( + // provider.createExporter(DefaultConfigProperties.createFromMap(Collections.emptyMap()))) + // .isInstanceOf(KafkaSpanExporter.class); + // } +}