|
7 | 7 |
|
8 | 8 | import static org.assertj.core.api.Assertions.assertThatThrownBy; |
9 | 9 |
|
10 | | -import java.lang.reflect.InvocationHandler; |
11 | | -import java.lang.reflect.Method; |
| 10 | +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; |
| 11 | +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; |
12 | 12 | import java.lang.reflect.Proxy; |
| 13 | +import java.time.Duration; |
13 | 14 | import org.apache.kafka.clients.consumer.Consumer; |
14 | 15 | import org.apache.kafka.clients.producer.Producer; |
15 | 16 | import org.junit.jupiter.api.Test; |
| 17 | +import org.junit.jupiter.api.extension.RegisterExtension; |
16 | 18 |
|
17 | | -class ExceptionHandlingTest extends KafkaClientBaseTest { |
| 19 | +class ExceptionHandlingTest { |
| 20 | + |
| 21 | + @RegisterExtension |
| 22 | + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); |
18 | 23 |
|
19 | 24 | @Test |
20 | | - void testConsumerPropagatesException() { |
21 | | - Consumer<?, ?> throwingConsumer = |
| 25 | + void testConsumerExceptionPropagatesToCaller() { |
| 26 | + Consumer<?, ?> consumer = |
22 | 27 | (Consumer<?, ?>) |
23 | 28 | Proxy.newProxyInstance( |
24 | | - getClass().getClassLoader(), |
| 29 | + ExceptionHandlingTest.class.getClassLoader(), |
25 | 30 | new Class<?>[] {Consumer.class}, |
26 | | - new InvocationHandler() { |
27 | | - @Override |
28 | | - public Object invoke(Object proxy, Method method, Object[] args) { |
29 | | - throw new IllegalStateException("Test exception"); |
30 | | - } |
| 31 | + (proxy, method, args) -> { |
| 32 | + throw new IllegalStateException("can't invoke"); |
31 | 33 | }); |
32 | | - Consumer<?, ?> wrappedConsumer = |
33 | | - KafkaTelemetry.create(testing.getOpenTelemetry()).wrap(throwingConsumer); |
34 | | - assertThatThrownBy(() -> wrappedConsumer.poll(null)) |
| 34 | + |
| 35 | + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); |
| 36 | + Consumer<?, ?> wrappedConsumer = telemetry.wrap(consumer); |
| 37 | + |
| 38 | + assertThatThrownBy(() -> wrappedConsumer.poll(Duration.ofMillis(1))) |
35 | 39 | .isInstanceOf(IllegalStateException.class) |
36 | | - .hasMessage("Test exception"); |
| 40 | + .hasMessage("can't invoke"); |
37 | 41 | } |
38 | 42 |
|
39 | 43 | @Test |
40 | | - void testProducerPropagatesException() { |
41 | | - Producer<?, ?> throwingProducer = |
| 44 | + void testProducerExceptionPropagatesToCaller() { |
| 45 | + Producer<?, ?> producer = |
42 | 46 | (Producer<?, ?>) |
43 | 47 | Proxy.newProxyInstance( |
44 | | - getClass().getClassLoader(), |
| 48 | + ExceptionHandlingTest.class.getClassLoader(), |
45 | 49 | new Class<?>[] {Producer.class}, |
46 | | - new InvocationHandler() { |
47 | | - @Override |
48 | | - public Object invoke(Object proxy, Method method, Object[] args) { |
49 | | - throw new IllegalStateException("Test exception"); |
50 | | - } |
| 50 | + (proxy, method, args) -> { |
| 51 | + throw new IllegalStateException("can't invoke"); |
51 | 52 | }); |
52 | | - Producer<?, ?> wrappedProducer = |
53 | | - KafkaTelemetry.create(testing.getOpenTelemetry()).wrap(throwingProducer); |
54 | | - assertThatThrownBy(() -> wrappedProducer.send(null)) |
| 53 | + |
| 54 | + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); |
| 55 | + Producer<?, ?> wrappedProducer = telemetry.wrap(producer); |
| 56 | + assertThatThrownBy(wrappedProducer::flush) |
55 | 57 | .isInstanceOf(IllegalStateException.class) |
56 | | - .hasMessage("Test exception"); |
| 58 | + .hasMessage("can't invoke"); |
57 | 59 | } |
58 | 60 | } |
0 commit comments