Skip to content

Commit 773eb24

Browse files
authored
[kafka-exporter] Add Kafka connectivity error handling (#2202)
1 parent 81974e2 commit 773eb24

File tree

2 files changed

+48
-15
lines changed

2 files changed

+48
-15
lines changed

kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporter.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,28 @@ public CompletableResultCode export(@Nonnull Collection<SpanData> spans) {
5959

6060
CompletableResultCode result = new CompletableResultCode();
6161
CompletableFuture.runAsync(
62-
() ->
63-
producer.send(
64-
producerRecord,
65-
(metadata, exception) -> {
66-
if (exception == null) {
67-
result.succeed();
68-
} else {
69-
logger.error(
70-
String.format("Error while sending spans to Kafka topic %s", topicName),
71-
exception);
72-
result.fail();
73-
}
74-
}),
75-
executorService);
62+
() ->
63+
producer.send(
64+
producerRecord,
65+
(metadata, exception) -> {
66+
if (exception == null) {
67+
result.succeed();
68+
} else {
69+
logger.error(
70+
String.format("Error while sending spans to Kafka topic %s", topicName),
71+
exception);
72+
result.fail();
73+
}
74+
}),
75+
executorService)
76+
.whenComplete(
77+
(ignore, exception) -> {
78+
if (exception != null) {
79+
logger.error(
80+
"Executor task failed while sending to Kafka topic {}", topicName, exception);
81+
result.fail();
82+
}
83+
});
7684
return result;
7785
}
7886

kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterIntegrationTest.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.opentelemetry.sdk.common.CompletableResultCode;
2020
import io.opentelemetry.sdk.trace.data.SpanData;
2121
import java.time.Duration;
22+
import java.util.Collection;
2223
import java.util.List;
2324
import java.util.UUID;
2425
import java.util.concurrent.TimeUnit;
@@ -29,7 +30,9 @@
2930
import org.apache.kafka.clients.consumer.ConsumerRecord;
3031
import org.apache.kafka.clients.consumer.ConsumerRecords;
3132
import org.apache.kafka.clients.consumer.KafkaConsumer;
33+
import org.apache.kafka.clients.producer.MockProducer;
3234
import org.apache.kafka.clients.producer.ProducerConfig;
35+
import org.apache.kafka.common.KafkaException;
3336
import org.apache.kafka.common.errors.ApiException;
3437
import org.apache.kafka.common.serialization.StringDeserializer;
3538
import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,7 +49,7 @@
4649
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
4750
class KafkaSpanExporterIntegrationTest {
4851
private static final DockerImageName KAFKA_TEST_IMAGE =
49-
DockerImageName.parse("apache/kafka:3.8.1");
52+
DockerImageName.parse("apache/kafka:3.9.1");
5053
private static final String TOPIC = "span_topic";
5154
private KafkaContainer kafka;
5255
private KafkaConsumer<String, ExportTraceServiceRequest> consumer;
@@ -155,6 +158,28 @@ void exportWhenProducerInError() {
155158
testSubject.shutdown();
156159
}
157160

161+
@Test
162+
void exportWhenProducerFailsToSend() {
163+
var mockProducer = new MockProducer<String, Collection<SpanData>>();
164+
mockProducer.sendException = new KafkaException("Simulated kafka exception");
165+
var testSubjectWithMockProducer =
166+
KafkaSpanExporter.newBuilder().setTopicName(TOPIC).setProducer(mockProducer).build();
167+
168+
ImmutableList<SpanData> spans =
169+
ImmutableList.of(makeBasicSpan("span-1"), makeBasicSpan("span-2"));
170+
171+
CompletableResultCode actual = testSubjectWithMockProducer.export(spans);
172+
173+
await()
174+
.untilAsserted(
175+
() -> {
176+
assertThat(actual.isSuccess()).isFalse();
177+
assertThat(actual.isDone()).isTrue();
178+
});
179+
180+
testSubjectWithMockProducer.shutdown();
181+
}
182+
158183
@Test
159184
void flush() {
160185
CompletableResultCode actual = testSubject.flush();

0 commit comments

Comments
 (0)