Skip to content

Commit 8f22e5a

Browse files
authored
implement kafka-exporter (#965)
1 parent 6ecf34d commit 8f22e5a

15 files changed

+954
-1
lines changed

.github/component_owners.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,6 @@ components:
6060
- trask
6161
static-instrumenter:
6262
- anosek-an
63+
kafka-exporter:
64+
- spockz
65+
- vincentfree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ feature or via instrumentation, this project is hopefully for you.
2323
* [Runtime Attach](./runtime-attach/README.md)
2424
* [Samplers](./samplers/README.md)
2525
* [Static Instrumenter](./static-instrumenter/README.md)
26+
* [Kafka Support](./kafka-exporter/README.md)
2627

2728
## Getting Started
2829

dependencyManagement/build.gradle.kts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ val DEPENDENCIES = listOf(
5555
"org.awaitility:awaitility:4.2.0",
5656
"org.bouncycastle:bcpkix-jdk15on:1.70",
5757
"org.junit-pioneer:junit-pioneer:1.9.1",
58-
"org.skyscreamer:jsonassert:1.5.1"
58+
"org.skyscreamer:jsonassert:1.5.1",
59+
"org.apache.kafka:kafka-clients:3.5.0",
60+
"org.testcontainers:kafka:1.18.3"
5961
)
6062

6163
javaPlatform {

kafka-exporter/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Exporting SpanData to Kafka
2+
3+
This module contains `KafkaSpanExporter`, which is an implementation of the `io.opentelemetry.sdk.trace.export.SpanExporter` interface.
4+
5+
`KafkaSpanExporter` can be used for sending `SpanData` to a Kafka topic.
6+
7+
## Usage
8+
9+
In order to instantiate a `KafkaSpanExporter`, you either need to pass a Kafka `Producer` or the configuration of a Kafka `Producer` together with key and value serializers.
10+
You also need to pass the topic to which the SpanData needs to be sent.
11+
For a sample usage, see `KafkaSpanExporterIntegrationTest`.
12+
13+
## Component owners
14+
15+
- [Alessandro Vermeulen](https://github.com/spockz)
16+
- [Vincent Free](https://github.com/vincentfree)
17+
18+
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).

kafka-exporter/build.gradle.kts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
plugins {
  id("otel.java-conventions")
  id("otel.publish-conventions")
}

description = "SpanExporter based on Kafka"
otelJava.moduleName.set("io.opentelemetry.contrib.kafka")

dependencies {
  // `api` scope: these types appear on the public surface of KafkaSpanExporter
  // and its builder (SpanExporter, CompletableResultCode, Producer, proto request).
  api("io.opentelemetry:opentelemetry-sdk-trace")
  api("io.opentelemetry:opentelemetry-sdk-common")
  api("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha")
  api("org.apache.kafka:kafka-clients")

  // Autoconfigure SPI and code-gen annotations are needed at compile time only.
  compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
  compileOnly("com.google.auto.service:auto-service-annotations")
  compileOnly("com.google.auto.value:auto-value-annotations")
  compileOnly("org.slf4j:slf4j-api")

  // Jackson is only touched reflectively at runtime, never referenced in source.
  runtimeOnly("com.fasterxml.jackson.core:jackson-core")
  runtimeOnly("com.fasterxml.jackson.core:jackson-databind")

  // Internal-only dependencies: OTLP marshaling of SpanData and protobuf runtime.
  implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
  implementation("com.google.protobuf:protobuf-java")

  testImplementation("io.opentelemetry:opentelemetry-api")
  testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
  testImplementation("com.google.guava:guava")
  // Testcontainers spins up a real Kafka broker for the integration test.
  testImplementation("org.testcontainers:junit-jupiter")
  testImplementation("org.testcontainers:kafka")
  testImplementation("org.rnorth.duct-tape:duct-tape")
  testImplementation("org.testcontainers:testcontainers")

  testRuntimeOnly("org.slf4j:slf4j-simple")
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.kafka;
7+
8+
import io.opentelemetry.sdk.common.CompletableResultCode;
9+
import io.opentelemetry.sdk.trace.data.SpanData;
10+
import io.opentelemetry.sdk.trace.export.SpanExporter;
11+
import java.time.Duration;
12+
import java.util.ArrayList;
13+
import java.util.Collection;
14+
import java.util.List;
15+
import java.util.concurrent.CompletableFuture;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import javax.annotation.Nonnull;
20+
import javax.annotation.concurrent.ThreadSafe;
21+
import org.apache.kafka.clients.producer.Producer;
22+
import org.apache.kafka.clients.producer.ProducerRecord;
23+
import org.apache.kafka.common.KafkaException;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
@ThreadSafe
28+
@SuppressWarnings("FutureReturnValueIgnored")
29+
public class KafkaSpanExporter implements SpanExporter {
30+
private static final Logger logger = LoggerFactory.getLogger(KafkaSpanExporter.class);
31+
private final String topicName;
32+
private final Producer<String, Collection<SpanData>> producer;
33+
private final ExecutorService executorService;
34+
private final long timeoutInSeconds;
35+
private final AtomicBoolean isShutdown = new AtomicBoolean();
36+
37+
public static KafkaSpanExporterBuilder newBuilder() {
38+
return new KafkaSpanExporterBuilder();
39+
}
40+
41+
KafkaSpanExporter(
42+
String topicName,
43+
Producer<String, Collection<SpanData>> producer,
44+
ExecutorService executorService,
45+
long timeoutInSeconds) {
46+
this.topicName = topicName;
47+
this.producer = producer;
48+
this.executorService = executorService;
49+
this.timeoutInSeconds = timeoutInSeconds;
50+
}
51+
52+
@Override
53+
public CompletableResultCode export(@Nonnull Collection<SpanData> spans) {
54+
if (isShutdown.get()) {
55+
return CompletableResultCode.ofFailure();
56+
}
57+
ProducerRecord<String, Collection<SpanData>> producerRecord =
58+
new ProducerRecord<>(topicName, spans);
59+
60+
CompletableResultCode result = new CompletableResultCode();
61+
CompletableFuture.runAsync(
62+
() ->
63+
producer.send(
64+
producerRecord,
65+
(metadata, exception) -> {
66+
if (exception == null) {
67+
result.succeed();
68+
} else {
69+
logger.error(
70+
String.format("Error while sending spans to Kafka topic %s", topicName),
71+
exception);
72+
result.fail();
73+
}
74+
}),
75+
executorService);
76+
return result;
77+
}
78+
79+
@Override
80+
public CompletableResultCode flush() {
81+
CompletableResultCode result = new CompletableResultCode();
82+
CompletableFuture.runAsync(producer::flush, executorService)
83+
.handle(
84+
(unused, exception) -> {
85+
if (exception == null) {
86+
result.succeed();
87+
} else {
88+
logger.error(
89+
String.format(
90+
"Error while performing the flush operation on topic %s", topicName),
91+
exception);
92+
result.fail();
93+
}
94+
return true;
95+
});
96+
return result;
97+
}
98+
99+
private CompletableResultCode shutdownExecutorService() {
100+
try {
101+
executorService.shutdown();
102+
boolean terminated = executorService.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS);
103+
if (!terminated) {
104+
List<Runnable> interrupted = executorService.shutdownNow();
105+
if (!interrupted.isEmpty()) {
106+
logger.error(
107+
"Shutting down KafkaSpanExporter forced {} tasks to be cancelled.",
108+
interrupted.size());
109+
}
110+
}
111+
return CompletableResultCode.ofSuccess();
112+
} catch (InterruptedException e) {
113+
logger.error("Error when trying to shutdown KafkaSpanExporter executorService.", e);
114+
return CompletableResultCode.ofFailure();
115+
}
116+
}
117+
118+
private CompletableResultCode shutdownProducer() {
119+
try {
120+
producer.close(Duration.ofSeconds(timeoutInSeconds));
121+
return CompletableResultCode.ofSuccess();
122+
} catch (KafkaException e) {
123+
logger.error("Error when trying to shutdown KafkaSpanExporter Producer.", e);
124+
return CompletableResultCode.ofFailure();
125+
}
126+
}
127+
128+
@Override
129+
public CompletableResultCode shutdown() {
130+
if (!isShutdown.compareAndSet(false, true)) {
131+
logger.warn("Calling shutdown() multiple times.");
132+
return CompletableResultCode.ofSuccess();
133+
}
134+
List<CompletableResultCode> codes = new ArrayList<>(2);
135+
codes.add(shutdownExecutorService());
136+
codes.add(shutdownProducer());
137+
return CompletableResultCode.ofAll(codes);
138+
}
139+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.kafka;
7+
8+
import static java.util.Objects.isNull;
9+
import static java.util.Objects.nonNull;
10+
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
11+
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
12+
13+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
14+
import io.opentelemetry.sdk.trace.data.SpanData;
15+
import java.util.Collection;
16+
import java.util.Map;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.Executors;
19+
import org.apache.kafka.clients.producer.KafkaProducer;
20+
import org.apache.kafka.clients.producer.Producer;
21+
import org.apache.kafka.common.serialization.Serializer;
22+
23+
public class KafkaSpanExporterBuilder {
24+
private static final long DEFAULT_TIMEOUT_IN_SECONDS = 5L;
25+
private String topicName;
26+
private Producer<String, Collection<SpanData>> producer;
27+
private ExecutorService executorService;
28+
private long timeoutInSeconds = DEFAULT_TIMEOUT_IN_SECONDS;
29+
30+
@SuppressWarnings(value = {"NullAway"})
31+
public KafkaSpanExporterBuilder() {}
32+
33+
@CanIgnoreReturnValue
34+
public KafkaSpanExporterBuilder setTopicName(String topicName) {
35+
this.topicName = topicName;
36+
return this;
37+
}
38+
39+
@CanIgnoreReturnValue
40+
public KafkaSpanExporterBuilder setProducer(Producer<String, Collection<SpanData>> producer) {
41+
this.producer = producer;
42+
return this;
43+
}
44+
45+
@CanIgnoreReturnValue
46+
public KafkaSpanExporterBuilder setExecutorService(ExecutorService executorService) {
47+
this.executorService = executorService;
48+
return this;
49+
}
50+
51+
@CanIgnoreReturnValue
52+
public KafkaSpanExporterBuilder setTimeoutInSeconds(long timeoutInSeconds) {
53+
this.timeoutInSeconds = timeoutInSeconds;
54+
return this;
55+
}
56+
57+
public KafkaSpanExporter build() {
58+
if (isNull(topicName)) {
59+
throw new IllegalArgumentException("topicName cannot be null");
60+
}
61+
if (isNull(producer)) {
62+
throw new IllegalArgumentException("producer cannot be null");
63+
}
64+
if (isNull(executorService)) {
65+
executorService = Executors.newCachedThreadPool();
66+
}
67+
return new KafkaSpanExporter(topicName, producer, executorService, timeoutInSeconds);
68+
}
69+
70+
public static class ProducerBuilder {
71+
private Map<String, Object> config;
72+
private Serializer<String> keySerializer;
73+
private Serializer<Collection<SpanData>> valueSerializer;
74+
75+
public static ProducerBuilder newInstance() {
76+
return new ProducerBuilder();
77+
}
78+
79+
@SuppressWarnings(value = {"NullAway"})
80+
public ProducerBuilder() {}
81+
82+
@CanIgnoreReturnValue
83+
public ProducerBuilder setConfig(Map<String, Object> config) {
84+
this.config = config;
85+
return this;
86+
}
87+
88+
@CanIgnoreReturnValue
89+
public ProducerBuilder setKeySerializer(Serializer<String> keySerializer) {
90+
this.keySerializer = keySerializer;
91+
return this;
92+
}
93+
94+
@CanIgnoreReturnValue
95+
public ProducerBuilder setValueSerializer(Serializer<Collection<SpanData>> valueSerializer) {
96+
this.valueSerializer = valueSerializer;
97+
return this;
98+
}
99+
100+
public Producer<String, Collection<SpanData>> build() {
101+
if (isNull(config)) {
102+
throw new IllegalArgumentException("producer configuration cannot be null");
103+
}
104+
boolean correctConfig =
105+
((config.containsKey(KEY_SERIALIZER_CLASS_CONFIG)
106+
&& config.containsKey(VALUE_SERIALIZER_CLASS_CONFIG))
107+
^ (nonNull(keySerializer) && nonNull(valueSerializer)))
108+
&& (config.containsKey(KEY_SERIALIZER_CLASS_CONFIG) ^ nonNull(valueSerializer))
109+
&& (config.containsKey(VALUE_SERIALIZER_CLASS_CONFIG) ^ nonNull(keySerializer));
110+
if (!correctConfig) {
111+
throw new IllegalArgumentException(
112+
"Both the key and value serializers should be provided either in the configuration or by using the corresponding setters");
113+
}
114+
if (config.containsKey(KEY_SERIALIZER_CLASS_CONFIG)) {
115+
return new KafkaProducer<>(config);
116+
}
117+
return new KafkaProducer<>(config, keySerializer, valueSerializer);
118+
}
119+
}
120+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.kafka;
7+
8+
import com.google.protobuf.InvalidProtocolBufferException;
9+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
10+
import java.util.Objects;
11+
import org.apache.kafka.common.errors.SerializationException;
12+
import org.apache.kafka.common.serialization.Deserializer;
13+
14+
public class SpanDataDeserializer implements Deserializer<ExportTraceServiceRequest> {
15+
@SuppressWarnings("NullAway")
16+
@Override
17+
public ExportTraceServiceRequest deserialize(String topic, byte[] data) {
18+
if (Objects.isNull(data)) {
19+
return null;
20+
}
21+
try {
22+
return ExportTraceServiceRequest.parseFrom(data);
23+
} catch (InvalidProtocolBufferException e) {
24+
throw new SerializationException("Error while deserializing data", e);
25+
}
26+
}
27+
}

0 commit comments

Comments
 (0)