22
33import io .opentelemetry .api .GlobalOpenTelemetry ;
44import io .opentelemetry .api .OpenTelemetry ;
5+ import io .opentelemetry .api .common .AttributeKey ;
56import io .opentelemetry .api .trace .Span ;
67import io .opentelemetry .api .trace .SpanKind ;
78import io .opentelemetry .api .trace .Tracer ;
89import io .opentelemetry .context .Scope ;
910import io .opentelemetry .contrib .messaging .wrappers .MessagingProcessWrapper ;
10- import io .opentelemetry .contrib .messaging .wrappers .kafka .internal .AutoConfiguredDataCapture ;
1111import io .opentelemetry .contrib .messaging .wrappers .kafka .semconv .KafkaConsumerAttributesExtractor ;
1212import io .opentelemetry .contrib .messaging .wrappers .kafka .semconv .KafkaProcessRequest ;
1313import io .opentelemetry .instrumentation .kafkaclients .v2_6 .TracingConsumerInterceptor ;
1414import io .opentelemetry .instrumentation .kafkaclients .v2_6 .TracingProducerInterceptor ;
15- import io .opentelemetry .instrumentation .testing .util .TelemetryDataUtil ;
16- import io .opentelemetry .sdk .testing .assertj .TraceAssert ;
17- import io .opentelemetry .sdk .testing .assertj .TracesAssert ;
18- import io .opentelemetry .sdk .trace .data .SpanData ;
1915import org .apache .kafka .clients .consumer .ConsumerConfig ;
2016import org .apache .kafka .clients .consumer .ConsumerRecord ;
2117import org .apache .kafka .clients .consumer .ConsumerRecords ;
2218import org .apache .kafka .clients .producer .ProducerConfig ;
2319import org .apache .kafka .clients .producer .ProducerRecord ;
2420import org .assertj .core .api .AbstractAssert ;
25- import org .awaitility .core .ConditionTimeoutException ;
2621import org .junit .jupiter .api .Test ;
2722
28- import javax .annotation .Nullable ;
2923import java .nio .charset .StandardCharsets ;
3024import java .time .Duration ;
31- import java .util .ArrayList ;
32- import java .util .Comparator ;
33- import java .util .List ;
3425import java .util .Map ;
35- import java .util .Arrays ;
36- import java .util .concurrent .TimeoutException ;
37- import java .util .function .Consumer ;
38- import java .util .function .Supplier ;
3926
40- import static io .opentelemetry .instrumentation .testing .util .TelemetryDataUtil .orderByRootSpanName ;
41- import static io .opentelemetry .instrumentation .testing .util .TelemetryDataUtil .waitForTraces ;
4227import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .assertThat ;
4328import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .equalTo ;
4429import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .satisfies ;
4934import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_MESSAGE_BODY_SIZE ;
5035import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_OPERATION ;
5136import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_SYSTEM ;
52- import static org .awaitility .Awaitility .await ;
5337
5438public class KafkaClientTest extends KafkaClientBaseTest {
5539
@@ -130,7 +114,7 @@ public void consumeWithChild(Tracer tracer, MessagingProcessWrapper<KafkaProcess
130114 @ SuppressWarnings ("deprecation" ) // using deprecated semconv
131115 public void assertTraces () {
132116 waitAndAssertTraces (
133- orderByRootSpanName ("parent" , "producer callback" ),
117+ sortByRootSpanName ("parent" , "producer callback" ),
134118 trace ->
135119 trace .hasSpansSatisfyingExactly (
136120 span -> span .hasName ("parent" ).hasKind (SpanKind .INTERNAL ).hasNoParent (),
@@ -152,9 +136,9 @@ public void assertTraces() {
152136 satisfies (
153137 MESSAGING_DESTINATION_PARTITION_ID ,
154138 org .assertj .core .api .AbstractStringAssert ::isNotEmpty ),
155- satisfies (
156- MESSAGING_CLIENT_ID ,
157- stringAssert -> stringAssert . isEqualTo ( " test-consumer-1") ),
139+ // FIXME: We do have "messaging.client_id" in instrumentation but "messaging.client.id" in
140+ // semconv library right now. It should be replaced after semconv release.
141+ equalTo ( AttributeKey . stringKey ( "messaging.client_id" ), " test-consumer-1" ),
158142 satisfies (
159143 MESSAGING_KAFKA_OFFSET ,
160144 AbstractAssert ::isNotNull ),
@@ -171,47 +155,4 @@ public void assertTraces() {
171155 span ->
172156 span .hasName ("producer callback" ).hasKind (SpanKind .INTERNAL ).hasNoParent ()));
173157 }
174-
175- @ SafeVarargs
176- @ SuppressWarnings ("varargs" )
177- private static void waitAndAssertTraces (
178- @ Nullable Comparator <List <SpanData >> traceComparator ,
179- Consumer <TraceAssert >... assertions ) {
180- List <Consumer <TraceAssert >> assertionsList = new ArrayList <>(Arrays .asList (assertions ));
181- try {
182- await ()
183- .untilAsserted (() -> doAssertTraces (traceComparator , AutoConfiguredDataCapture ::getSpans , assertionsList ));
184- } catch (Throwable t ) {
185- // awaitility is doing a jmx call that is not implemented in GraalVM:
186- // call:
187- // https://github.com/awaitility/awaitility/blob/fbe16add874b4260dd240108304d5c0be84eabc8/awaitility/src/main/java/org/awaitility/core/ConditionAwaiter.java#L157
188- // see https://github.com/oracle/graal/issues/6101 (spring boot graal native image)
189- if (t .getClass ().getName ().equals ("com.oracle.svm.core.jdk.UnsupportedFeatureError" )
190- || t instanceof ConditionTimeoutException ) {
191- // Don't throw this failure since the stack is the awaitility thread, causing confusion.
192- // Instead, just assert one more time on the test thread, which will fail with a better
193- // stack trace.
194- // TODO(anuraaga): There is probably a better way to do this.
195- doAssertTraces (traceComparator , AutoConfiguredDataCapture ::getSpans , assertionsList );
196- } else {
197- throw t ;
198- }
199- }
200- }
201-
202- private static void doAssertTraces (
203- @ Nullable Comparator <List <SpanData >> traceComparator ,
204- Supplier <List <SpanData >> supplier ,
205- List <Consumer <TraceAssert >> assertionsList ) {
206- try {
207- List <List <SpanData >> traces = waitForTraces (supplier , assertionsList .size ());
208- TelemetryDataUtil .assertScopeVersion (traces );
209- if (traceComparator != null ) {
210- traces .sort (traceComparator );
211- }
212- TracesAssert .assertThat (traces ).hasTracesSatisfyingExactly (assertionsList );
213- } catch (InterruptedException | TimeoutException e ) {
214- throw new AssertionError ("Error waiting for " + assertionsList .size () + " traces" , e );
215- }
216- }
217158}
0 commit comments