Skip to content

Commit e397953

Browse files
author
Liudmila Molkova
authored
Report prefetched seqno instead of consumer lag (Azure#30731)
* Report prefetched seqno instead of consumer lag
1 parent ed9b363 commit e397953

File tree

6 files changed

+204
-57
lines changed

6 files changed

+204
-57
lines changed

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpMetricsProvider.java

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,18 @@
1111
import com.azure.core.util.logging.ClientLogger;
1212
import com.azure.core.util.metrics.DoubleHistogram;
1313
import com.azure.core.util.metrics.LongCounter;
14+
import com.azure.core.util.metrics.LongGauge;
1415
import com.azure.core.util.metrics.Meter;
1516
import com.azure.core.util.metrics.MeterProvider;
1617
import org.apache.qpid.proton.amqp.Symbol;
1718
import org.apache.qpid.proton.amqp.transport.DeliveryState;
1819
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
19-
import org.apache.qpid.proton.message.Message;
2020

2121
import java.time.Instant;
22-
import java.util.Date;
2322
import java.util.HashMap;
2423
import java.util.Map;
2524
import java.util.concurrent.ConcurrentHashMap;
26-
27-
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
25+
import java.util.function.Supplier;
2826

2927
/**
3028
* Helper class responsible for efficient reporting metrics in AMQP core. It's efficient and safe to use when there is no
@@ -34,7 +32,6 @@ public class AmqpMetricsProvider {
3432
public static final String STATUS_CODE_KEY = "amqpStatusCode";
3533
public static final String MANAGEMENT_OPERATION_KEY = "amqpOperation";
3634
private static final ClientLogger LOGGER = new ClientLogger(AmqpMetricsProvider.class);
37-
private static final Symbol ENQUEUED_TIME_ANNOTATION = Symbol.valueOf(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());
3835

3936
private static final String AZURE_CORE_AMQP_PROPERTIES_NAME = "azure-core.properties";
4037
private static final String AZURE_CORE_AMQP_PROPERTIES_VERSION_KEY = "version";
@@ -43,6 +40,9 @@ public class AmqpMetricsProvider {
4340
.getProperties(AZURE_CORE_AMQP_PROPERTIES_NAME)
4441
.getOrDefault(AZURE_CORE_AMQP_PROPERTIES_VERSION_KEY, null);
4542

43+
private static final AutoCloseable NOOP_CLOSEABLE = () -> {
44+
};
45+
4646
// all delivery state + 1 for `null` - we'll treat it as an error - no delivery was received
4747
private static final int DELIVERY_STATES_COUNT = DeliveryState.DeliveryStateType.values().length + 1;
4848

@@ -59,7 +59,7 @@ public class AmqpMetricsProvider {
5959
private LongCounter sessionErrors = null;
6060
private LongCounter linkErrors = null;
6161
private LongCounter transportErrors = null;
62-
private DoubleHistogram receivedLag = null;
62+
private LongGauge prefetchedSequenceNumber = null;
6363
private LongCounter addCredits = null;
6464

6565
/**
@@ -131,7 +131,7 @@ public AmqpMetricsProvider(Meter meter, String namespace, String entityPath) {
131131
this.linkErrors = this.meter.createLongCounter("messaging.az.amqp.client.link.errors", "AMQP link errors", "errors");
132132
this.transportErrors = this.meter.createLongCounter("messaging.az.amqp.client.transport.errors", "AMQP session errors", "errors");
133133
this.addCredits = this.meter.createLongCounter("messaging.az.amqp.consumer.credits.requested", "Number of requested credits", "credits");
134-
this.receivedLag = this.meter.createDoubleHistogram("messaging.az.amqp.consumer.lag", "Approximate lag between time message was received and time it was enqueued on the broker.", "sec");
134+
this.prefetchedSequenceNumber = this.meter.createLongGauge("messaging.az.amqp.prefetch.sequence_number", "Last prefetched sequence number", "seqNo");
135135
}
136136
}
137137

@@ -140,12 +140,20 @@ public static AmqpMetricsProvider noop() {
140140
}
141141

142142
/**
143-
* Checks if record delivers is enabled (for micro-optimizations).
143+
* Checks if send delivery metric is enabled (for micro-optimizations).
144144
*/
145145
public boolean isSendDeliveryEnabled() {
146146
return isEnabled && sendDuration.isEnabled();
147147
}
148148

149+
/**
150+
* Checks if prefetched sequence number is enabled (for micro-optimizations).
151+
*/
152+
public boolean isPrefetchedSequenceNumberEnabled() {
153+
return isEnabled && prefetchedSequenceNumber.isEnabled();
154+
}
155+
156+
149157
/**
150158
* Records duration of AMQP send call.
151159
*/
@@ -178,28 +186,14 @@ public void recordConnectionClosed(ErrorCondition condition) {
178186
}
179187

180188
/**
181-
* Records the message was received.
189+
* Creates gauge subscription to report latest sequence number value.
182190
*/
183-
public void recordReceivedMessage(Message message) {
184-
if (!isEnabled || !receivedLag.isEnabled()
185-
|| message == null
186-
|| message.getMessageAnnotations() == null
187-
|| message.getBody() == null) {
188-
return;
191+
public AutoCloseable trackPrefetchSequenceNumber(Supplier<Long> valueSupplier) {
192+
if (!isEnabled || !prefetchedSequenceNumber.isEnabled()) {
193+
return NOOP_CLOSEABLE;
189194
}
190195

191-
Map<Symbol, Object> properties = message.getMessageAnnotations().getValue();
192-
Object enqueuedTimeDate = properties != null ? properties.get(ENQUEUED_TIME_ANNOTATION) : null;
193-
if (enqueuedTimeDate instanceof Date) {
194-
Instant enqueuedTime = ((Date) enqueuedTimeDate).toInstant();
195-
long deltaMs = Instant.now().toEpochMilli() - enqueuedTime.toEpochMilli();
196-
if (deltaMs < 0) {
197-
deltaMs = 0;
198-
}
199-
receivedLag.record(deltaMs / 1000d, commonAttributes, Context.NONE);
200-
} else if (enqueuedTimeDate != null) {
201-
LOGGER.verbose("Received message has unexpected `x-opt-enqueued-time` annotation value - `{}`. Ignoring it.", enqueuedTimeDate);
202-
}
196+
return prefetchedSequenceNumber.registerCallback(valueSupplier, commonAttributes);
203197
}
204198

205199
/**

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@
3131
import java.util.concurrent.RejectedExecutionException;
3232
import java.util.concurrent.TimeoutException;
3333
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicLong;
3435
import java.util.concurrent.atomic.AtomicReference;
3536
import java.util.function.Supplier;
3637

38+
import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
3739
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addErrorCondition;
3840
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addSignalTypeAndResult;
3941
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId;
@@ -45,6 +47,7 @@
4547
* Handles receiving events from Event Hubs service and translating them to proton-j messages.
4648
*/
4749
public class ReactorReceiver implements AmqpReceiveLink, AsyncCloseable, AutoCloseable {
50+
private static final Symbol SEQUENCE_NUMBER_ANNOTATION = Symbol.valueOf(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue());
4851
private final String entityPath;
4952
private final Receiver receiver;
5053
private final ReceiveLinkHandler handler;
@@ -65,6 +68,8 @@ public class ReactorReceiver implements AmqpReceiveLink, AsyncCloseable, AutoClo
6568

6669
private final AtomicReference<Supplier<Integer>> creditSupplier = new AtomicReference<>();
6770
private final AmqpMetricsProvider metricsProvider;
71+
private final AtomicLong lastSequenceNumber = new AtomicLong();
72+
private final AutoCloseable trackPrefetchSeqNoSubscription;
6873

6974
@Deprecated
7075
protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Receiver receiver,
@@ -83,6 +88,7 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece
8388
this.tokenManager = tokenManager;
8489
this.dispatcher = dispatcher;
8590
this.metricsProvider = metricsProvider;
91+
this.trackPrefetchSeqNoSubscription = this.metricsProvider.trackPrefetchSequenceNumber(lastSequenceNumber::get);
8692

8793
Map<String, Object> loggingContext = createContextWithConnectionId(handler.getConnectionId());
8894
loggingContext.put(LINK_NAME_KEY, this.handler.getLinkName());
@@ -104,8 +110,14 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece
104110
"Cannot decode delivery when ReactorReceiver instance is closed."));
105111
return;
106112
}
113+
107114
final Message message = decodeDelivery(delivery);
108-
metricsProvider.recordReceivedMessage(message);
115+
if (metricsProvider.isPrefetchedSequenceNumberEnabled()) {
116+
Long seqNo = getSequenceNumber(message);
117+
if (seqNo != null) {
118+
lastSequenceNumber.set(seqNo);
119+
}
120+
}
109121

110122
final int creditsLeft = receiver.getRemoteCredit();
111123

@@ -122,13 +134,13 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece
122134
.addKeyValue("credits", credits)
123135
.log("Adding credits.");
124136
receiver.flow(credits);
125-
metricsProvider.recordAddCredits(credits);
126137
} else {
127138
logger.atVerbose()
128139
.addKeyValue("credits", credits)
129140
.log("There are no credits to add.");
130141
}
131142

143+
metricsProvider.recordAddCredits(credits == null ? 0 : credits);
132144
sink.success(message);
133145
});
134146
} catch (IOException | RejectedExecutionException e) {
@@ -448,6 +460,29 @@ private void completeClose() {
448460

449461
handler.close();
450462
receiver.free();
463+
try {
464+
trackPrefetchSeqNoSubscription.close();
465+
} catch (Exception e) {
466+
logger.verbose("Error closing metrics subscription.", e);
467+
}
468+
}
469+
470+
private Long getSequenceNumber(Message message) {
471+
if (message == null || message.getMessageAnnotations() == null || message.getBody() == null) {
472+
return null;
473+
}
474+
475+
Map<Symbol, Object> properties = message.getMessageAnnotations().getValue();
476+
Object seqNo = properties != null ? properties.get(SEQUENCE_NUMBER_ANNOTATION) : null;
477+
if (seqNo instanceof Integer) {
478+
return ((Integer) seqNo).longValue();
479+
} else if (seqNo instanceof Long) {
480+
return (Long) seqNo;
481+
} else if (seqNo != null) {
482+
logger.verbose("Received message has unexpected `x-opt-sequence-number` annotation value - `{}`. Ignoring it.", seqNo);
483+
}
484+
485+
return null;
451486
}
452487

453488
@Override

sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpMetricsProviderTest.java

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.azure.core.amqp.exception.AmqpResponseCode;
77
import com.azure.core.test.utils.metrics.TestCounter;
8+
import com.azure.core.test.utils.metrics.TestGauge;
89
import com.azure.core.test.utils.metrics.TestHistogram;
910
import com.azure.core.test.utils.metrics.TestMeasurement;
1011
import com.azure.core.test.utils.metrics.TestMeter;
@@ -22,12 +23,13 @@
2223

2324
import java.time.Instant;
2425
import java.util.Collections;
25-
import java.util.Date;
2626
import java.util.Map;
27+
import java.util.concurrent.atomic.AtomicLong;
2728

2829
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
2930
import static org.junit.jupiter.api.Assertions.assertEquals;
3031
import static org.junit.jupiter.api.Assertions.assertFalse;
32+
import static org.junit.jupiter.api.Assertions.assertSame;
3133
import static org.junit.jupiter.api.Assertions.assertTrue;
3234

3335
public class AmqpMetricsProviderTest {
@@ -56,7 +58,7 @@ public void disabledMeter() {
5658
assertDoesNotThrow(() -> provider.recordRequestResponseDuration(0, "foo", null));
5759
assertDoesNotThrow(() -> provider.recordSend(0, DeliveryState.DeliveryStateType.Accepted));
5860
assertDoesNotThrow(() -> provider.recordSend(0, null));
59-
assertDoesNotThrow(() -> provider.recordReceivedMessage(Proton.message()));
61+
assertDoesNotThrow(() -> provider.trackPrefetchSequenceNumber(() -> Long.valueOf(1)));
6062
assertDoesNotThrow(() -> provider.recordHandlerError(AmqpMetricsProvider.ErrorSource.TRANSPORT, new ErrorCondition(TIMEOUT_SYMBOL, "")));
6163
assertDoesNotThrow(() -> provider.recordHandlerError(null, new ErrorCondition(TIMEOUT_SYMBOL, "")));
6264

@@ -77,7 +79,7 @@ public void defaultMeter() {
7779
assertDoesNotThrow(() -> provider.recordRequestResponseDuration(0, "foo", null));
7880
assertDoesNotThrow(() -> provider.recordSend(0, DeliveryState.DeliveryStateType.Accepted));
7981
assertDoesNotThrow(() -> provider.recordSend(0, null));
80-
assertDoesNotThrow(() -> provider.recordReceivedMessage(Proton.message()));
82+
assertDoesNotThrow(() -> provider.trackPrefetchSequenceNumber(() -> Long.valueOf(1)));
8183
assertDoesNotThrow(() -> provider.recordHandlerError(AmqpMetricsProvider.ErrorSource.LINK, new ErrorCondition(TIMEOUT_SYMBOL, "")));
8284
assertDoesNotThrow(() -> provider.recordHandlerError(null, new ErrorCondition(TIMEOUT_SYMBOL, "")));
8385
}
@@ -196,36 +198,46 @@ public void initCloseConnection() {
196198
}
197199

198200
@Test
199-
public void receivedMessage() {
201+
@SuppressWarnings("try")
202+
public void receivedMessage() throws Exception {
200203
TestMeter meter = new TestMeter();
201204
AmqpMetricsProvider provider = new AmqpMetricsProvider(meter, NAMESPACE, ENTITY_PATH);
202205

203-
Instant now = Instant.now();
204-
MessageAnnotations futureEnqueuedDate = new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-enqueued-time"), Date.from(now.plusSeconds(100))));
206+
AtomicLong seqNo = new AtomicLong(0);
205207

206-
provider.recordReceivedMessage(createMessage(Date.from(now.plusSeconds(100))));
207-
provider.recordReceivedMessage(createMessage(Date.from(now.minusSeconds(100))));
208+
TestGauge gauge;
209+
try (AutoCloseable subscription = provider.trackPrefetchSequenceNumber(seqNo::get)) {
210+
assertTrue(meter.getGauges().containsKey("messaging.az.amqp.prefetch.sequence_number"));
211+
gauge = meter.getGauges().get("messaging.az.amqp.prefetch.sequence_number");
212+
assertEquals(1, gauge.getSubscriptions().size());
213+
assertSame(subscription, gauge.getSubscriptions().get(0));
214+
TestGauge.Subscription testSubscription = (TestGauge.Subscription) subscription;
208215

209-
provider.recordReceivedMessage(Proton.message());
210-
provider.recordReceivedMessage(createMessage("not a date"));
216+
testSubscription.measure();
217+
seqNo.set(1);
218+
seqNo.set(2);
219+
testSubscription.measure();
220+
seqNo.set(42);
221+
testSubscription.measure();
211222

212-
assertTrue(meter.getHistograms().containsKey("messaging.az.amqp.consumer.lag"));
213-
TestHistogram histogram = meter.getHistograms().get("messaging.az.amqp.consumer.lag");
223+
assertEquals(3, testSubscription.getMeasurements().size());
224+
TestMeasurement<Long> measurement1 = testSubscription.getMeasurements().get(0);
214225

215-
assertEquals(2, histogram.getMeasurements().size());
216-
TestMeasurement<Double> measurement1 = histogram.getMeasurements().get(0);
226+
assertEquals(0, measurement1.getValue());
227+
assertEquals(Context.NONE, measurement1.getContext());
228+
assertCommonAttributes(measurement1.getAttributes(), NAMESPACE, ENTITY_NAME, ENTITY_PATH);
217229

218-
// negative delta becomes 0
219-
assertEquals(0, measurement1.getValue());
220-
assertEquals(Context.NONE, measurement1.getContext());
221-
assertCommonAttributes(measurement1.getAttributes(), NAMESPACE, ENTITY_NAME, ENTITY_PATH);
230+
assertEquals(2, testSubscription.getMeasurements().get(1).getValue());
231+
assertEquals(42, testSubscription.getMeasurements().get(2).getValue());
232+
}
222233

223-
assertEquals(100, histogram.getMeasurements().get(1).getValue(), 10);
234+
seqNo.set(-1);
235+
assertEquals(0, gauge.getSubscriptions().size());
224236
}
225237

226-
private Message createMessage(Object enqueuedTime) {
238+
private Message createMessage(Object seqNo) {
227239
Message msg = Proton.message();
228-
MessageAnnotations annotations = new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-enqueued-time"), enqueuedTime));
240+
MessageAnnotations annotations = new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-sequence-number"), seqNo));
229241
msg.setMessageAnnotations(annotations);
230242
msg.setBody(new AmqpValue("body"));
231243

sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.azure.core.amqp.exception.AmqpException;
1515
import com.azure.core.amqp.exception.AmqpResponseCode;
1616
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
17+
import com.azure.core.test.utils.metrics.TestGauge;
1718
import com.azure.core.test.utils.metrics.TestMeasurement;
1819
import com.azure.core.test.utils.metrics.TestMeter;
1920
import com.azure.core.util.Context;
@@ -45,7 +46,6 @@
4546

4647
import java.io.IOException;
4748
import java.time.Duration;
48-
import java.time.Instant;
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.concurrent.CountDownLatch;
@@ -766,13 +766,13 @@ void receiveMetricsAreReportedWithMessageIsReceived() throws IOException {
766766
// Arrange
767767
// This message was copied from one that was received.
768768
final byte[] messageBytes = new byte[] { 0, 83, 114, -63, 73, 6, -93, 21, 120, 45, 111, 112, 116, 45, 115, 101,
769-
113, 117, 101, 110, 99, 101, 45, 110, 117, 109, 98, 101, 114, 85, 0, -93, 12, 120, 45, 111, 112, 116, 45,
769+
113, 117, 101, 110, 99, 101, 45, 110, 117, 109, 98, 101, 114, 84, 42, -93, 12, 120, 45, 111, 112, 116, 45,
770770
111, 102, 102, 115, 101, 116, -95, 1, 48, -93, 19, 120, 45, 111, 112, 116, 45, 101, 110, 113, 117, 101, 117,
771771
101, 100, 45, 116, 105, 109, 101, -125, 0, 0, 1, 112, -54, 124, -41, 90, 0, 83, 117, -96, 12, 80, 111, 115,
772772
105, 116, 105, 111, 110, 53, 58, 32, 48};
773773

774774
// change if changing message above
775-
long messageEnqueuedTime = 1583945144154L;
775+
long sequenceNumber = 42;
776776

777777
final Link link = mock(Link.class);
778778
final Delivery delivery = mock(Delivery.class);
@@ -807,6 +807,9 @@ void receiveMetricsAreReportedWithMessageIsReceived() throws IOException {
807807
ReactorReceiver reactorReceiverWithMetrics = new ReactorReceiver(amqpConnection, "name/and/partition", receiver, receiverHandler, tokenManager,
808808
reactorDispatcher, retryOptions, metricsProvider);
809809

810+
TestGauge sequenceNumberMetric = meter.getGauges().get("messaging.az.amqp.prefetch.sequence_number");
811+
TestGauge.Subscription subscription = sequenceNumberMetric.getSubscriptions().get(0);
812+
810813
reactorReceiverWithMetrics.setEmptyCreditListener(creditSupplier);
811814

812815
doAnswer(invocationOnMock -> {
@@ -822,10 +825,11 @@ void receiveMetricsAreReportedWithMessageIsReceived() throws IOException {
822825
.verify(VERIFY_TIMEOUT);
823826

824827
// Assert
825-
List<TestMeasurement<Double>> receivedLag = meter.getHistograms().get("messaging.az.amqp.consumer.lag").getMeasurements();
826-
assertEquals(1, receivedLag.size());
827-
TestMeasurement<Double> measurement = receivedLag.get(0);
828-
assertEquals((Instant.now().toEpochMilli() - messageEnqueuedTime) / 1000d, measurement.getValue(), 100);
828+
subscription.measure();
829+
List<TestMeasurement<Long>> seqNumbers = subscription.getMeasurements();
830+
assertEquals(1, seqNumbers.size());
831+
TestMeasurement<Long> measurement = seqNumbers.get(0);
832+
assertEquals(sequenceNumber, measurement.getValue());
829833
assertEquals(Context.NONE, measurement.getContext());
830834
assertEquals("namespace", measurement.getAttributes().get(ClientConstants.HOSTNAME_KEY));
831835
assertEquals("name", measurement.getAttributes().get(ClientConstants.ENTITY_NAME_KEY));

0 commit comments

Comments
 (0)