Skip to content

Commit d74fe56

Browse files
committed
add Kafka connection failure handling
1 parent 2715df0 commit d74fe56

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,12 @@ public CompletableResultCode export(@Nonnull Collection<SpanData> spans) {
7272
result.fail();
7373
}
7474
}),
75-
executorService);
75+
executorService).whenComplete((ignore, exception) -> {
76+
if (exception != null) {
77+
logger.error("Executor task failed while sending to Kafka topic {}", topicName, exception);
78+
result.fail();
79+
}
80+
});
7681
return result;
7782
}
7883

kafka-exporter/src/test/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterIntegrationTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.opentelemetry.sdk.common.CompletableResultCode;
2020
import io.opentelemetry.sdk.trace.data.SpanData;
2121
import java.time.Duration;
22+
import java.util.Collection;
2223
import java.util.List;
2324
import java.util.UUID;
2425
import java.util.concurrent.TimeUnit;
@@ -29,7 +30,9 @@
2930
import org.apache.kafka.clients.consumer.ConsumerRecord;
3031
import org.apache.kafka.clients.consumer.ConsumerRecords;
3132
import org.apache.kafka.clients.consumer.KafkaConsumer;
33+
import org.apache.kafka.clients.producer.MockProducer;
3234
import org.apache.kafka.clients.producer.ProducerConfig;
35+
import org.apache.kafka.common.KafkaException;
3336
import org.apache.kafka.common.errors.ApiException;
3437
import org.apache.kafka.common.serialization.StringDeserializer;
3538
import org.apache.kafka.common.serialization.StringSerializer;
@@ -155,6 +158,31 @@ void exportWhenProducerInError() {
155158
testSubject.shutdown();
156159
}
157160

161+
@Test
162+
void exportWhenProducerFailsToSend() {
163+
var mockProducer = new MockProducer<String, Collection<SpanData>>();
164+
mockProducer.sendException = new KafkaException("Simulated kafka exception");
165+
var testSubjectWithMockProducer =
166+
KafkaSpanExporter.newBuilder()
167+
.setTopicName(TOPIC)
168+
.setProducer(mockProducer)
169+
.build();
170+
171+
ImmutableList<SpanData> spans =
172+
ImmutableList.of(makeBasicSpan("span-1"), makeBasicSpan("span-2"));
173+
174+
CompletableResultCode actual = testSubjectWithMockProducer.export(spans);
175+
176+
await()
177+
.untilAsserted(
178+
() -> {
179+
assertThat(actual.isSuccess()).isFalse();
180+
assertThat(actual.isDone()).isTrue();
181+
});
182+
183+
testSubjectWithMockProducer.shutdown();
184+
}
185+
158186
@Test
159187
void flush() {
160188
CompletableResultCode actual = testSubject.flush();

0 commit comments

Comments
 (0)