|
| 1 | +/* |
| 2 | + * Copyright The OpenZipkin Authors |
| 3 | + * SPDX-License-Identifier: Apache-2.0 |
| 4 | + */ |
| 5 | +package brave.rocketmq.client; |
| 6 | + |
| 7 | +import brave.Span; |
| 8 | +import brave.Tracer; |
| 9 | +import brave.Tracing; |
| 10 | +import brave.internal.Nullable; |
| 11 | +import brave.messaging.MessagingRequest; |
| 12 | +import brave.propagation.TraceContext; |
| 13 | +import brave.propagation.TraceContextOrSamplingFlags; |
| 14 | +import brave.sampler.SamplerFunction; |
| 15 | +import org.apache.rocketmq.common.message.MessageExt; |
| 16 | + |
| 17 | +import java.util.List; |
| 18 | +import java.util.function.BiFunction; |
| 19 | +import java.util.function.Function; |
| 20 | + |
| 21 | +import static brave.Span.Kind.CONSUMER; |
| 22 | +import static brave.internal.Throwables.propagateIfFatal; |
| 23 | +import static brave.rocketmq.client.RocketMQTracing.ROCKETMQ_TOPIC; |
| 24 | + |
| 25 | +/** |
| 26 | + * Read records headers to create and complete a child of the incoming |
| 27 | + * producers span if possible. |
| 28 | + * The spans are modeled as a duration 1 {@link Span.Kind#CONSUMER} span to represent consuming the |
| 29 | + * message from the rocketmq broker with a child span representing the processing of the message. |
| 30 | + */ |
| 31 | +abstract class AbstractMessageListener { |
| 32 | + final RocketMQTracing rocketMQTracing; |
| 33 | + final Tracing tracing; |
| 34 | + final Tracer tracer; |
| 35 | + final TraceContext.Extractor<MessageConsumerRequest> extractor; |
| 36 | + final SamplerFunction<MessagingRequest> sampler; |
| 37 | + @Nullable final String remoteServiceName; |
| 38 | + |
| 39 | + AbstractMessageListener(RocketMQTracing rocketMQTracing) { |
| 40 | + this.rocketMQTracing = rocketMQTracing; |
| 41 | + this.tracing = rocketMQTracing.messagingTracing.tracing(); |
| 42 | + this.tracer = tracing.tracer(); |
| 43 | + this.extractor = rocketMQTracing.consumerExtractor; |
| 44 | + this.sampler = rocketMQTracing.consumerSampler; |
| 45 | + this.remoteServiceName = rocketMQTracing.remoteServiceName; |
| 46 | + } |
| 47 | + |
| 48 | + <T> T processConsumeMessage( |
| 49 | + List<MessageExt> msgs, |
| 50 | + Function<List<MessageExt>, T> consumerFunc, |
| 51 | + BiFunction<T, T, Boolean> successFunc, |
| 52 | + T successStatus |
| 53 | + ) { |
| 54 | + for (MessageExt message : msgs) { |
| 55 | + MessageConsumerRequest request = new MessageConsumerRequest(message); |
| 56 | + TraceContextOrSamplingFlags extracted = |
| 57 | + rocketMQTracing.extractAndClearTraceIdHeaders(extractor, request, message.getProperties()); |
| 58 | + Span consumerSpan = rocketMQTracing.nextMessagingSpan(sampler, request, extracted); |
| 59 | + Span listenerSpan = tracer.newChild(consumerSpan.context()); |
| 60 | + |
| 61 | + if (!consumerSpan.isNoop()) { |
| 62 | + setConsumerSpan(consumerSpan, message.getTopic()); |
| 63 | + // incur timestamp overhead only once |
| 64 | + long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds(); |
| 65 | + consumerSpan.start(timestamp); |
| 66 | + long consumerFinish = timestamp + 1L; // save a clock reading |
| 67 | + consumerSpan.finish(consumerFinish); |
| 68 | + // not using scoped span as we want to start with a pre-configured time |
| 69 | + listenerSpan.name("on-message").start(consumerFinish); |
| 70 | + } |
| 71 | + |
| 72 | + Tracer.SpanInScope scope = tracer.withSpanInScope(listenerSpan); |
| 73 | + Throwable error = null; |
| 74 | + T result; |
| 75 | + |
| 76 | + try { |
| 77 | + result = consumerFunc.apply(msgs); |
| 78 | + } catch (Throwable t) { |
| 79 | + propagateIfFatal(t); |
| 80 | + error = t; |
| 81 | + throw t; |
| 82 | + } finally { |
| 83 | + if (error != null) listenerSpan.error(error); |
| 84 | + listenerSpan.finish(); |
| 85 | + scope.close(); |
| 86 | + } |
| 87 | + |
| 88 | + if (!successFunc.apply(result, successStatus)) { |
| 89 | + return result; |
| 90 | + } |
| 91 | + } |
| 92 | + return successStatus; |
| 93 | + } |
| 94 | + |
| 95 | + void setConsumerSpan(Span span, String topic) { |
| 96 | + span.name("receive").kind(CONSUMER); |
| 97 | + span.tag(ROCKETMQ_TOPIC, topic); |
| 98 | + if (remoteServiceName != null) span.remoteServiceName(remoteServiceName); |
| 99 | + } |
| 100 | +} |
0 commit comments