Skip to content

Commit a83166e

Browse files
author
Liudmila Molkova
authored
ServiceBus tracing: Fix more occurrences of NPE when message is null in tracing instrumentation (Azure#36800)
* Fix more occurences of NPE when message is null in tracing instr
1 parent b20cdd3 commit a83166e

File tree

6 files changed

+135
-1
lines changed

6 files changed

+135
-1
lines changed

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535

3636
- Fixed issue causing updates to TopicProperties with AuthorizationRules to return 400 Bad request. ([#34880](https://github.com/Azure/azure-sdk-for-java/issues/34880))
3737

38+
- Fixed `NullPointerException` that happens when session processor or receiver encounters an error and distributed tracing is enabled.
39+
([#36800](https://github.com/Azure/azure-sdk-for-java/issues/36800))
40+
3841
### Other Changes
3942

4043
#### Dependency Updates

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ protected void hookOnSubscribe(Subscription subscription) {
5555

5656
@Override
5757
protected void hookOnNext(ServiceBusMessageContext message) {
58+
if (message == null || message.getMessage() == null) {
59+
downstream.onNext(message);
60+
return;
61+
}
62+
5863
Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE);
5964
message.getMessage().setContext(span);
6065
AutoCloseable scope = tracer.makeSpanCurrent(span);

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ public void onSubscribe(Subscription subscription) {
374374
@SuppressWarnings("try")
375375
@Override
376376
public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
377-
Context span = serviceBusMessageContext.getMessage().getContext();
377+
Context span = serviceBusMessageContext.getMessage() != null ? serviceBusMessageContext.getMessage().getContext() : Context.NONE;
378378
Exception exception = null;
379379
AutoCloseable scope = tracer.makeSpanCurrent(span);
380380
try {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ContextAccessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public interface SendMessageContextAccessor {
2121
}
2222

2323
public static ServiceBusReceivedMessage setContext(ServiceBusReceivedMessage message, Context context) {
24+
assert message != null; // message is never null on this path.
2425
return receiveAccessor.setContext(message, context);
2526
}
2627

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.servicebus;
5+
6+
import com.azure.core.util.BinaryData;
7+
import com.azure.core.util.Context;
8+
import com.azure.core.util.tracing.SpanKind;
9+
import com.azure.core.util.tracing.StartSpanOptions;
10+
import com.azure.core.util.tracing.Tracer;
11+
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
12+
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
13+
import org.junit.jupiter.params.ParameterizedTest;
14+
import org.junit.jupiter.params.provider.EnumSource;
15+
import reactor.test.StepVerifier;
16+
import reactor.test.publisher.TestPublisher;
17+
18+
import java.time.Duration;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
24+
public class FluxTraceTest {
25+
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
26+
private final ServiceBusReceivedMessage receivedMessage = new ServiceBusReceivedMessage(BinaryData.fromString("Some Data"));
27+
private final ServiceBusMessageContext message = new ServiceBusMessageContext(receivedMessage);
28+
private final TestPublisher<ServiceBusMessageContext> messagesPublisher = TestPublisher.create();
29+
30+
@ParameterizedTest
31+
@EnumSource(ReceiverKind.class)
32+
public void testProcessSpans(ReceiverKind receiverKind) {
33+
TestTracer tracer = new TestTracer();
34+
ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(tracer, null, "fqdn", "entityPath", null, receiverKind);
35+
FluxTrace fluxTrace = new FluxTrace(messagesPublisher.flux(), instrumentation);
36+
37+
StepVerifier.create(fluxTrace)
38+
.then(() -> messagesPublisher.next(message))
39+
.assertNext(m -> {
40+
switch (receiverKind) {
41+
case SYNC_RECEIVER:
42+
assertEquals(0, tracer.getStartedSpans().size());
43+
break;
44+
default:
45+
assertEquals(1, tracer.getStartedSpans().size());
46+
assertEquals("ServiceBus.process", tracer.getStartedSpans().get(0));
47+
break;
48+
}
49+
})
50+
.thenCancel()
51+
.verify(DEFAULT_TIMEOUT);
52+
}
53+
54+
@ParameterizedTest
55+
@EnumSource(ReceiverKind.class)
56+
public void nullMessage(ReceiverKind receiverKind) {
57+
TestTracer tracer = new TestTracer();
58+
ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(tracer, null, "fqdn", "entityPath", null, receiverKind);
59+
FluxTrace fluxTrace = new FluxTrace(messagesPublisher.flux(), instrumentation);
60+
61+
StepVerifier.create(fluxTrace)
62+
.then(() -> messagesPublisher.next(new ServiceBusMessageContext("sessionId", new RuntimeException("foo"))))
63+
.assertNext(m -> assertEquals(0, tracer.getStartedSpans().size()))
64+
.thenCancel()
65+
.verify(DEFAULT_TIMEOUT);
66+
}
67+
68+
private static class TestTracer implements Tracer {
69+
private final List<String> startedSpans = new ArrayList<>();
70+
@Override
71+
public Context start(String methodName, StartSpanOptions options, Context context) {
72+
startedSpans.add(methodName);
73+
return context;
74+
}
75+
76+
@Override
77+
public Context start(String methodName, Context context) {
78+
return start(methodName, new StartSpanOptions(SpanKind.INTERNAL), context);
79+
}
80+
81+
@Override
82+
public void end(String errorMessage, Throwable throwable, Context context) {
83+
}
84+
85+
@Override
86+
public void setAttribute(String key, String value, Context context) {
87+
}
88+
89+
public List<String> getStartedSpans() {
90+
return startedSpans;
91+
}
92+
}
93+
}

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import static org.junit.jupiter.api.Assertions.assertNull;
4040
import static org.junit.jupiter.api.Assertions.assertSame;
4141
import static org.junit.jupiter.api.Assertions.assertTrue;
42+
import static org.junit.jupiter.api.Assertions.fail;
4243
import static org.mockito.ArgumentMatchers.any;
4344
import static org.mockito.ArgumentMatchers.eq;
4445
import static org.mockito.ArgumentMatchers.isNull;
@@ -439,6 +440,37 @@ public void testProcessorWithTracingEnabled() throws InterruptedException {
439440
verify(tracer, atLeast(numberOfTimes - 1)).end(isNull(), isNull(), any());
440441
}
441442

443+
@Test
444+
@SuppressWarnings("unchecked")
445+
public void testProcessorWithTracingEnabledAndNullMessage() throws InterruptedException {
446+
final Tracer tracer = mock(Tracer.class);
447+
final int numberOfTimes = 1;
448+
449+
when(tracer.isEnabled()).thenReturn(true);
450+
when(tracer.extractContext(any())).thenReturn(Context.NONE);
451+
452+
when(tracer.start(eq("ServiceBus.process"), any(StartSpanOptions.class), any())).thenReturn(new Context(PARENT_TRACE_CONTEXT_KEY, "span"));
453+
454+
Flux<ServiceBusMessageContext> messageFlux = Flux.just(new ServiceBusMessageContext("sessionId", new RuntimeException("foo")));
455+
ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux, tracer);
456+
457+
CountDownLatch countDownLatch = new CountDownLatch(numberOfTimes);
458+
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, ENTITY_NAME,
459+
null, null,
460+
messageContext -> fail("Should not have received a message"),
461+
error -> {
462+
assertEquals("foo", error.getException().getMessage());
463+
countDownLatch.countDown();
464+
},
465+
new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1));
466+
467+
serviceBusProcessorClient.start();
468+
assertTrue(countDownLatch.await(20, TimeUnit.SECONDS));
469+
serviceBusProcessorClient.close();
470+
471+
verify(tracer, never()).start(eq("ServiceBus.process"), any(StartSpanOptions.class), any(Context.class));
472+
}
473+
442474
@Test
443475
@SuppressWarnings("unchecked")
444476
public void testProcessorWithTracingDisabled() throws InterruptedException {

0 commit comments

Comments
 (0)