Skip to content

Commit 0f1712f

Browse files
author
Liudmila Molkova
authored
ServiceBus: small optimizations and logging improvements (Azure#36536)
* optimizations and logging improvements
1 parent 4aa0679 commit 0f1712f

File tree

14 files changed

+106
-49
lines changed

14 files changed

+106
-49
lines changed

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpResponseCode.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33

44
package com.azure.core.amqp.exception;
55

6-
import java.util.HashMap;
7-
import java.util.Map;
8-
96
/**
107
* Error response codes returned from AMQP.
118
*/
@@ -200,11 +197,11 @@ public enum AmqpResponseCode {
200197
*/
201198
HTTP_VERSION_NOT_SUPPORTED(505);
202199

203-
private static final Map<Integer, AmqpResponseCode> VALUE_MAP = new HashMap<>();
200+
private static final AmqpResponseCode[] VALUE_MAP = new AmqpResponseCode[1024];
204201

205202
static {
206203
for (AmqpResponseCode code : AmqpResponseCode.values()) {
207-
VALUE_MAP.put(code.value, code);
204+
VALUE_MAP[code.value] = code;
208205
}
209206
}
210207

@@ -222,7 +219,11 @@ public enum AmqpResponseCode {
222219
* is found.
223220
*/
224221
public static AmqpResponseCode fromValue(final int value) {
225-
return VALUE_MAP.get(value);
222+
if (value >= 0 && value < VALUE_MAP.length) {
223+
return VALUE_MAP[value];
224+
}
225+
226+
return null;
226227
}
227228

228229
/**

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Objects;
2424

2525
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addKeyValueIfNotNull;
26+
import static com.azure.core.amqp.implementation.ClientConstants.DELIVERY_STATE_KEY;
2627
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
2728
import static com.azure.core.amqp.implementation.ClientConstants.ERROR_CONDITION_KEY;
2829
import static com.azure.core.amqp.implementation.ClientConstants.ERROR_DESCRIPTION_KEY;
@@ -59,9 +60,7 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {
5960
return channel.sendWithAck(protonJMessage)
6061
.handle((Message responseMessage, SynchronousSink<AmqpAnnotatedMessage> sink) ->
6162
handleResponse(responseMessage, sink, channel.getErrorContext()))
62-
.switchIfEmpty(Mono.error(() -> new AmqpException(true, String.format(
63-
"entityPath[%s] No response received from management channel.", entityPath),
64-
channel.getErrorContext())));
63+
.switchIfEmpty(errorIfEmpty(channel, null));
6564
}));
6665
}
6766

@@ -74,9 +73,7 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOut
7473
return channel.sendWithAck(protonJMessage, protonJDeliveryState)
7574
.handle((Message responseMessage, SynchronousSink<AmqpAnnotatedMessage> sink) ->
7675
handleResponse(responseMessage, sink, channel.getErrorContext()))
77-
.switchIfEmpty(Mono.error(() -> new AmqpException(true, String.format(
78-
"entityPath[%s] outcome[%s] No response received from management channel.", entityPath,
79-
deliveryOutcome.getDeliveryState()), channel.getErrorContext())));
76+
.switchIfEmpty(errorIfEmpty(channel, deliveryOutcome.getDeliveryState()));
8077
}));
8178
}
8279

@@ -126,6 +123,17 @@ private void handleResponse(Message response, SynchronousSink<AmqpAnnotatedMessa
126123
sink.error(throwable);
127124
}
128125

126+
private <T> Mono<T> errorIfEmpty(RequestResponseChannel channel, com.azure.core.amqp.models.DeliveryState deliveryState) {
127+
return Mono.error(() -> {
128+
String error = String.format(
129+
"entityPath[%s] deliveryState[%s] No response received from management channel.", entityPath, deliveryState);
130+
AmqpException exception = new AmqpException(true, error, channel.getErrorContext());
131+
return logger.atError()
132+
.addKeyValue(DELIVERY_STATE_KEY, deliveryState)
133+
.log(exception);
134+
});
135+
}
136+
129137
private Mono<Void> isAuthorized() {
130138
return tokenManager.getAuthorizationResults()
131139
.next()

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,13 +396,18 @@ protected Message decodeDelivery(Delivery delivery) {
396396
}
397397

398398
private void settleMessage(Message message) {
399-
final String id = String.valueOf(message.getCorrelationId());
400-
final UnsignedLong correlationId = UnsignedLong.valueOf(id);
399+
UnsignedLong correlationId;
400+
if (message.getCorrelationId() instanceof UnsignedLong) {
401+
correlationId = (UnsignedLong) message.getCorrelationId();
402+
} else {
403+
String id = String.valueOf(message.getCorrelationId());
404+
correlationId = UnsignedLong.valueOf(id);
405+
}
401406
final MonoSink<Message> sink = unconfirmedSends.remove(correlationId);
402407

403408
if (sink == null) {
404409
logger.atWarning()
405-
.addKeyValue("messageId", id)
410+
.addKeyValue("messageId", message.getCorrelationId())
406411
.log("Received delivery without pending message.");
407412
return;
408413
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReactorHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public void onReactorInit(Event e) {
3939

4040
@Override
4141
public void onReactorFinal(Event e) {
42-
logger.info("reactor.onReactorFinal. event: {}", e);
42+
logger.atInfo()
43+
.addKeyValue("event", e)
44+
.log("reactor.onReactorFinal.");
4345
}
4446
}

sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/exception/AmqpResponseCodeTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
import org.junit.jupiter.api.Assertions;
77
import org.junit.jupiter.api.Test;
8+
import org.junit.jupiter.params.ParameterizedTest;
9+
import org.junit.jupiter.params.provider.ValueSource;
10+
11+
import static org.junit.jupiter.api.Assertions.assertNull;
812

913
public class AmqpResponseCodeTest {
1014
/**
@@ -20,6 +24,12 @@ public void createFromInteger() {
2024
Assertions.assertEquals(expected, actual);
2125
}
2226

27+
@ParameterizedTest
28+
@ValueSource(ints = {-1, 42, 1024})
29+
public void createFromInvalidInteger(int notDefined) {
30+
assertNull(AmqpResponseCode.fromValue(notDefined));
31+
}
32+
2333
/**
2434
* Verifies that we can parse the AmqpResponseCode an integer.
2535
*/

sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryUtils.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import io.opentelemetry.api.common.AttributeKey;
1010
import io.opentelemetry.api.common.Attributes;
1111

12-
import java.util.Optional;
13-
1412
import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY;
1513

1614
class OpenTelemetryUtils {
@@ -51,17 +49,20 @@ static void addAttribute(io.opentelemetry.api.common.AttributesBuilder attribute
5149
* If not context is found, returns {@link io.opentelemetry.context.Context#current()}.
5250
*/
5351
static io.opentelemetry.context.Context getTraceContextOrCurrent(Context azContext) {
54-
Optional<Object> traceContextOpt = azContext.getData(PARENT_TRACE_CONTEXT_KEY);
55-
if (traceContextOpt.isPresent()) {
56-
Object traceContextObj = traceContextOpt.get();
57-
if (traceContextObj instanceof io.opentelemetry.context.Context) {
52+
Object traceContextObj = azContext.getData(PARENT_TRACE_CONTEXT_KEY).orElse(null);
53+
if (traceContextObj != null) {
54+
if (io.opentelemetry.context.Context.class.isAssignableFrom(traceContextObj.getClass())) {
5855
return (io.opentelemetry.context.Context) traceContextObj;
59-
} else if (traceContextObj != null) {
60-
// TODO (limolkova) somehow we can get shaded otel agent context here
61-
if (!warnedOnContextType) {
62-
LOGGER.warning("Expected instance of `io.opentelemetry.context.Context` under `PARENT_TRACE_CONTEXT_KEY`, but got {}, ignoring it.", traceContextObj.getClass().getName());
63-
warnedOnContextType = true;
64-
}
56+
} else if (!warnedOnContextType) {
57+
// TODO (limolkova) https://github.com/Azure/azure-sdk-for-java/issues/36537
58+
// The context we have here is created by azure-core-tracing-opentelemetry.
59+
// but if otel or applicationInsights agents are used, the azure-core-tracing-opentelemetry is shaded and records shaded context.
60+
// if azure-core-metrics-opentelemetry is NOT shaded, we won't be able to reuse this context since it's not compatible.
61+
// I.e. it works fine if both are shaded or not shaded and warns (once) if just one of them is shaded.
62+
// We should fix this by adding azure-core-metrics-opentelemetry to otel agent
63+
// In the meantime, it's ok - we return current context and exemplars should work reasonably well.
64+
LOGGER.warning("Expected instance of `io.opentelemetry.context.Context` under `PARENT_TRACE_CONTEXT_KEY`, but got {}, ignoring it.", traceContextObj.getClass().getName());
65+
warnedOnContextType = true;
6566
}
6667
}
6768

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxAutoComplete.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ static final class AutoCompleteSubscriber extends BaseSubscriber<ServiceBusMessa
7272

7373
@Override
7474
protected void hookOnSubscribe(Subscription subscription) {
75-
logger.info("Subscription received. Subscribing downstream. {}", subscription);
75+
logger.atVerbose()
76+
.addKeyValue("subscription", subscription)
77+
.log("Subscription received. Subscribing downstream.");
7678
downstream.onSubscribe(this);
7779
}
7880

@@ -149,7 +151,9 @@ private void applyWithCatch(Function<ServiceBusMessageContext, Mono<Void>> funct
149151
try {
150152
function.apply(context).block();
151153
} catch (Exception e) {
152-
logger.warning("Unable to '{}' message.", operation, e);
154+
logger.atWarning()
155+
.addKeyValue("operation", operation)
156+
.log("Operation on message failed.", e);
153157

154158
// On an error, we'll stop requesting from upstream and pass the error downstream.
155159
upstream().cancel();

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,9 @@ private List<ServiceBusReceivedMessage> deserializeListOfMessages(Message amqpMe
341341
final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(amqpMessage);
342342

343343
if (statusCode != AmqpResponseCode.OK) {
344-
LOGGER.warning("AMQP response did not contain OK status code. Actual: {}", statusCode);
344+
LOGGER.atWarning()
345+
.addKeyValue("statusCode", statusCode)
346+
.log("AMQP response did not contain OK status code.");
345347
return Collections.emptyList();
346348
}
347349

@@ -351,25 +353,32 @@ private List<ServiceBusReceivedMessage> deserializeListOfMessages(Message amqpMe
351353
LOGGER.warning("AMQP response did not contain a body.");
352354
return Collections.emptyList();
353355
} else if (!(responseBodyMap instanceof Map)) {
354-
LOGGER.warning("AMQP response body is not correct instance. Expected: {}. Actual: {}",
355-
Map.class, responseBodyMap.getClass());
356+
LOGGER.atWarning()
357+
.addKeyValue("expectedType", Map.class)
358+
.addKeyValue("actualType", responseBodyMap.getClass())
359+
.log("AMQP response body is not correct instance.");
356360
return Collections.emptyList();
357361
}
358362

359363
final Object messages = ((Map) responseBodyMap).get(ManagementConstants.MESSAGES);
360364
if (messages == null) {
361-
LOGGER.warning("Response body did not contain key: {}", ManagementConstants.MESSAGES);
365+
LOGGER.atWarning().addKeyValue("expectedKey", ManagementConstants.MESSAGES)
366+
.log("AMQP response body did not contain key.");
362367
return Collections.emptyList();
363368
} else if (!(messages instanceof Iterable)) {
364-
LOGGER.warning("Response body contents is not the correct type. Expected: {}. Actual: {}",
365-
Iterable.class, messages.getClass());
369+
LOGGER.atWarning()
370+
.addKeyValue("expectedType", Iterable.class)
371+
.addKeyValue("actualType", messages.getClass())
372+
.log("Response body contents is not the correct type.");
366373
return Collections.emptyList();
367374
}
368375

369376
for (Object message : (Iterable) messages) {
370377
if (!(message instanceof Map)) {
371-
LOGGER.warning("Message inside iterable of message is not correct type. Expected: {}. Actual: {}",
372-
Map.class, message.getClass());
378+
LOGGER.atWarning()
379+
.addKeyValue("expectedType", Map.class)
380+
.addKeyValue("actualType", message.getClass())
381+
.log("Message inside iterable of message is not correct type.");
373382
continue;
374383
}
375384

@@ -407,11 +416,17 @@ private ServiceBusReceivedMessage deserializeMessage(Message amqpMessage) {
407416
amqpMessageBody = AmqpMessageBody.fromSequence(messageData);
408417

409418
} else {
410-
LOGGER.warning(String.format(Messages.MESSAGE_NOT_OF_TYPE, body.getType()));
419+
LOGGER.atWarning()
420+
.addKeyValue("actualType", body.getType())
421+
.log("Message body is not correct. Not setting body contents.");
422+
411423
amqpMessageBody = AmqpMessageBody.fromData(EMPTY_BYTE_ARRAY);
412424
}
413425
} else {
414-
LOGGER.warning(String.format(Messages.MESSAGE_NOT_OF_TYPE, "null"));
426+
LOGGER.atWarning()
427+
.addKeyValue("actualType", "null")
428+
.log("Message body is not correct. Not setting body contents.");
429+
415430
amqpMessageBody = AmqpMessageBody.fromData(EMPTY_BYTE_ARRAY);
416431
}
417432

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousReceiveWork.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,17 @@ synchronized boolean emitNext(ServiceBusReceivedMessage message) {
152152
final int numberLeft = remaining.decrementAndGet();
153153

154154
if (numberLeft < 0) {
155-
logger.info("Number left {} < 0. Not emitting downstream.", numberLeft);
155+
logger.atInfo()
156+
.addKeyValue("numberLeft", numberLeft)
157+
.log("Not emitting downstream.");
156158
return false;
157159
}
158160

159161
final Sinks.EmitResult result = downstreamEmitter.tryEmitNext(message);
160162
if (result != Sinks.EmitResult.OK) {
161-
logger.info("Could not emit downstream. EmitResult: {}", result);
163+
logger.atInfo()
164+
.addKeyValue("emitResult", result)
165+
.log("Could not emit downstream.");
162166
return false;
163167
}
164168

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/implementation/EntityHelper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,9 @@ private static List<AuthorizationRuleImpl> toImplementation(List<AuthorizationRu
495495
implementation.setType("SharedAccessAuthorizationRule");
496496
} else {
497497
final String className = rule.getClass().getName();
498-
LOGGER.warning("AuthorizationRule type '{}' is unknown.", className);
498+
LOGGER.atWarning()
499+
.addKeyValue("type", className)
500+
.log("AuthorizationRule type is unknown.");
499501
implementation.setType(className);
500502
}
501503

0 commit comments

Comments
 (0)