Skip to content

Commit 95553e4

Browse files
authored
Create TracingListIterator.java
1 parent 970717b commit 95553e4

File tree

1 file changed

+89
-0
lines changed
  • instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal

1 file changed

+89
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;
7+
8+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
9+
import java.util.Iterator;
10+
import java.util.ListIterator;
11+
import java.util.function.BooleanSupplier;
12+
import org.apache.kafka.clients.consumer.ConsumerRecord;
13+
14+
/**
15+
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
16+
* any time.
17+
*/
18+
public class TracingListIterator<K, V> implements ListIterator<ConsumerRecord<K, V>> {
19+
20+
private final ListIterator<ConsumerRecord<K, V>> delegateListIterator;
21+
private final Iterator<ConsumerRecord<K, V>> tracingIterator;
22+
23+
private TracingListIterator(
24+
ListIterator<ConsumerRecord<K, V>> delegateListIterator,
25+
Instrumenter<KafkaProcessRequest, Void> instrumenter,
26+
BooleanSupplier wrappingEnabled,
27+
KafkaConsumerContext consumerContext) {
28+
this.delegateListIterator = delegateListIterator;
29+
this.tracingIterator =
30+
TracingIterator.wrap(delegateListIterator, instrumenter, wrappingEnabled, consumerContext);
31+
}
32+
33+
public static <K, V> ListIterator<ConsumerRecord<K, V>> wrap(
34+
ListIterator<ConsumerRecord<K, V>> delegateListIterator,
35+
Instrumenter<KafkaProcessRequest, Void> instrumenter,
36+
BooleanSupplier wrappingEnabled,
37+
KafkaConsumerContext consumerContext) {
38+
if (wrappingEnabled.getAsBoolean()) {
39+
return new TracingListIterator<>(
40+
delegateListIterator, instrumenter, wrappingEnabled, consumerContext);
41+
}
42+
return delegateListIterator;
43+
}
44+
45+
@Override
46+
public boolean hasNext() {
47+
return tracingIterator.hasNext();
48+
}
49+
50+
@Override
51+
public ConsumerRecord<K, V> next() {
52+
return tracingIterator.next();
53+
}
54+
55+
@Override
56+
public boolean hasPrevious() {
57+
return delegateListIterator.hasPrevious();
58+
}
59+
60+
@Override
61+
public ConsumerRecord<K, V> previous() {
62+
return delegateListIterator.previous();
63+
}
64+
65+
@Override
66+
public int nextIndex() {
67+
return delegateListIterator.nextIndex();
68+
}
69+
70+
@Override
71+
public int previousIndex() {
72+
return delegateListIterator.previousIndex();
73+
}
74+
75+
@Override
76+
public void remove() {
77+
delegateListIterator.remove();
78+
}
79+
80+
@Override
81+
public void set(ConsumerRecord<K, V> consumerRecord) {
82+
delegateListIterator.set(consumerRecord);
83+
}
84+
85+
@Override
86+
public void add(ConsumerRecord<K, V> consumerRecord) {
87+
delegateListIterator.add(consumerRecord);
88+
}
89+
}

0 commit comments

Comments
 (0)