textMapGetter) {
+ this.textMapGetter = textMapGetter;
+ return this;
+ }
+
+ /**
+ * This method overrides the original items.
+ *
+ * See {@link DefaultMessagingProcessWrapperBuilder#addAttributesExtractor} if you just want to
+ * append one.
+ */
+ @CanIgnoreReturnValue
+ public DefaultMessagingProcessWrapperBuilder attributesExtractors(
+ Collection> attributesExtractors) {
+ this.attributesExtractors = new ArrayList<>();
+ this.attributesExtractors.addAll(attributesExtractors);
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public DefaultMessagingProcessWrapperBuilder addAttributesExtractor(
+ AttributesExtractor attributesExtractor) {
+ this.attributesExtractors.add(attributesExtractor);
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public DefaultMessagingProcessWrapperBuilder addAttributesExtractors(
+ Collection> attributesExtractor) {
+ this.attributesExtractors.addAll(attributesExtractor);
+ return this;
+ }
+
+ public MessagingProcessWrapper build() {
+ return new MessagingProcessWrapper<>(
+ this.openTelemetry == null ? GlobalOpenTelemetry.get() : this.openTelemetry,
+ this.textMapGetter == null ? NoopTextMapGetter.create() : this.textMapGetter,
+ this.attributesExtractors);
+ }
+
+ protected DefaultMessagingProcessWrapperBuilder() {
+ // init attributes extractors by default
+ this.attributesExtractors = new ArrayList<>();
+ this.attributesExtractors.add(
+ MessagingAttributesExtractor.create(
+ DefaultMessagingAttributesGetter.create(), MessageOperation.PROCESS));
+ }
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/MessagingProcessWrapper.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/MessagingProcessWrapper.java
new file mode 100644
index 000000000..89a8f099b
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/MessagingProcessWrapper.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
+import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
+import java.util.List;
+import javax.annotation.Nullable;
+
+public class MessagingProcessWrapper {
+
+ private static final String INSTRUMENTATION_SCOPE = "messaging-process-wrapper";
+
+ private static final String INSTRUMENTATION_VERSION = "1.0.0";
+
+ private static final String OPERATION_NAME = "process";
+
+ private final TextMapPropagator textMapPropagator;
+
+ private final Tracer tracer;
+
+ private final TextMapGetter textMapGetter;
+
+ // no attributes need to be extracted from responses in process operations
+ private final List> attributesExtractors;
+
+ public static
+ DefaultMessagingProcessWrapperBuilder defaultBuilder() {
+ return new DefaultMessagingProcessWrapperBuilder<>();
+ }
+
+ public void doProcess(REQUEST request, ThrowingRunnable runnable)
+ throws E {
+ Span span = handleStart(request);
+
+ try (Scope scope = span.makeCurrent()) {
+ runnable.run();
+ } catch (Throwable t) {
+ handleEnd(span, request, t);
+ throw t;
+ }
+
+ handleEnd(span, request, null);
+ }
+
+ public R doProcess(REQUEST request, ThrowingSupplier supplier)
+ throws E {
+ Span span = handleStart(request);
+
+ R result = null;
+ try (Scope scope = span.makeCurrent()) {
+ result = supplier.get();
+ } catch (Throwable t) {
+ handleEnd(span, request, t);
+ throw t;
+ }
+
+ handleEnd(span, request, null);
+ return result;
+ }
+
+ protected Span handleStart(REQUEST request) {
+ Context context =
+ this.textMapPropagator.extract(Context.current(), request, this.textMapGetter);
+ SpanBuilder spanBuilder = this.tracer.spanBuilder(getDefaultSpanName(request.getDestination()));
+ spanBuilder.setSpanKind(CONSUMER).setParent(context);
+
+ AttributesBuilder builder = Attributes.builder();
+ for (AttributesExtractor extractor : this.attributesExtractors) {
+ extractor.onStart(builder, context, request);
+ }
+ return spanBuilder.setAllAttributes(builder.build()).startSpan();
+ }
+
+ protected void handleEnd(Span span, REQUEST request, @Nullable Throwable t) {
+ AttributesBuilder builder = Attributes.builder();
+ for (AttributesExtractor extractor : this.attributesExtractors) {
+ extractor.onEnd(builder, Context.current(), request, null, t);
+ }
+ span.end();
+ }
+
+ protected String getDefaultSpanName(@Nullable String destination) {
+ if (destination == null) {
+ destination = "unknown";
+ }
+ return OPERATION_NAME + " " + destination;
+ }
+
+ protected MessagingProcessWrapper(
+ OpenTelemetry openTelemetry,
+ TextMapGetter textMapGetter,
+ List> attributesExtractors) {
+ this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator();
+ this.tracer = openTelemetry.getTracer(INSTRUMENTATION_SCOPE, INSTRUMENTATION_VERSION);
+ this.textMapGetter = textMapGetter;
+ this.attributesExtractors = attributesExtractors;
+ }
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/NoopTextMapGetter.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/NoopTextMapGetter.java
new file mode 100644
index 000000000..213fb4a6a
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/NoopTextMapGetter.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
+import java.util.Collections;
+import javax.annotation.Nullable;
+
+public class NoopTextMapGetter
+ implements TextMapGetter {
+
+ public static TextMapGetter create() {
+ return new NoopTextMapGetter<>();
+ }
+
+ @Override
+ public Iterable keys(REQUEST request) {
+ return Collections.emptyList();
+ }
+
+ @Nullable
+ @Override
+ public String get(@Nullable REQUEST request, String s) {
+ return null;
+ }
+
+ private NoopTextMapGetter() {}
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingRunnable.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingRunnable.java
new file mode 100644
index 000000000..e1bbd05b0
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingRunnable.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+/**
+ * A utility interface representing a {@link Runnable} that may throw.
+ *
+ * Inspired from ThrowingRunnable.
+ *
+ * @param Thrown exception type.
+ */
+@FunctionalInterface
+public interface ThrowingRunnable {
+ void run() throws E;
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingSupplier.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingSupplier.java
new file mode 100644
index 000000000..9bec00d4b
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingSupplier.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+import java.util.function.Supplier;
+
+/**
+ * A utility interface representing a {@link Supplier} that may throw.
+ *
+ * Inspired from ThrowingSupplier.
+ *
+ * @param Thrown exception type.
+ */
+@FunctionalInterface
+public interface ThrowingSupplier {
+ T get() throws E;
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java
new file mode 100644
index 000000000..502b96ca6
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java
@@ -0,0 +1,2 @@
+/** OpenTelemetry messaging wrappers extension. */
+package io.opentelemetry.contrib.messaging.wrappers;
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessagingAttributesGetter.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessagingAttributesGetter.java
new file mode 100644
index 000000000..89c37c703
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessagingAttributesGetter.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.semconv;
+
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
+import java.util.List;
+import javax.annotation.Nullable;
+
+public class DefaultMessagingAttributesGetter
+ implements MessagingAttributesGetter {
+
+ public static
+ MessagingAttributesGetter create() {
+ return new DefaultMessagingAttributesGetter<>();
+ }
+
+ @Nullable
+ @Override
+ public String getDestinationPartitionId(REQUEST request) {
+ return request.getDestinationPartitionId();
+ }
+
+ @Override
+ public List getMessageHeader(REQUEST request, String name) {
+ return request.getMessageHeader(name);
+ }
+
+ @Nullable
+ @Override
+ public String getSystem(REQUEST request) {
+ return request.getSystem();
+ }
+
+ @Nullable
+ @Override
+ public String getDestination(REQUEST request) {
+ return request.getDestination();
+ }
+
+ @Nullable
+ @Override
+ public String getDestinationTemplate(REQUEST request) {
+ return request.getDestinationTemplate();
+ }
+
+ @Override
+ public boolean isTemporaryDestination(REQUEST request) {
+ return request.isTemporaryDestination();
+ }
+
+ @Override
+ public boolean isAnonymousDestination(REQUEST request) {
+ return request.isAnonymousDestination();
+ }
+
+ @Nullable
+ @Override
+ public String getConversationId(REQUEST request) {
+ return request.getConversationId();
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageBodySize(REQUEST request) {
+ return request.getMessageBodySize();
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageEnvelopeSize(REQUEST request) {
+ return request.getMessageEnvelopeSize();
+ }
+
+ @Nullable
+ @Override
+ public String getMessageId(REQUEST request, @Nullable Void unused) {
+ return request.getMessageId();
+ }
+
+ @Nullable
+ @Override
+ public String getClientId(REQUEST request) {
+ return request.getClientId();
+ }
+
+ @Nullable
+ @Override
+ public Long getBatchMessageCount(REQUEST request, @Nullable Void unused) {
+ return request.getBatchMessageCount();
+ }
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/MessagingProcessRequest.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/MessagingProcessRequest.java
new file mode 100644
index 000000000..42d342d79
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/MessagingProcessRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.semconv;
+
+import static java.util.Collections.emptyList;
+
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * An interface to expose messaging properties for the pre-defined process wrapper.
+ *
+ * Inspired from MessagingAttributesGetter.
+ */
+public interface MessagingProcessRequest {
+
+ String getSystem();
+
+ @Nullable
+ String getDestination();
+
+ @Nullable
+ String getDestinationTemplate();
+
+ boolean isTemporaryDestination();
+
+ boolean isAnonymousDestination();
+
+ @Nullable
+ String getConversationId();
+
+ @Nullable
+ Long getMessageBodySize();
+
+ @Nullable
+ Long getMessageEnvelopeSize();
+
+ @Nullable
+ String getMessageId();
+
+ @Nullable
+ default String getClientId() {
+ return null;
+ }
+
+ @Nullable
+ default Long getBatchMessageCount() {
+ return null;
+ }
+
+ @Nullable
+ default String getDestinationPartitionId() {
+ return null;
+ }
+
+ /**
+ * Extracts all values of header named {@code name} from the request, or an empty list if there
+ * were none.
+ *
+ *
Implementations of this method must not return a null value; an empty list should be
+ * returned instead.
+ */
+ default List getMessageHeader(String name) {
+ return emptyList();
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/TestConstants.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/TestConstants.java
new file mode 100644
index 000000000..a826c2393
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/TestConstants.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+public final class TestConstants {
+
+ public static final String MESSAGE_ID = "42";
+
+ public static final String MESSAGE_BODY = "Hello messaging wrapper!";
+
+ public static final String EVENTBUS_NAME = "test-eb";
+
+ public static final String CLIENT_ID = "eventbus-client-0";
+
+ private TestConstants() {}
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/UserDefinedMessageSystemTest.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/UserDefinedMessageSystemTest.java
new file mode 100644
index 000000000..43fca9167
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/UserDefinedMessageSystemTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.CLIENT_ID;
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.EVENTBUS_NAME;
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.MESSAGE_BODY;
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.MESSAGE_ID;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
+
+import com.google.common.eventbus.EventBus;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.contrib.messaging.wrappers.impl.MessageRequest;
+import io.opentelemetry.contrib.messaging.wrappers.impl.MessageTextMapGetter;
+import io.opentelemetry.contrib.messaging.wrappers.model.Message;
+import io.opentelemetry.contrib.messaging.wrappers.model.MessageListener;
+import io.opentelemetry.contrib.messaging.wrappers.model.MessageTextMapSetter;
+import io.opentelemetry.contrib.messaging.wrappers.testing.AbstractBaseTest;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@SuppressWarnings("OtelInternalJavadoc")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class UserDefinedMessageSystemTest extends AbstractBaseTest {
+
+ private OpenTelemetry otel;
+
+ private Tracer tracer;
+
+ private EventBus eventBus;
+
+ @BeforeAll
+ void setupClass() {
+ otel = GlobalOpenTelemetry.get();
+ tracer = otel.getTracer("test-tracer", "1.0.0");
+ MessagingProcessWrapper wrapper =
+ MessagingProcessWrapper.defaultBuilder()
+ .openTelemetry(otel)
+ .textMapGetter(MessageTextMapGetter.create())
+ .build();
+
+ eventBus = new EventBus();
+
+ eventBus.register(MessageListener.create(tracer, wrapper));
+ }
+
+ @Test
+ void testSendAndConsume() {
+ sendWithParent(tracer);
+
+ assertTraces();
+ }
+
+ public void sendWithParent(Tracer tracer) {
+ // mock a send span
+ Span parent =
+ tracer.spanBuilder("publish " + EVENTBUS_NAME).setSpanKind(SpanKind.PRODUCER).startSpan();
+
+ try (Scope scope = parent.makeCurrent()) {
+ Message message = Message.create(new HashMap<>(), MESSAGE_ID, MESSAGE_BODY);
+ otel.getPropagators()
+ .getTextMapPropagator()
+ .inject(Context.current(), message, MessageTextMapSetter.create());
+ eventBus.post(message);
+ }
+
+ parent.end();
+ }
+
+ /**
+ * Copied from testing-common.
+ */
+ @SuppressWarnings("deprecation") // using deprecated semconv
+ public void assertTraces() {
+ waitAndAssertTraces(
+ sortByRootSpanName("parent", "producer callback"),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ // No need to verify the attribute here because it is generated by
+ // instrumentation library.
+ span.hasName("publish " + EVENTBUS_NAME)
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent(),
+ span ->
+ span.hasName("process " + EVENTBUS_NAME)
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(trace.getSpan(0))
+ .hasAttributesSatisfyingExactly(
+ equalTo(MESSAGING_SYSTEM, "guava-eventbus"),
+ equalTo(MESSAGING_DESTINATION_NAME, EVENTBUS_NAME),
+ equalTo(
+ MESSAGING_MESSAGE_BODY_SIZE,
+ MESSAGE_BODY.getBytes(StandardCharsets.UTF_8).length),
+ // FIXME: We do have "messaging.client_id" in instrumentation but
+ // "messaging.client.id" in
+ // semconv library right now. It should be replaced after semconv
+ // release.
+ equalTo(AttributeKey.stringKey("messaging.client_id"), CLIENT_ID),
+ equalTo(MESSAGING_OPERATION, "process")),
+ span ->
+ span.hasName("process child")
+ .hasKind(SpanKind.INTERNAL)
+ .hasParent(trace.getSpan(1))));
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageRequest.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageRequest.java
new file mode 100644
index 000000000..c70523c4b
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.impl;
+
+import io.opentelemetry.contrib.messaging.wrappers.model.Message;
+import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+
+public class MessageRequest implements MessagingProcessRequest {
+
+ private final Message message;
+
+ @Nullable private final String clientId;
+
+ @Nullable private final String eventBusName;
+
+ public static MessageRequest of(Message message) {
+ return of(message, null, null);
+ }
+
+ public static MessageRequest of(
+ Message message, @Nullable String clientId, @Nullable String eventBusName) {
+ return new MessageRequest(message, clientId, eventBusName);
+ }
+
+ @Override
+ public String getSystem() {
+ return "guava-eventbus";
+ }
+
+ @Nullable
+ @Override
+ public String getDestination() {
+ return eventBusName;
+ }
+
+ @Nullable
+ @Override
+ public String getDestinationTemplate() {
+ return null;
+ }
+
+ @Override
+ public boolean isTemporaryDestination() {
+ return false;
+ }
+
+ @Override
+ public boolean isAnonymousDestination() {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public String getConversationId() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageBodySize() {
+ return (long) message.getBody().getBytes(StandardCharsets.UTF_8).length;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageEnvelopeSize() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getMessageId() {
+ return message.getId();
+ }
+
+ @Nullable
+ @Override
+ public String getClientId() {
+ return clientId;
+ }
+
+ @Override
+ public List getMessageHeader(String name) {
+ return Collections.singletonList(message.getHeaders().get(name));
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ private MessageRequest(
+ Message message, @Nullable String clientId, @Nullable String eventBusName) {
+ this.message = message;
+ this.clientId = clientId;
+ this.eventBusName = eventBusName;
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageTextMapGetter.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageTextMapGetter.java
new file mode 100644
index 000000000..4fa167fcb
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageTextMapGetter.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.impl;
+
+import io.opentelemetry.context.propagation.TextMapGetter;
+import java.util.Collections;
+import javax.annotation.Nullable;
+
+public class MessageTextMapGetter implements TextMapGetter {
+
+ public static TextMapGetter create() {
+ return new MessageTextMapGetter();
+ }
+
+ @Override
+ public Iterable keys(MessageRequest carrier) {
+ if (carrier == null || carrier.getMessage() == null) {
+ return Collections.emptyList();
+ }
+ return carrier.getMessage().getHeaders().keySet();
+ }
+
+ @Nullable
+ @Override
+ public String get(@Nullable MessageRequest carrier, String key) {
+ if (carrier == null || carrier.getMessage() == null) {
+ return null;
+ }
+ return carrier.getMessage().getHeaders().get(key);
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/Message.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/Message.java
new file mode 100644
index 000000000..e606e214e
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/Message.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.model;
+
+import java.util.Map;
+
+public class Message {
+
+ private Map headers;
+
+ private String id;
+
+ private String body;
+
+ public static Message create(Map headers, String id, String body) {
+ return new Message(headers, id, body);
+ }
+
+ private Message(Map headers, String id, String body) {
+ this.headers = headers;
+ this.id = id;
+ this.body = body;
+ }
+
+ public Map getHeaders() {
+ return headers;
+ }
+
+ public void setHeaders(Map headers) {
+ this.headers = headers;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public void setBody(String body) {
+ this.body = body;
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageListener.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageListener.java
new file mode 100644
index 000000000..cc22a8958
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageListener.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.model;
+
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.CLIENT_ID;
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.EVENTBUS_NAME;
+
+import com.google.common.eventbus.Subscribe;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.contrib.messaging.wrappers.MessagingProcessWrapper;
+import io.opentelemetry.contrib.messaging.wrappers.impl.MessageRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(MessageListener.class);
+
+ private final Tracer tracer;
+
+ private final MessagingProcessWrapper wrapper;
+
+ public static MessageListener create(
+ Tracer tracer, MessagingProcessWrapper wrapper) {
+ return new MessageListener(tracer, wrapper);
+ }
+
+ @Subscribe
+ public void handleEvent(Message event) {
+ wrapper.doProcess(
+ MessageRequest.of(event, CLIENT_ID, EVENTBUS_NAME),
+ () -> {
+ Span span = tracer.spanBuilder("process child").startSpan();
+ logger.info("Received event from <" + EVENTBUS_NAME + ">: " + event.getId());
+ span.end();
+ });
+ }
+
+ private MessageListener(Tracer tracer, MessagingProcessWrapper wrapper) {
+ this.tracer = tracer;
+ this.wrapper = wrapper;
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageTextMapSetter.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageTextMapSetter.java
new file mode 100644
index 000000000..77e5a56f4
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageTextMapSetter.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.model;
+
+import io.opentelemetry.context.propagation.TextMapSetter;
+import javax.annotation.Nullable;
+
+public class MessageTextMapSetter implements TextMapSetter {
+
+ public static TextMapSetter create() {
+ return new MessageTextMapSetter();
+ }
+
+ @Override
+ public void set(@Nullable Message carrier, String key, String value) {
+ if (carrier == null) {
+ return;
+ }
+ carrier.getHeaders().put(key, value);
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java
new file mode 100644
index 000000000..502b96ca6
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java
@@ -0,0 +1,2 @@
+/** OpenTelemetry messaging wrappers extension. */
+package io.opentelemetry.contrib.messaging.wrappers;
diff --git a/messaging-wrappers/kafka-clients/build.gradle.kts b/messaging-wrappers/kafka-clients/build.gradle.kts
new file mode 100644
index 000000000..56ad60c53
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/build.gradle.kts
@@ -0,0 +1,40 @@
+plugins {
+ id("otel.java-conventions")
+
+ id("otel.publish-conventions")
+}
+
+description = "OpenTelemetry Messaging Wrappers - kafka-clients implementation"
+otelJava.moduleName.set("io.opentelemetry.contrib.messaging.wrappers.kafka")
+
+dependencies {
+ api(project(":messaging-wrappers:api"))
+
+ // FIXME: We shouldn't depend on the library "opentelemetry-kafka-clients-common" directly because the api in this
+ // package could be mutable, unless the components were maintained in "opentelemetry-java-instrumentation" project.
+ // implementation("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common:2.13.3-alpha")
+
+ compileOnly("org.apache.kafka:kafka-clients:0.11.0.0")
+
+ testImplementation("org.apache.kafka:kafka-clients:0.11.0.0")
+ testImplementation("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-2.6")
+ testImplementation(project(":messaging-wrappers:testing"))
+
+ testAnnotationProcessor("com.google.auto.service:auto-service")
+ testCompileOnly("com.google.auto.service:auto-service-annotations")
+ testImplementation("org.testcontainers:kafka")
+ testImplementation("org.testcontainers:junit-jupiter")
+}
+
+tasks {
+ withType().configureEach {
+ jvmArgs("-Dotel.java.global-autoconfigure.enabled=true")
+ // TODO: According to https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#message-creation-context-as-parent-of-process-span,
+ // process span should be the child of receive span. However, we couldn't access the trace context with receive span
+ // in wrappers, unless we add a generic accessor for that.
+ jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
+ jvmArgs("-Dotel.traces.exporter=logging")
+ jvmArgs("-Dotel.metrics.exporter=logging")
+ jvmArgs("-Dotel.logs.exporter=logging")
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/gradle.properties b/messaging-wrappers/kafka-clients/gradle.properties
new file mode 100644
index 000000000..a0402e1e2
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/gradle.properties
@@ -0,0 +1,2 @@
+# TODO: uncomment when ready to mark as stable
+# otel.stable=true
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaHelper.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaHelper.java
new file mode 100644
index 000000000..fee425043
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaHelper.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaProcessRequest;
+
+public final class KafkaHelper {
+
+ public static
+ KafkaProcessWrapperBuilder processWrapperBuilder() {
+ return new KafkaProcessWrapperBuilder<>();
+ }
+
+ private KafkaHelper() {}
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaProcessWrapperBuilder.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaProcessWrapperBuilder.java
new file mode 100644
index 000000000..75716640c
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaProcessWrapperBuilder.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import io.opentelemetry.contrib.messaging.wrappers.DefaultMessagingProcessWrapperBuilder;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaProcessRequest;
+
+public class KafkaProcessWrapperBuilder
+ extends DefaultMessagingProcessWrapperBuilder {
+
+ KafkaProcessWrapperBuilder() {
+ super();
+ super.textMapGetter = KafkaTextMapGetter.create();
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaTextMapGetter.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaTextMapGetter.java
new file mode 100644
index 000000000..483561525
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaTextMapGetter.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaProcessRequest;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.kafka.common.header.Header;
+
+/**
+ * Copied from KafkaConsumerRecordGetter.
+ */
+public class KafkaTextMapGetter
+ implements TextMapGetter {
+
+ public static TextMapGetter create() {
+ return new KafkaTextMapGetter<>();
+ }
+
+ @Override
+ public Iterable keys(@Nullable REQUEST carrier) {
+ if (carrier == null || carrier.getRecord() == null) {
+ return Collections.emptyList();
+ }
+ return StreamSupport.stream(carrier.getRecord().headers().spliterator(), false)
+ .map(Header::key)
+ .collect(Collectors.toList());
+ }
+
+ @Nullable
+ @Override
+ public String get(@Nullable REQUEST carrier, String key) {
+ if (carrier == null || carrier.getRecord() == null) {
+ return null;
+ }
+ Header header = carrier.getRecord().headers().lastHeader(key);
+ if (header == null) {
+ return null;
+ }
+ byte[] value = header.value();
+ if (value == null) {
+ return null;
+ }
+ return new String(value, StandardCharsets.UTF_8);
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/package-info.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/package-info.java
new file mode 100644
index 000000000..7e988a5b6
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/package-info.java
@@ -0,0 +1,2 @@
+/** OpenTelemetry messaging wrappers extension - kafka implementation. */
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesExtractor.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesExtractor.java
new file mode 100644
index 000000000..e65df97d8
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesExtractor.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka.semconv;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Copied from KafkaConsumerAttributesExtractor.
+ */
+public final class KafkaConsumerAttributesExtractor
+ implements AttributesExtractor {
+
+ // copied from MessagingIncubatingAttributes
+ private static final AttributeKey MESSAGING_DESTINATION_PARTITION_ID =
+ AttributeKey.stringKey("messaging.destination.partition.id");
+ private static final AttributeKey MESSAGING_CONSUMER_GROUP_NAME =
+ AttributeKey.stringKey("messaging.consumer.group.name");
+ private static final AttributeKey MESSAGING_KAFKA_OFFSET =
+ AttributeKey.longKey("messaging.kafka.offset");
+ private static final AttributeKey MESSAGING_KAFKA_MESSAGE_KEY =
+ AttributeKey.stringKey("messaging.kafka.message.key");
+ private static final AttributeKey MESSAGING_KAFKA_MESSAGE_TOMBSTONE =
+ AttributeKey.booleanKey("messaging.kafka.message.tombstone");
+
+ public static AttributesExtractor create() {
+ return new KafkaConsumerAttributesExtractor<>();
+ }
+
+ @Override
+ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) {
+
+ ConsumerRecord, ?> record = request.getRecord();
+
+ attributes.put(MESSAGING_DESTINATION_PARTITION_ID, String.valueOf(record.partition()));
+ attributes.put(MESSAGING_KAFKA_OFFSET, record.offset());
+
+ Object key = record.key();
+ if (key != null && canSerialize(key.getClass())) {
+ attributes.put(MESSAGING_KAFKA_MESSAGE_KEY, key.toString());
+ }
+ if (record.value() == null) {
+ attributes.put(MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true);
+ }
+
+ String consumerGroup = request.getConsumerGroup();
+ if (consumerGroup != null) {
+ attributes.put(MESSAGING_CONSUMER_GROUP_NAME, consumerGroup);
+ }
+ }
+
+ private static boolean canSerialize(Class> keyClass) {
+ // we make a simple assumption here that we can serialize keys by simply calling toString()
+ // and that does not work for byte[] or ByteBuffer
+ return !(keyClass.isArray() || keyClass == ByteBuffer.class);
+ }
+
+ @Override
+ public void onEnd(
+ AttributesBuilder attributes,
+ Context context,
+ REQUEST request,
+ @Nullable Void unused,
+ @Nullable Throwable error) {}
+
+ private KafkaConsumerAttributesExtractor() {}
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaProcessRequest.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaProcessRequest.java
new file mode 100644
index 000000000..ef9149b2b
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaProcessRequest.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka.semconv;
+
+import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaProcessRequest implements MessagingProcessRequest {
+
+ private final ConsumerRecord, ?> consumerRecord;
+
+ @Nullable private final String clientId;
+
+ @Nullable private final String consumerGroup;
+
+ public static KafkaProcessRequest of(ConsumerRecord, ?> consumerRecord) {
+ return of(consumerRecord, null, null);
+ }
+
+ public static KafkaProcessRequest of(
+ ConsumerRecord, ?> consumerRecord,
+ @Nullable String consumerGroup,
+ @Nullable String clientId) {
+ return new KafkaProcessRequest(consumerRecord, consumerGroup, clientId);
+ }
+
+ @Override
+ public String getSystem() {
+ return "kafka";
+ }
+
+ @Nullable
+ @Override
+ public String getDestination() {
+ return this.consumerRecord.topic();
+ }
+
+ @Nullable
+ @Override
+ public String getDestinationTemplate() {
+ return null;
+ }
+
+ @Override
+ public boolean isTemporaryDestination() {
+ return false;
+ }
+
+ @Override
+ public boolean isAnonymousDestination() {
+ return false;
+ }
+
+ @Override
+ @Nullable
+ public String getConversationId() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageBodySize() {
+ long size = this.consumerRecord.serializedValueSize();
+ return size >= 0 ? size : null;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageEnvelopeSize() {
+ return null;
+ }
+
+ @Override
+ @Nullable
+ public String getMessageId() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getClientId() {
+ return this.clientId;
+ }
+
+ @Nullable
+ @Override
+ public Long getBatchMessageCount() {
+ return null;
+ }
+
+ @Override
+ public List getMessageHeader(String name) {
+ return StreamSupport.stream(this.consumerRecord.headers().headers(name).spliterator(), false)
+ .map(header -> new String(header.value(), StandardCharsets.UTF_8))
+ .collect(Collectors.toList());
+ }
+
+ @Nullable
+ public String getConsumerGroup() {
+ return this.consumerGroup;
+ }
+
+ public ConsumerRecord, ?> getRecord() {
+ return consumerRecord;
+ }
+
+ private KafkaProcessRequest(
+ ConsumerRecord, ?> consumerRecord,
+ @Nullable String consumerGroup,
+ @Nullable String clientId) {
+ this.consumerRecord = consumerRecord;
+ this.consumerGroup = consumerGroup;
+ this.clientId = clientId;
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientBaseTest.java b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientBaseTest.java
new file mode 100644
index 000000000..b2c55397d
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientBaseTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import io.opentelemetry.contrib.messaging.wrappers.testing.AbstractBaseTest;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Copied from KafkaClientBaseTest.
+ */
+@SuppressWarnings("OtelInternalJavadoc")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class KafkaClientBaseTest extends AbstractBaseTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaClientBaseTest.class);
+
+ protected static final String SHARED_TOPIC = "shared.topic";
+
+ private KafkaContainer kafka;
+ protected Producer producer;
+ protected Consumer consumer;
+ private final CountDownLatch consumerReady = new CountDownLatch(1);
+
+ public static final int partition = 0;
+ public static final TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, partition);
+
+ @BeforeAll
+ void setupClass() throws ExecutionException, InterruptedException, TimeoutException {
+ kafka =
+ new KafkaContainer(DockerImageName.parse("apache/kafka:3.8.0"))
+ .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m")
+ .withLogConsumer(new Slf4jLogConsumer(logger))
+ .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1))
+ .withStartupTimeout(Duration.ofMinutes(1));
+ kafka.start();
+
+ // create test topic
+ HashMap adminProps = new HashMap<>();
+ adminProps.put("bootstrap.servers", kafka.getBootstrapServers());
+
+ try (AdminClient admin = AdminClient.create(adminProps)) {
+ admin
+ .createTopics(Collections.singletonList(new NewTopic(SHARED_TOPIC, 1, (short) 1)))
+ .all()
+ .get(30, TimeUnit.SECONDS);
+ }
+
+ producer = new KafkaProducer<>(producerProps());
+
+ consumer = new KafkaConsumer<>(consumerProps());
+
+ consumer.subscribe(
+ Collections.singletonList(SHARED_TOPIC),
+ new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection collection) {}
+
+ @Override
+ public void onPartitionsAssigned(Collection collection) {
+ consumerReady.countDown();
+ }
+ });
+ }
+
+ public Map consumerProps() {
+ HashMap props = new HashMap<>();
+ props.put("bootstrap.servers", kafka.getBootstrapServers());
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", 10);
+ props.put("session.timeout.ms", "30000");
+ props.put("key.deserializer", IntegerDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+ return props;
+ }
+
+ public Map producerProps() {
+ HashMap props = new HashMap<>();
+ props.put("bootstrap.servers", kafka.getBootstrapServers());
+ props.put("retries", 0);
+ props.put("batch.size", "16384");
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", "33554432");
+ props.put("key.serializer", IntegerSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ return props;
+ }
+
+ @AfterAll
+ void cleanupClass() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (consumer != null) {
+ consumer.close();
+ }
+ kafka.stop();
+ }
+
+ @SuppressWarnings("PreferJavaTimeOverload")
+ public void awaitUntilConsumerIsReady() throws InterruptedException {
+ if (consumerReady.await(0, TimeUnit.SECONDS)) {
+ return;
+ }
+ for (int i = 0; i < 60; i++) {
+ consumer.poll(Duration.ZERO);
+ if (consumerReady.await(3, TimeUnit.SECONDS)) {
+ break;
+ }
+ logger.info("Consumer has not been ready for {} time(s).", i);
+ }
+ if (consumerReady.getCount() != 0) {
+ throw new AssertionError("Consumer wasn't assigned any partitions!");
+ }
+ consumer.seekToBeginning(Collections.emptyList());
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientTest.java b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientTest.java
new file mode 100644
index 000000000..cc61e0965
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientTest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_CONSUMER_GROUP_NAME;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_OFFSET;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.contrib.messaging.wrappers.MessagingProcessWrapper;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaConsumerAttributesExtractor;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaProcessRequest;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.assertj.core.api.AbstractAssert;
+import org.junit.jupiter.api.Test;
+
+public class KafkaClientTest extends KafkaClientBaseTest {
+
+ static final String greeting = "Hello Kafka!";
+
+ static final String clientId = "test-consumer-1";
+
+ static final String groupId = "test";
+
+ @Override
+ public Map producerProps() {
+ Map props = super.producerProps();
+ props.put(
+ ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
+ return props;
+ }
+
+ @Override
+ public Map consumerProps() {
+ Map props = super.consumerProps();
+ props.put(
+ ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ return props;
+ }
+
+ @Test
+ void testInterceptors() throws InterruptedException {
+ OpenTelemetry otel = GlobalOpenTelemetry.get();
+ Tracer tracer = otel.getTracer("test-tracer", "1.0.0");
+ MessagingProcessWrapper wrapper =
+ KafkaHelper.processWrapperBuilder()
+ .openTelemetry(otel)
+ .addAttributesExtractor(KafkaConsumerAttributesExtractor.create())
+ .build();
+
+ sendWithParent(tracer);
+
+ awaitUntilConsumerIsReady();
+
+ consumeWithChild(tracer, wrapper);
+
+ assertTraces();
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void sendWithParent(Tracer tracer) {
+ Span parent = tracer.spanBuilder("parent").startSpan();
+ try (Scope scope = parent.makeCurrent()) {
+ producer.send(
+ new ProducerRecord<>(SHARED_TOPIC, greeting),
+ (meta, ex) -> {
+ if (ex == null) {
+ tracer.spanBuilder("producer callback").startSpan().end();
+ } else {
+ tracer.spanBuilder("producer exception: " + ex).startSpan().end();
+ }
+ });
+ }
+ parent.end();
+ }
+
+ public void consumeWithChild(
+ Tracer tracer, MessagingProcessWrapper wrapper) {
+ // check that the message was received
+ ConsumerRecords, ?> records = consumer.poll(Duration.ofSeconds(5));
+ assertThat(records.count()).isEqualTo(1);
+ ConsumerRecord, ?> record = records.iterator().next();
+ assertThat(record.value()).isEqualTo(greeting);
+ assertThat(record.key()).isNull();
+
+ wrapper.doProcess(
+ KafkaProcessRequest.of(record, groupId, clientId),
+ () -> {
+ tracer.spanBuilder("process child").startSpan().end();
+ });
+ }
+
+ /**
+ * Copied from testing-common.
+ */
+ @SuppressWarnings("deprecation") // using deprecated semconv
+ public void assertTraces() {
+ waitAndAssertTraces(
+ sortByRootSpanName("parent", "producer callback"),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+ span ->
+ // No need to verify the attribute here because it is generated by
+ // instrumentation library.
+ span.hasName(SHARED_TOPIC + " publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasParent(trace.getSpan(0)),
+ span ->
+ span.hasName("process " + SHARED_TOPIC)
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(trace.getSpan(1))
+ .hasAttributesSatisfyingExactly(
+ equalTo(MESSAGING_SYSTEM, "kafka"),
+ equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+ equalTo(
+ MESSAGING_MESSAGE_BODY_SIZE,
+ greeting.getBytes(StandardCharsets.UTF_8).length),
+ satisfies(
+ MESSAGING_DESTINATION_PARTITION_ID,
+ org.assertj.core.api.AbstractStringAssert::isNotEmpty),
+ // FIXME: We do have "messaging.client_id" in instrumentation but
+ // "messaging.client.id" in
+ // semconv library right now. It should be replaced after semconv
+ // release.
+ equalTo(
+ AttributeKey.stringKey("messaging.client_id"), "test-consumer-1"),
+ satisfies(MESSAGING_KAFKA_OFFSET, AbstractAssert::isNotNull),
+ equalTo(MESSAGING_CONSUMER_GROUP_NAME, "test"),
+ equalTo(MESSAGING_OPERATION, "process")),
+ span ->
+ span.hasName("process child")
+ .hasKind(SpanKind.INTERNAL)
+ .hasParent(trace.getSpan(2))),
+ // ideally we'd want producer callback to be part of the main trace, we just aren't able to
+ // instrument that
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/internal/AutoConfiguredDataCapture.java b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/internal/AutoConfiguredDataCapture.java
new file mode 100644
index 000000000..177cbf06b
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/internal/AutoConfiguredDataCapture.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka.internal;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.exporter.logging.LoggingSpanExporter;
+import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
+import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SpanExporter;
+import java.util.List;
+
+@AutoService(AutoConfigurationCustomizerProvider.class)
+public class AutoConfiguredDataCapture implements AutoConfigurationCustomizerProvider {
+
+ private static final InMemorySpanExporter inMemorySpanExporter = InMemorySpanExporter.create();
+
+ /*
+ Returns the spans which have been exported by the autoconfigured global OpenTelemetry SDK.
+ */
+ public static List getSpans() {
+ return inMemorySpanExporter.getFinishedSpanItems();
+ }
+
+ @Override
+ public void customize(AutoConfigurationCustomizer autoConfiguration) {
+ autoConfiguration.addSpanExporterCustomizer(
+ (spanExporter, config) -> {
+ // we piggy-back onto the autoconfigured logging exporter for now,
+ // because that one uses a SimpleSpanProcessor which does not impose a batching delay
+ if (spanExporter instanceof LoggingSpanExporter) {
+ inMemorySpanExporter.reset();
+ return SpanExporter.composite(inMemorySpanExporter, spanExporter);
+ }
+ return spanExporter;
+ });
+ }
+
+ @Override
+ public int order() {
+ // There might be other autoconfigurations wrapping SpanExporters,
+ // which can result in us failing to detect it
+ // We avoid this by ensuring that we run first
+ return Integer.MIN_VALUE;
+ }
+}
diff --git a/messaging-wrappers/testing/build.gradle.kts b/messaging-wrappers/testing/build.gradle.kts
new file mode 100644
index 000000000..6c0983923
--- /dev/null
+++ b/messaging-wrappers/testing/build.gradle.kts
@@ -0,0 +1,20 @@
+plugins {
+ id("otel.java-conventions")
+}
+
+description = "OpenTelemetry Messaging Wrappers testing"
+
+dependencies {
+ annotationProcessor("com.google.auto.service:auto-service")
+ compileOnly("com.google.auto.service:auto-service-annotations")
+
+ api("org.junit.jupiter:junit-jupiter-api")
+ api("org.junit.jupiter:junit-jupiter-params")
+ api("io.opentelemetry:opentelemetry-sdk-testing")
+
+ implementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
+ implementation("io.opentelemetry:opentelemetry-sdk-trace")
+ implementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
+ implementation("io.opentelemetry:opentelemetry-exporter-logging")
+ implementation("io.opentelemetry.javaagent:opentelemetry-testing-common")
+}
diff --git a/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/AbstractBaseTest.java b/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/AbstractBaseTest.java
new file mode 100644
index 000000000..9ed951bce
--- /dev/null
+++ b/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/AbstractBaseTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.testing;
+
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.waitForTraces;
+import static org.awaitility.Awaitility.await;
+
+import io.opentelemetry.contrib.messaging.wrappers.testing.internal.AutoConfiguredDataCapture;
+import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil;
+import io.opentelemetry.sdk.testing.assertj.TraceAssert;
+import io.opentelemetry.sdk.testing.assertj.TracesAssert;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.awaitility.core.ConditionTimeoutException;
+
+public abstract class AbstractBaseTest {
+
+ public static Comparator> sortByRootSpanName(String... names) {
+ return orderByRootSpanName(names);
+ }
+
+ @SafeVarargs
+ @SuppressWarnings("varargs")
+ public static void waitAndAssertTraces(
+ @Nullable Comparator> traceComparator, Consumer... assertions) {
+ List> assertionsList = new ArrayList<>(Arrays.asList(assertions));
+ try {
+ await()
+ .untilAsserted(
+ () ->
+ doAssertTraces(
+ traceComparator, AutoConfiguredDataCapture::getSpans, assertionsList));
+ } catch (Throwable t) {
+ // awaitility is doing a jmx call that is not implemented in GraalVM:
+ // call:
+ // https://github.com/awaitility/awaitility/blob/fbe16add874b4260dd240108304d5c0be84eabc8/awaitility/src/main/java/org/awaitility/core/ConditionAwaiter.java#L157
+ // see https://github.com/oracle/graal/issues/6101 (spring boot graal native image)
+ if (t.getClass().getName().equals("com.oracle.svm.core.jdk.UnsupportedFeatureError")
+ || t instanceof ConditionTimeoutException) {
+ // Don't throw this failure since the stack is the awaitility thread, causing confusion.
+ // Instead, just assert one more time on the test thread, which will fail with a better
+ // stack trace.
+ // TODO(anuraaga): There is probably a better way to do this.
+ doAssertTraces(traceComparator, AutoConfiguredDataCapture::getSpans, assertionsList);
+ } else {
+ throw t;
+ }
+ }
+ }
+
+ public static void doAssertTraces(
+ @Nullable Comparator> traceComparator,
+ Supplier> supplier,
+ List> assertionsList) {
+ try {
+ List> traces = waitForTraces(supplier, assertionsList.size());
+ TelemetryDataUtil.assertScopeVersion(traces);
+ if (traceComparator != null) {
+ traces.sort(traceComparator);
+ }
+ TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList);
+ } catch (InterruptedException | TimeoutException e) {
+ throw new AssertionError("Error waiting for " + assertionsList.size() + " traces", e);
+ }
+ }
+}
diff --git a/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/internal/AutoConfiguredDataCapture.java b/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/internal/AutoConfiguredDataCapture.java
new file mode 100644
index 000000000..9d25fe83f
--- /dev/null
+++ b/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/internal/AutoConfiguredDataCapture.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.testing.internal;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.exporter.logging.LoggingSpanExporter;
+import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
+import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SpanExporter;
+import java.util.List;
+
+@AutoService(AutoConfigurationCustomizerProvider.class)
+public class AutoConfiguredDataCapture implements AutoConfigurationCustomizerProvider {
+
+ private static final InMemorySpanExporter inMemorySpanExporter = InMemorySpanExporter.create();
+
+ /*
+ Returns the spans which have been exported by the autoconfigured global OpenTelemetry SDK.
+ */
+ public static List getSpans() {
+ return inMemorySpanExporter.getFinishedSpanItems();
+ }
+
+ @Override
+ public void customize(AutoConfigurationCustomizer autoConfiguration) {
+ autoConfiguration.addSpanExporterCustomizer(
+ (spanExporter, config) -> {
+ // we piggy-back onto the autoconfigured logging exporter for now,
+ // because that one uses a SimpleSpanProcessor which does not impose a batching delay
+ if (spanExporter instanceof LoggingSpanExporter) {
+ inMemorySpanExporter.reset();
+ return SpanExporter.composite(inMemorySpanExporter, spanExporter);
+ }
+ return spanExporter;
+ });
+ }
+
+ @Override
+ public int order() {
+ // There might be other autoconfigurations wrapping SpanExporters,
+ // which can result in us failing to detect it
+ // We avoid this by ensuring that we run first
+ return Integer.MIN_VALUE;
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index ef819d82a..db03bb591 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -53,6 +53,10 @@ include(":jmx-scraper")
include(":jmx-scraper:test-app")
include(":jmx-scraper:test-webapp")
include(":maven-extension")
+include(":messaging-wrappers:aliyun-mns-sdk")
+include(":messaging-wrappers:api")
+include(":messaging-wrappers:kafka-clients")
+include(":messaging-wrappers:testing")
include(":micrometer-meter-provider")
include(":noop-api")
include(":processors")