Skip to content

Commit 789f444

Browse files
authored
Fix rocketmq test (#10902)
1 parent 8e1b900 commit 789f444

File tree

3 files changed

+126
-89
lines changed

3 files changed

+126
-89
lines changed

instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/javaagent/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,7 @@ tasks.withType<Test>().configureEach {
3131
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
3232

3333
jvmArgs("-Dotel.instrumentation.common.experimental.controller-telemetry.enabled=true")
34+
35+
// with default settings tests will fail when disk is 90% full
36+
jvmArgs("-Drocketmq.broker.diskSpaceWarningLevelRatio=1.0")
3437
}

instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@ tasks.withType<Test>().configureEach {
1919
jvmArgs("--add-opens=java.base/sun.nio.ch=ALL-UNNAMED")
2020
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
2121
jvmArgs("-Dotel.instrumentation.common.experimental.controller-telemetry.enabled=true")
22+
// with default settings tests will fail when disk is 90% full
23+
jvmArgs("-Drocketmq.broker.diskSpaceWarningLevelRatio=1.0")
2224
}

instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.java

Lines changed: 121 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
99
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
1010
import static java.util.Collections.singletonList;
11+
import static org.assertj.core.api.Assertions.assertThat;
1112
import static org.junit.jupiter.api.Assertions.assertEquals;
1213

1314
import io.opentelemetry.api.common.AttributeKey;
15+
import io.opentelemetry.api.trace.SpanContext;
1416
import io.opentelemetry.api.trace.SpanKind;
1517
import io.opentelemetry.instrumentation.rocketmqclient.v4_8.base.BaseConf;
1618
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
@@ -23,6 +25,8 @@
2325
import java.util.concurrent.ExecutionException;
2426
import java.util.concurrent.TimeUnit;
2527
import java.util.concurrent.TimeoutException;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.function.Consumer;
2630
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
2731
import org.apache.rocketmq.client.exception.MQClientException;
2832
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -253,106 +257,115 @@ void testRocketmqProduceAndBatchConsume() throws Exception {
253257
int maxAttempts = 5;
254258
for (int i = 0; i < maxAttempts; i++) {
255259
tracingMessageListener.reset();
260+
256261
testing().runWithSpan("parent", () -> producer.send(msgs));
262+
257263
tracingMessageListener.waitForMessages();
258264
if (tracingMessageListener.getLastBatchSize() == 2) {
259265
break;
260266
} else if (i < maxAttempts) {
261267
// if messages weren't received as a batch we get 1 trace instead of 2
262268
testing().waitForTraces(1);
269+
Thread.sleep(2_000);
263270
testing().clearData();
264271
logger.error("Messages weren't received as batch, retrying");
265272
}
266-
testing()
267-
.waitAndAssertTraces(
268-
trace ->
269-
trace.hasSpansSatisfyingExactly(
270-
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
271-
span ->
272-
span.hasName(sharedTopic + " publish")
273-
.hasKind(SpanKind.PRODUCER)
274-
.hasParent(trace.getSpan(0))
275-
.hasAttributesSatisfyingExactly(
276-
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
277-
equalTo(
278-
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
279-
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
280-
satisfies(
281-
SemanticAttributes.MESSAGING_MESSAGE_ID,
282-
val -> val.isInstanceOf(String.class)),
283-
satisfies(
284-
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
285-
val -> val.isInstanceOf(String.class)),
286-
equalTo(
287-
AttributeKey.stringKey("messaging.rocketmq.send_result"),
288-
"SEND_OK"))),
289-
trace ->
290-
trace.hasSpansSatisfyingExactly(
291-
span ->
292-
span.hasName("multiple_sources receive")
293-
.hasKind(SpanKind.CONSUMER)
294-
.hasAttributesSatisfyingExactly(
295-
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
296-
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
297-
span ->
298-
span.hasName(sharedTopic + " process")
299-
.hasKind(SpanKind.CONSUMER)
300-
.hasParent(trace.getSpan(0))
301-
.hasLinks(LinkData.create(trace.getSpan(1).getSpanContext()))
302-
.hasAttributesSatisfyingExactly(
303-
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
304-
equalTo(
305-
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
306-
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
307-
satisfies(
308-
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
309-
val -> val.isInstanceOf(Long.class)),
310-
satisfies(
311-
SemanticAttributes.MESSAGING_MESSAGE_ID,
312-
val -> val.isInstanceOf(String.class)),
313-
equalTo(
314-
SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
315-
satisfies(
316-
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
317-
val -> val.isInstanceOf(Long.class)),
318-
satisfies(
319-
AttributeKey.stringKey("messaging.rocketmq.queue_id"),
320-
val -> val.isInstanceOf(Long.class)),
321-
satisfies(
322-
AttributeKey.stringKey("messaging.rocketmq.queue_offset"),
323-
val -> val.isInstanceOf(Long.class))),
324-
span ->
325-
span.hasName(sharedTopic + " process")
326-
.hasKind(SpanKind.CONSUMER)
327-
.hasParent(trace.getSpan(0))
328-
.hasLinks(LinkData.create(trace.getSpan(1).getSpanContext()))
329-
.hasAttributesSatisfyingExactly(
330-
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
331-
equalTo(
332-
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
333-
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
334-
satisfies(
335-
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
336-
val -> val.isInstanceOf(Long.class)),
337-
satisfies(
338-
SemanticAttributes.MESSAGING_MESSAGE_ID,
339-
val -> val.isInstanceOf(String.class)),
340-
equalTo(
341-
SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
342-
satisfies(
343-
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
344-
val -> val.isInstanceOf(String.class)),
345-
satisfies(
346-
AttributeKey.stringKey("messaging.rocketmq.queue_id"),
347-
val -> val.isInstanceOf(Long.class)),
348-
satisfies(
349-
AttributeKey.stringKey("messaging.rocketmq.queue_offset"),
350-
val -> val.isInstanceOf(Long.class))),
351-
span ->
352-
span.hasName("messageListener")
353-
.hasParent(trace.getSpan(0))
354-
.hasKind(SpanKind.INTERNAL)));
355273
}
274+
275+
AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
276+
testing()
277+
.waitAndAssertTraces(
278+
trace -> {
279+
trace.hasSpansSatisfyingExactly(
280+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
281+
span ->
282+
span.hasName(sharedTopic + " publish")
283+
.hasKind(SpanKind.PRODUCER)
284+
.hasParent(trace.getSpan(0))
285+
.hasAttributesSatisfyingExactly(
286+
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
287+
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
288+
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
289+
satisfies(
290+
SemanticAttributes.MESSAGING_MESSAGE_ID,
291+
val -> val.isInstanceOf(String.class)),
292+
satisfies(
293+
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
294+
val -> val.isInstanceOf(String.class)),
295+
equalTo(
296+
AttributeKey.stringKey("messaging.rocketmq.send_result"),
297+
"SEND_OK")));
298+
299+
SpanContext spanContext = trace.getSpan(1).getSpanContext();
300+
producerSpanContext.set(
301+
SpanContext.createFromRemoteParent(
302+
spanContext.getTraceId(),
303+
spanContext.getSpanId(),
304+
spanContext.getTraceFlags(),
305+
spanContext.getTraceState()));
306+
},
307+
trace ->
308+
trace.hasSpansSatisfyingExactly(
309+
span ->
310+
span.hasName("multiple_sources receive")
311+
.hasKind(SpanKind.CONSUMER)
312+
.hasAttributesSatisfyingExactly(
313+
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
314+
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
315+
span ->
316+
span.hasName(sharedTopic + " process")
317+
.hasKind(SpanKind.CONSUMER)
318+
.hasParent(trace.getSpan(0))
319+
.hasLinksSatisfying(links(producerSpanContext.get()))
320+
.hasAttributesSatisfyingExactly(
321+
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
322+
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
323+
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
324+
satisfies(
325+
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
326+
val -> val.isInstanceOf(Long.class)),
327+
satisfies(
328+
SemanticAttributes.MESSAGING_MESSAGE_ID,
329+
val -> val.isInstanceOf(String.class)),
330+
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
331+
satisfies(
332+
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
333+
val -> val.isNotEmpty()),
334+
satisfies(
335+
AttributeKey.longKey("messaging.rocketmq.queue_id"),
336+
val -> val.isNotNull()),
337+
satisfies(
338+
AttributeKey.longKey("messaging.rocketmq.queue_offset"),
339+
val -> val.isNotNull())),
340+
span ->
341+
span.hasName(sharedTopic + " process")
342+
.hasKind(SpanKind.CONSUMER)
343+
.hasParent(trace.getSpan(0))
344+
.hasLinksSatisfying(links(producerSpanContext.get()))
345+
.hasAttributesSatisfyingExactly(
346+
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
347+
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
348+
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
349+
satisfies(
350+
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
351+
val -> val.isInstanceOf(Long.class)),
352+
satisfies(
353+
SemanticAttributes.MESSAGING_MESSAGE_ID,
354+
val -> val.isInstanceOf(String.class)),
355+
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagB"),
356+
satisfies(
357+
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
358+
val -> val.isNotEmpty()),
359+
satisfies(
360+
AttributeKey.longKey("messaging.rocketmq.queue_id"),
361+
val -> val.isNotNull()),
362+
satisfies(
363+
AttributeKey.longKey("messaging.rocketmq.queue_offset"),
364+
val -> val.isNotNull())),
365+
span ->
366+
span.hasName("messageListener")
367+
.hasParent(trace.getSpan(0))
368+
.hasKind(SpanKind.INTERNAL)));
356369
}
357370

358371
@Test
@@ -433,4 +446,23 @@ void captureMessageHeaderAsSpanAttributes() throws Exception {
433446
.hasParent(trace.getSpan(2))
434447
.hasKind(SpanKind.INTERNAL)));
435448
}
449+
450+
private static Consumer<List<? extends LinkData>> links(SpanContext... spanContexts) {
451+
return links -> {
452+
assertThat(links).hasSize(spanContexts.length);
453+
for (SpanContext spanContext : spanContexts) {
454+
assertThat(links)
455+
.anySatisfy(
456+
link -> {
457+
assertThat(link.getSpanContext().getTraceId())
458+
.isEqualTo(spanContext.getTraceId());
459+
assertThat(link.getSpanContext().getSpanId()).isEqualTo(spanContext.getSpanId());
460+
assertThat(link.getSpanContext().getTraceFlags())
461+
.isEqualTo(spanContext.getTraceFlags());
462+
assertThat(link.getSpanContext().getTraceState())
463+
.isEqualTo(spanContext.getTraceState());
464+
});
465+
}
466+
};
467+
}
436468
}

0 commit comments

Comments
 (0)