Skip to content

Commit 5bdf09a

Browse files
author
Liudmila Molkova
authored
Tracing for eventhubs consumer and batch processor (Azure#31197)
* Trace eventhubs consumer and batch processor
1 parent 6e3a415 commit 5bdf09a

File tree

49 files changed

+1804
-597
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1804
-597
lines changed

eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
<suppress checks="IllegalImport" files=".*[/\\]com[/\\]azure[/\\]identity[/\\]*"/>
101101
<suppress checks="IllegalImport" files="com.azure.messaging.servicebus.TracingIntegrationTests.java"/>
102102
<suppress checks="IllegalImport" files="com.azure.messaging.eventhubs.TracingIntegrationTests.java"/>
103+
<suppress checks="IllegalImport" files="com.azure.messaging.eventhubs.PublishEventsTracingWithCustomContextSample.java"/>
103104

104105
<!-- Suppress warnings for Event Processor until the usage of "Client" is discussed and resolved:
105106
https://github.com/Azure/azure-sdk/issues/321 -->

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpMetricsProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ public boolean isSendDeliveryEnabled() {
146146
return isEnabled && sendDuration.isEnabled();
147147
}
148148

149+
/**
150+
* Checks if the request-response duration metric is enabled, so callers can skip capturing timing data when it would not be recorded.
151+
*/
152+
public boolean isRequestResponseDurationEnabled() {
153+
return isEnabled && requestResponseDuration.isEnabled();
154+
}
155+
156+
149157
/**
150158
* Checks if prefetched sequence number is enabled (for micro-optimizations).
151159
*/

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ private void terminateUnconfirmedSends(Throwable error) {
504504
* Captures current time in mono context - used to report send metric
505505
*/
506506
private Mono<Message> captureStartTime(Message toSend, Mono<Message> publisher) {
507-
if (metricsProvider.isSendDeliveryEnabled()) {
507+
if (metricsProvider.isRequestResponseDurationEnabled()) {
508508
String operationName = "unknown";
509509
if (toSend != null && toSend.getApplicationProperties() != null && toSend.getApplicationProperties().getValue() != null) {
510510
Map<String, Object> properties = toSend.getApplicationProperties().getValue();
@@ -532,7 +532,7 @@ private static ContextView getSinkContext(MonoSink<?> sink) {
532532
* Records send call duration metric.
533533
**/
534534
private void recordDelivery(ContextView context, Message response) {
535-
if (metricsProvider.isSendDeliveryEnabled()) {
535+
if (metricsProvider.isRequestResponseDurationEnabled()) {
536536
Object startTimestamp = context.getOrDefault(START_SEND_TIME_CONTEXT_KEY, null);
537537
Object operationName = context.getOrDefault(OPERATION_CONTEXT_KEY, null);
538538
AmqpResponseCode responseCode = response == null ? null : RequestResponseUtils.getStatusCode(response);

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/TracerProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111

1212
import java.util.Objects;
1313

14+
@Deprecated
15+
/**
16+
* @deprecated use EventHubs*Tracer and ServiceBus*Tracer instead.
17+
*/
1418
public class TracerProvider {
1519
private static final ClientLogger LOGGER = new ClientLogger(TracerProvider.class);
1620
private Tracer tracer;

sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/TracerProviderTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.mockito.Mockito.verify;
3232
import static org.mockito.Mockito.when;
3333

34+
@SuppressWarnings("deprecation")
3435
public class TracerProviderTest {
3536
private static final String SERVICE_BASE_NAME = "serviceBaseName";
3637
private static final String METHOD_NAME = SERVICE_BASE_NAME + "send";

sdk/core/azure-core-tracing-opentelemetry/README.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,13 @@ try {
122122

123123
Send a single event/message using [azure-messaging-eventhubs][azure-messaging-eventhubs] with tracing enabled.
124124

125-
Users can additionally pass the value of the current tracing span to the EventData object with key **PARENT_TRACE_CONTEXT_KEY** on the [Context][context] object:
125+
Users can additionally pass a custom trace context value to the EventData object using the **PARENT_TRACE_CONTEXT_KEY** key on the [Context][context] object.
126+
127+
Please refer to [Event Hubs samples][event_hubs_samples]
128+
for more information.
129+
130+
```java
126131

127-
```java readme-sample-context-manual-propagation-amqp
128132
Flux<EventData> events = Flux.just(
129133
new EventData("EventData Sample 1"),
130134
new EventData("EventData Sample 2"));
@@ -151,6 +155,7 @@ events.collect(batchRef::get, (b, e) ->
151155
return ctx.put(PARENT_TRACE_CONTEXT_KEY, traceContextRef.updateAndGet(traceContext -> traceContext.with(span)));
152156
})
153157
.block();
158+
154159
```
155160

156161
## Troubleshooting
@@ -203,9 +208,9 @@ This project has adopted the [Microsoft Open Source Code of Conduct](https://ope
203208
[OpenTelemetry]: https://github.com/open-telemetry/opentelemetry-java#opentelemetry-for-java
204209
[sample_app_config]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-tracing-opentelemetry/src/samples/java/com/azure/core/tracing/opentelemetry/CreateConfigurationSettingLoggingExporterSample.java
205210
[sample_async_key_vault]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-tracing-opentelemetry/src/samples/java/com/azure/core/tracing/opentelemetry/ListKeyVaultSecretsAutoConfigurationSample.java
206-
[sample_eventhubs]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-tracing-opentelemetry/src/samples/java/com/azure/core/tracing/opentelemetry/PublishEventsJaegerExporterSample.java
207211
[sample_key_vault]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-tracing-opentelemetry/src/samples/java/com/azure/core/tracing/opentelemetry/ListKeyVaultSecretsJaegerExporterSample.java
208212
[samples]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-tracing-opentelemetry/src/samples/
209213
[source_code]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-tracing-opentelemetry/src
214+
[event_hubs_samples]: https://github.com/Azure/azure-sdk-for-java/blob/10a18ccc2f20cad6004ae90d64f22009d65e9ef7/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PublishEventsTracingWithCustomContextSample.java
210215

211216
![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Fcore%2Fazure-core-tracing-opentelemetry%2FREADME.png)

sdk/core/azure-core-tracing-opentelemetry/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,6 @@
134134
<version>1.6.1</version> <!-- {x-version-update;com.azure:azure-identity;dependency} -->
135135
<scope>test</scope>
136136
</dependency>
137-
<dependency>
138-
<groupId>com.azure</groupId>
139-
<artifactId>azure-messaging-eventhubs</artifactId>
140-
<version>5.14.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;dependency} -->
141-
<scope>test</scope>
142-
</dependency>
143137
<dependency>
144138
<groupId>io.opentelemetry</groupId>
145139
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>

sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracer.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.opentelemetry.api.trace.Tracer;
2323
import io.opentelemetry.context.Scope;
2424

25+
import java.time.Instant;
2526
import java.time.OffsetDateTime;
2627
import java.util.Map;
2728
import java.util.Objects;
@@ -36,6 +37,8 @@
3637
*/
3738
public class OpenTelemetryTracer implements com.azure.core.util.tracing.Tracer {
3839
private static final StartSpanOptions DEFAULT_OPTIONS = new StartSpanOptions(com.azure.core.util.tracing.SpanKind.INTERNAL);
40+
private static final String SPAN_KIND_KEY = "span-kind";
41+
private static final String START_TIME_KEY = "span-start-time";
3942
private final Tracer tracer;
4043

4144
/**
@@ -65,7 +68,7 @@ public OpenTelemetryTracer() {
6568

6669
private static final ClientLogger LOGGER = new ClientLogger(OpenTelemetryTracer.class);
6770
private static final AutoCloseable NOOP_CLOSEABLE = () -> { };
68-
private static final SpanKind SHARED_SPAN_BUILDER_KIND = SpanKind.CLIENT;
71+
private static final SpanKind DEFAULT_SHARED_SPAN_BUILDER_KIND = SpanKind.CLIENT;
6972
private static final String SUPPRESSED_SPAN_FLAG = "suppressed-span-flag";
7073
private static final String CLIENT_METHOD_CALL_FLAG = "client-method-call-flag";
7174

@@ -114,7 +117,7 @@ public Context start(String spanName, Context context, ProcessKind processKind)
114117
context = unsuppress(context);
115118
switch (processKind) {
116119
case SEND:
117-
// use previously created span builder from the LINK process.
120+
// use previously created span builder with the links
118121
spanBuilder = getOrNull(context, SPAN_BUILDER_KEY, SpanBuilder.class);
119122
if (spanBuilder == null) {
120123
// we can't return context here, because caller would not know that span was not created.
@@ -123,17 +126,23 @@ public Context start(String spanName, Context context, ProcessKind processKind)
123126
.addKeyValue("spanName", spanName)
124127
.addKeyValue("processKind", processKind)
125128
.log("Start span is called without builder on the context, creating default builder.");
126-
spanBuilder = createSpanBuilder(spanName, null, SHARED_SPAN_BUILDER_KIND, null, context);
129+
spanBuilder = createSpanBuilder(spanName, null, SpanKind.CLIENT, null, context);
127130
}
128131

129-
return startSpanInternal(spanBuilder, isClientCall(SHARED_SPAN_BUILDER_KIND), this::addMessagingAttributes, context);
132+
return startSpanInternal(spanBuilder, true, this::addMessagingAttributes, context);
130133
case MESSAGE:
131134
spanBuilder = createSpanBuilder(spanName, null, SpanKind.PRODUCER, null, context);
132135
context = startSpanInternal(spanBuilder, false, this::addMessagingAttributes, context);
133136
return setDiagnosticId(context);
134137
case PROCESS:
135-
SpanContext remoteParentContext = getOrNull(context, SPAN_CONTEXT_KEY, SpanContext.class);
136-
spanBuilder = createSpanBuilder(spanName, remoteParentContext, SpanKind.CONSUMER, null, context);
138+
// use previously created span builder with the links
139+
spanBuilder = getOrNull(context, SPAN_BUILDER_KEY, SpanBuilder.class);
140+
if (spanBuilder == null) {
141+
// if there is no builder, create new one from parent in context
142+
SpanContext remoteParentContext = getOrNull(context, SPAN_CONTEXT_KEY, SpanContext.class);
143+
spanBuilder = createSpanBuilder(spanName, remoteParentContext, SpanKind.CONSUMER, null, context);
144+
}
145+
137146
context = startSpanInternal(spanBuilder, false, this::addMessagingAttributes, context);
138147

139148
// TODO (limolkova) we should do this in the EventHub/ServiceBus SDK instead to make sure scope is
@@ -227,7 +236,14 @@ public void addLink(Context context) {
227236
if (spanContext == null) {
228237
return;
229238
}
230-
spanBuilder.addLink(spanContext);
239+
240+
Attributes linkAttributes = Attributes.empty();
241+
Long messageEnqueuedTime = getOrNull(context, MESSAGE_ENQUEUED_TIME, Long.class);
242+
if (messageEnqueuedTime != null) {
243+
linkAttributes = Attributes.of(AttributeKey.longKey(MESSAGE_ENQUEUED_TIME), messageEnqueuedTime);
244+
}
245+
246+
spanBuilder.addLink(spanContext, linkAttributes);
231247
}
232248

233249
/**
@@ -243,8 +259,18 @@ public Context extractContext(String diagnosticId, Context context) {
243259
*/
244260
@Override
245261
public Context getSharedSpanBuilder(String spanName, Context context) {
246-
// this is used to create messaging send spanBuilder, and it's a CLIENT span
247-
return context.addData(SPAN_BUILDER_KEY, createSpanBuilder(spanName, null, SHARED_SPAN_BUILDER_KIND, null, context));
262+
com.azure.core.util.tracing.SpanKind spanKind = getOrNull(context, SPAN_KIND_KEY, com.azure.core.util.tracing.SpanKind.class);
263+
if (spanKind == null) {
264+
spanKind = com.azure.core.util.tracing.SpanKind.CLIENT;
265+
}
266+
267+
SpanBuilder builder = createSpanBuilder(spanName, null, convertToOtelKind(spanKind), null, context);
268+
Instant startTime = getOrNull(context, START_TIME_KEY, Instant.class);
269+
if (startTime != null) {
270+
builder.setStartTimestamp(startTime);
271+
}
272+
273+
return context.addData(SPAN_BUILDER_KEY, builder);
248274
}
249275

250276
/**
@@ -563,7 +589,7 @@ private Span getSpanOrNull(Context azContext) {
563589
private SpanKind processKindToSpanKind(ProcessKind processKind) {
564590
switch (processKind) {
565591
case SEND:
566-
return SHARED_SPAN_BUILDER_KIND;
592+
return SpanKind.CLIENT;
567593
case MESSAGE:
568594
return SpanKind.PRODUCER;
569595
case PROCESS:

sdk/core/azure-core-tracing-opentelemetry/src/test/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracerTest.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,58 @@ public void startProcessSpanWithRemoteParent() {
323323
assertTrue(updatedContext.getData(SCOPE_KEY).isPresent());
324324
}
325325

326+
@Test
327+
public void startProcessSpanWithLinks() {
328+
// Arrange: obtain a shared span builder and attach two links to it; only the second link carries an enqueued-time value.
329+
final Context spanBuilder = openTelemetryTracer.getSharedSpanBuilder("span", Context.NONE);
330+
331+
Span link1 = tracer.spanBuilder("link1").startSpan();
332+
Span link2 = tracer.spanBuilder("link2").startSpan();
333+
334+
openTelemetryTracer.addLink(spanBuilder.addData(SPAN_CONTEXT_KEY, link1.getSpanContext()));
335+
openTelemetryTracer.addLink(spanBuilder
336+
.addData(SPAN_CONTEXT_KEY, link2.getSpanContext())
337+
.addData(MESSAGE_ENQUEUED_TIME, MESSAGE_ENQUEUED_VALUE));
338+
339+
// Act: start a PROCESS span from the context holding the shared builder, then end it.
340+
final Context spanCtx = openTelemetryTracer.start(METHOD_NAME, spanBuilder, ProcessKind.PROCESS);
341+
openTelemetryTracer.end(null, null, spanCtx);
342+
343+
// Assert: both links are on the span in the order added; the first has no attributes, the second has exactly the enqueued-time attribute.
344+
ReadableSpan span = getSpan(spanCtx);
345+
List<LinkData> links = span.toSpanData().getLinks();
346+
assertEquals(2, links.size());
347+
assertEquals(link1.getSpanContext().getTraceId(), links.get(0).getSpanContext().getTraceId());
348+
assertEquals(link1.getSpanContext().getSpanId(), links.get(0).getSpanContext().getSpanId());
349+
assertEquals(0, links.get(0).getAttributes().size());
350+
351+
assertEquals(link2.getSpanContext().getTraceId(), links.get(1).getSpanContext().getTraceId());
352+
assertEquals(link2.getSpanContext().getSpanId(), links.get(1).getSpanContext().getSpanId());
353+
Attributes linkAttributes = links.get(1).getAttributes();
354+
assertEquals(1, linkAttributes.size());
355+
assertEquals(MESSAGE_ENQUEUED_VALUE, linkAttributes.get(AttributeKey.longKey(MESSAGE_ENQUEUED_TIME)));
356+
}
357+
358+
@Test
359+
public void startConsumeSpanWitStartTimeInContext() {
360+
// Arrange: request a shared span builder with an explicit start time 1000 seconds in the past, then attach one link.
361+
final Context spanBuilder = openTelemetryTracer.getSharedSpanBuilder("span",
362+
new Context("span-start-time", Instant.now().minusSeconds(1000)));
363+
364+
Span link = tracer.spanBuilder("link1").startSpan();
365+
366+
openTelemetryTracer.addLink(spanBuilder.addData(SPAN_CONTEXT_KEY, link.getSpanContext()));
367+
368+
// Act: start a PROCESS span from the context holding the shared builder, then end it.
369+
final Context spanCtx = openTelemetryTracer.start(METHOD_NAME, spanBuilder, ProcessKind.PROCESS);
370+
openTelemetryTracer.end(null, null, spanCtx);
371+
372+
// Assert: the link is kept and the span latency reflects the backdated start (expected 1000 s, tolerance 10 s).
373+
ReadableSpan span = getSpan(spanCtx);
374+
assertEquals(1, span.toSpanData().getLinks().size());
375+
assertEquals(1000d, span.getLatencyNanos() / 1_000_000_000d, 10);
376+
}
377+
326378
@Test
327379
public void startSpanOverloadNullPointerException() {
328380

@@ -930,7 +982,6 @@ private static Stream<Arguments> spanKinds() {
930982
Arguments.of(com.azure.core.util.tracing.SpanKind.SERVER, com.azure.core.util.tracing.SpanKind.PRODUCER, false),
931983
Arguments.of(com.azure.core.util.tracing.SpanKind.SERVER, com.azure.core.util.tracing.SpanKind.CONSUMER, false),
932984
Arguments.of(com.azure.core.util.tracing.SpanKind.SERVER, com.azure.core.util.tracing.SpanKind.SERVER, false));
933-
934985
}
935986

936987
@Test

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
### Features Added
1616

17+
- Enabled metrics for sent events, consumer lag, checkpointing. ([#31024](https://github.com/Azure/azure-sdk-for-java/pull/31024))
18+
- Enabled distributed tracing for consumer and batch processor. ([#31197](https://github.com/Azure/azure-sdk-for-java/pull/31197))
1719
- Added algorithm for mapping partition keys to partition ids.
1820
- Added EventHubBufferedProducerAsyncClient and EventHubBufferedProducerClient
1921

0 commit comments

Comments
 (0)