Skip to content

Commit 810c222

Browse files
committed
more
1 parent 41c767c commit 810c222

File tree

7 files changed

+87
-154
lines changed

7 files changed

+87
-154
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
5151
// timer should be started before fetching ConsumerRecords, but there is no callback for that
5252
Timer timer = Timer.start();
5353
Context receiveContext =
54-
telemetry.getConsumerTelemetry().buildAndFinishSpan(records, consumerGroup, clientId, timer);
54+
telemetry
55+
.getConsumerTelemetry()
56+
.buildAndFinishSpan(records, consumerGroup, clientId, timer);
5557
if (receiveContext == null) {
5658
receiveContext = Context.current();
5759
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ public void configure(Map<String, ?> configs) {
6565

6666
KafkaConsumerTelemetrySupplier supplier =
6767
getProperty(
68-
configs, CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER, KafkaConsumerTelemetrySupplier.class);
68+
configs,
69+
CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER,
70+
KafkaConsumerTelemetrySupplier.class);
6971
this.consumerTelemetry = supplier.get();
7072
}
7173

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ public void configure(Map<String, ?> configs) {
4949

5050
KafkaProducerTelemetrySupplier supplier =
5151
getProperty(
52-
configs, CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER, KafkaProducerTelemetrySupplier.class);
52+
configs,
53+
CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER,
54+
KafkaProducerTelemetrySupplier.class);
5355
this.producerTelemetry = supplier.get();
5456
}
5557

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptorTest.java

Lines changed: 5 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,18 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
77

8-
import static org.assertj.core.api.Assertions.assertThat;
98
import static org.assertj.core.api.Assertions.assertThatThrownBy;
109

1110
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
1211
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1312
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
14-
import java.io.ByteArrayInputStream;
15-
import java.io.ByteArrayOutputStream;
1613
import java.io.IOException;
17-
import java.io.InputStream;
18-
import java.io.ObjectInputStream;
19-
import java.io.ObjectOutputStream;
20-
import java.io.ObjectStreamClass;
2114
import java.util.HashMap;
2215
import java.util.Map;
2316
import java.util.function.Supplier;
2417
import org.apache.kafka.clients.consumer.ConsumerConfig;
2518
import org.apache.kafka.clients.consumer.KafkaConsumer;
2619
import org.apache.kafka.common.serialization.StringDeserializer;
27-
import org.junit.jupiter.api.Assumptions;
2820
import org.junit.jupiter.api.Test;
2921
import org.junit.jupiter.api.extension.RegisterExtension;
3022

@@ -46,14 +38,13 @@ private static Map<String, Object> consumerConfig() {
4638

4739
@Test
4840
void badConfig() {
49-
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
50-
5141
// Bad config - wrong type for supplier
5242
assertThatThrownBy(
5343
() -> {
5444
Map<String, Object> consumerConfig = consumerConfig();
5545
consumerConfig.put(
56-
OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER, "foo");
46+
OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER,
47+
"foo");
5748
new KafkaConsumer<>(consumerConfig).close();
5849
})
5950
.hasRootCauseInstanceOf(IllegalStateException.class)
@@ -76,46 +67,8 @@ void badConfig() {
7667

7768
@Test
7869
void serializableConfig() throws IOException, ClassNotFoundException {
79-
testSerialize(consumerConfig());
80-
}
81-
82-
@SuppressWarnings("unchecked")
83-
private static void testSerialize(Map<String, Object> map)
84-
throws IOException, ClassNotFoundException {
85-
// Check that consumer config has the supplier
86-
Object consumerSupplier =
87-
map.get(OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER);
88-
89-
assertThat(consumerSupplier).isInstanceOf(KafkaConsumerTelemetrySupplier.class);
90-
KafkaConsumerTelemetrySupplier supplier = (KafkaConsumerTelemetrySupplier) consumerSupplier;
91-
assertThat(supplier.get()).isNotNull();
92-
93-
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
94-
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) {
95-
outputStream.writeObject(map);
96-
}
97-
98-
class CustomObjectInputStream extends ObjectInputStream {
99-
CustomObjectInputStream(InputStream inputStream) throws IOException {
100-
super(inputStream);
101-
}
102-
103-
@Override
104-
protected Class<?> resolveClass(ObjectStreamClass desc)
105-
throws IOException, ClassNotFoundException {
106-
if (desc.getName().startsWith("io.opentelemetry.")) {
107-
throw new IllegalStateException(
108-
"Serial form contains opentelemetry class " + desc.getName());
109-
}
110-
return super.resolveClass(desc);
111-
}
112-
}
113-
114-
try (ObjectInputStream inputStream =
115-
new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
116-
Map<String, Object> result = (Map<String, Object>) inputStream.readObject();
117-
assertThat(result.get(OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER))
118-
.isNull();
119-
}
70+
SerializationTestUtil.testSerialize(
71+
consumerConfig(),
72+
OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER);
12073
}
12174
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryMetricsReporterTest.java

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,14 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
77

8-
import static org.assertj.core.api.Assertions.assertThat;
98
import static org.assertj.core.api.Assertions.assertThatThrownBy;
109

1110
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.AbstractOpenTelemetryMetricsReporterTest;
1211
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter;
13-
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
1412
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
1513
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1614
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
17-
import java.io.ByteArrayInputStream;
18-
import java.io.ByteArrayOutputStream;
1915
import java.io.IOException;
20-
import java.io.InputStream;
21-
import java.io.ObjectInputStream;
22-
import java.io.ObjectOutputStream;
23-
import java.io.ObjectStreamClass;
2416
import java.util.Map;
2517
import org.apache.kafka.clients.consumer.KafkaConsumer;
2618
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -127,44 +119,9 @@ void badConfig() {
127119

128120
@Test
129121
void serializableConfig() throws IOException, ClassNotFoundException {
130-
testSerialize(producerConfig());
131-
testSerialize(consumerConfig());
132-
}
133-
134-
@SuppressWarnings("unchecked")
135-
private static void testSerialize(Map<String, Object> map)
136-
throws IOException, ClassNotFoundException {
137-
OpenTelemetrySupplier supplier =
138-
(OpenTelemetrySupplier)
139-
map.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
140-
assertThat(supplier).isNotNull();
141-
assertThat(supplier.get()).isNotNull();
142-
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
143-
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) {
144-
outputStream.writeObject(map);
145-
}
146-
147-
class CustomObjectInputStream extends ObjectInputStream {
148-
CustomObjectInputStream(InputStream inputStream) throws IOException {
149-
super(inputStream);
150-
}
151-
152-
@Override
153-
protected Class<?> resolveClass(ObjectStreamClass desc)
154-
throws IOException, ClassNotFoundException {
155-
if (desc.getName().startsWith("io.opentelemetry.")) {
156-
throw new IllegalStateException(
157-
"Serial form contains opentelemetry class " + desc.getName());
158-
}
159-
return super.resolveClass(desc);
160-
}
161-
}
162-
163-
try (ObjectInputStream inputStream =
164-
new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
165-
Map<String, Object> result = (Map<String, Object>) inputStream.readObject();
166-
assertThat(result.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER))
167-
.isNull();
168-
}
122+
SerializationTestUtil.testSerialize(
123+
producerConfig(), OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
124+
SerializationTestUtil.testSerialize(
125+
consumerConfig(), OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
169126
}
170127
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptorTest.java

Lines changed: 5 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,18 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
77

8-
import static org.assertj.core.api.Assertions.assertThat;
98
import static org.assertj.core.api.Assertions.assertThatThrownBy;
109

1110
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
1211
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1312
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
14-
import java.io.ByteArrayInputStream;
15-
import java.io.ByteArrayOutputStream;
1613
import java.io.IOException;
17-
import java.io.InputStream;
18-
import java.io.ObjectInputStream;
19-
import java.io.ObjectOutputStream;
20-
import java.io.ObjectStreamClass;
2114
import java.util.HashMap;
2215
import java.util.Map;
2316
import java.util.function.Supplier;
2417
import org.apache.kafka.clients.producer.KafkaProducer;
2518
import org.apache.kafka.clients.producer.ProducerConfig;
2619
import org.apache.kafka.common.serialization.StringSerializer;
27-
import org.junit.jupiter.api.Assumptions;
2820
import org.junit.jupiter.api.Test;
2921
import org.junit.jupiter.api.extension.RegisterExtension;
3022

@@ -45,14 +37,13 @@ private static Map<String, Object> producerConfig() {
4537

4638
@Test
4739
void badConfig() {
48-
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
49-
5040
// Bad config - wrong type for supplier
5141
assertThatThrownBy(
5242
() -> {
5343
Map<String, Object> producerConfig = producerConfig();
5444
producerConfig.put(
55-
OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER, "foo");
45+
OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER,
46+
"foo");
5647
new KafkaProducer<>(producerConfig).close();
5748
})
5849
.hasRootCauseInstanceOf(IllegalStateException.class)
@@ -75,46 +66,8 @@ void badConfig() {
7566

7667
@Test
7768
void serializableConfig() throws IOException, ClassNotFoundException {
78-
testSerialize(producerConfig());
79-
}
80-
81-
@SuppressWarnings("unchecked")
82-
private static void testSerialize(Map<String, Object> map)
83-
throws IOException, ClassNotFoundException {
84-
// Check that producer config has the supplier
85-
Object producerSupplier =
86-
map.get(OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER);
87-
88-
assertThat(producerSupplier).isInstanceOf(KafkaProducerTelemetrySupplier.class);
89-
KafkaProducerTelemetrySupplier supplier = (KafkaProducerTelemetrySupplier) producerSupplier;
90-
assertThat(supplier.get()).isNotNull();
91-
92-
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
93-
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) {
94-
outputStream.writeObject(map);
95-
}
96-
97-
class CustomObjectInputStream extends ObjectInputStream {
98-
CustomObjectInputStream(InputStream inputStream) throws IOException {
99-
super(inputStream);
100-
}
101-
102-
@Override
103-
protected Class<?> resolveClass(ObjectStreamClass desc)
104-
throws IOException, ClassNotFoundException {
105-
if (desc.getName().startsWith("io.opentelemetry.")) {
106-
throw new IllegalStateException(
107-
"Serial form contains opentelemetry class " + desc.getName());
108-
}
109-
return super.resolveClass(desc);
110-
}
111-
}
112-
113-
try (ObjectInputStream inputStream =
114-
new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
115-
Map<String, Object> result = (Map<String, Object>) inputStream.readObject();
116-
assertThat(result.get(OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER))
117-
.isNull();
118-
}
69+
SerializationTestUtil.testSerialize(
70+
producerConfig(),
71+
OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER);
11972
}
12073
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import java.io.ByteArrayInputStream;
11+
import java.io.ByteArrayOutputStream;
12+
import java.io.IOException;
13+
import java.io.InputStream;
14+
import java.io.ObjectInputStream;
15+
import java.io.ObjectOutputStream;
16+
import java.io.ObjectStreamClass;
17+
import java.util.Map;
18+
19+
final class SerializationTestUtil {
20+
21+
/**
22+
* Tests that a configuration map can be serialized and that OpenTelemetry classes are replaced
23+
* with null during serialization (via writeReplace()).
24+
*
25+
* @param map the configuration map to serialize
26+
* @param configKey the key to check for in the deserialized map
27+
*/
28+
static void testSerialize(Map<String, Object> map, String configKey)
29+
throws IOException, ClassNotFoundException {
30+
Object supplierValue = map.get(configKey);
31+
assertThat(supplierValue).isNotNull();
32+
33+
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
34+
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) {
35+
outputStream.writeObject(map);
36+
}
37+
38+
class CustomObjectInputStream extends ObjectInputStream {
39+
CustomObjectInputStream(InputStream inputStream) throws IOException {
40+
super(inputStream);
41+
}
42+
43+
@Override
44+
protected Class<?> resolveClass(ObjectStreamClass desc)
45+
throws IOException, ClassNotFoundException {
46+
if (desc.getName().startsWith("io.opentelemetry.")) {
47+
throw new IllegalStateException(
48+
"Serial form contains opentelemetry class " + desc.getName());
49+
}
50+
return super.resolveClass(desc);
51+
}
52+
}
53+
54+
try (ObjectInputStream inputStream =
55+
new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
56+
@SuppressWarnings("unchecked")
57+
Map<String, Object> result = (Map<String, Object>) inputStream.readObject();
58+
// After deserialization, the supplier should be null (replaced via writeReplace())
59+
assertThat(result.get(configKey)).isNull();
60+
}
61+
}
62+
63+
private SerializationTestUtil() {}
64+
}

0 commit comments

Comments
 (0)