keys(REQUEST request) {
+ return Collections.emptyList();
+ }
+
+ @Nullable
+ @Override
+ public String get(@Nullable REQUEST request, String s) {
+ return null;
+ }
+
+ private NoopTextMapGetter() {}
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingRunnable.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingRunnable.java
new file mode 100644
index 000000000..e1bbd05b0
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingRunnable.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+/**
+ * A utility interface representing a {@link Runnable} that may throw.
+ *
+ * Inspired from ThrowingRunnable.
+ *
+ * @param Thrown exception type.
+ */
+@FunctionalInterface
+public interface ThrowingRunnable {
+ void run() throws E;
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingSupplier.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingSupplier.java
new file mode 100644
index 000000000..9bec00d4b
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/ThrowingSupplier.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+import java.util.function.Supplier;
+
+/**
+ * A utility interface representing a {@link Supplier} that may throw.
+ *
+ * Inspired from ThrowingSupplier.
+ *
+ * @param Thrown exception type.
+ */
+@FunctionalInterface
+public interface ThrowingSupplier {
+ T get() throws E;
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java
new file mode 100644
index 000000000..502b96ca6
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java
@@ -0,0 +1,2 @@
+/** OpenTelemetry messaging wrappers extension. */
+package io.opentelemetry.contrib.messaging.wrappers;
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessageTextMapGetter.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessageTextMapGetter.java
new file mode 100644
index 000000000..5643a87d2
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessageTextMapGetter.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.semconv;
+
+import io.opentelemetry.context.propagation.TextMapGetter;
+import java.util.List;
+import javax.annotation.Nullable;
+
+public enum DefaultMessageTextMapGetter implements TextMapGetter {
+ INSTANCE;
+
+ @Override
+ public Iterable keys(MessagingProcessRequest carrier) {
+ return carrier.getAllMessageHeadersKey();
+ }
+
+ @Nullable
+ @Override
+ public String get(@Nullable MessagingProcessRequest carrier, String key) {
+ if (carrier != null) {
+ List messageHeader = carrier.getMessageHeader(key);
+ if (messageHeader != null && !messageHeader.isEmpty()) {
+ return messageHeader.get(0);
+ }
+ }
+ return null;
+ }
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessagingAttributesGetter.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessagingAttributesGetter.java
new file mode 100644
index 000000000..67f0f543e
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/DefaultMessagingAttributesGetter.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.semconv;
+
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
+import java.util.List;
+import javax.annotation.Nullable;
+
+public enum DefaultMessagingAttributesGetter
+ implements MessagingAttributesGetter {
+ INSTANCE;
+
+ @Nullable
+ @Override
+ public String getDestinationPartitionId(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.getDestinationPartitionId();
+ }
+
+ @Override
+ public List getMessageHeader(
+ MessagingProcessRequest messagingProcessRequest, String name) {
+ return messagingProcessRequest.getMessageHeader(name);
+ }
+
+ @Nullable
+ @Override
+ public String getSystem(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.getSystem();
+ }
+
+ @Nullable
+ @Override
+ public String getDestination(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.getDestination();
+ }
+
+ @Nullable
+ @Override
+ public String getDestinationTemplate(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.getDestinationTemplate();
+ }
+
+ @Override
+ public boolean isTemporaryDestination(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.isTemporaryDestination();
+ }
+
+ @Override
+ public boolean isAnonymousDestination(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.isAnonymousDestination();
+ }
+
+ @Nullable
+ @Override
+ public String getConversationId(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.getConversationId();
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageBodySize(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.getMessageBodySize();
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageEnvelopeSize(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.getMessageEnvelopeSize();
+ }
+
+ @Nullable
+ @Override
+ public String getMessageId(
+ MessagingProcessRequest messagingProcessRequest, @Nullable Void unused) {
+ return messagingProcessRequest.getMessageId();
+ }
+
+ @Nullable
+ @Override
+ public String getClientId(MessagingProcessRequest messagingProcessRequest) {
+ return messagingProcessRequest.getClientId();
+ }
+
+ @Nullable
+ @Override
+ public Long getBatchMessageCount(
+ MessagingProcessRequest messagingProcessRequest, @Nullable Void unused) {
+ return messagingProcessRequest.getBatchMessageCount();
+ }
+}
diff --git a/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/MessagingProcessRequest.java b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/MessagingProcessRequest.java
new file mode 100644
index 000000000..119b95895
--- /dev/null
+++ b/messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/MessagingProcessRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.semconv;
+
+import static java.util.Collections.emptyList;
+
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * An interface to expose messaging properties for the pre-defined process wrapper.
+ *
+ * Inspired from MessagingAttributesGetter.
+ */
+public interface MessagingProcessRequest {
+
+ String getSystem();
+
+ @Nullable
+ String getDestination();
+
+ @Nullable
+ String getDestinationTemplate();
+
+ boolean isTemporaryDestination();
+
+ boolean isAnonymousDestination();
+
+ @Nullable
+ String getConversationId();
+
+ @Nullable
+ Long getMessageBodySize();
+
+ @Nullable
+ Long getMessageEnvelopeSize();
+
+ @Nullable
+ String getMessageId();
+
+ @Nullable
+ default String getClientId() {
+ return null;
+ }
+
+ @Nullable
+ default Long getBatchMessageCount() {
+ return null;
+ }
+
+ @Nullable
+ default String getDestinationPartitionId() {
+ return null;
+ }
+
+ /**
+ * Extracts all values of header named {@code name} from the request, or an empty list if there
+ * were none.
+ *
+ *
Implementations of this method must not return a null value; an empty list should be
+ * returned instead.
+ */
+ default List getMessageHeader(String name) {
+ return emptyList();
+ }
+
+ /**
+ * Extracts all keys of headers from the request, or an empty list/set if there were none.
+ *
+ * Implementations of this method must not return a null value; an empty list should be
+ * returned instead.
+ */
+ default Collection getAllMessageHeadersKey() {
+ return emptyList();
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/TestConstants.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/TestConstants.java
new file mode 100644
index 000000000..a826c2393
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/TestConstants.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+public final class TestConstants {
+
+ public static final String MESSAGE_ID = "42";
+
+ public static final String MESSAGE_BODY = "Hello messaging wrapper!";
+
+ public static final String EVENTBUS_NAME = "test-eb";
+
+ public static final String CLIENT_ID = "eventbus-client-0";
+
+ private TestConstants() {}
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/UserDefinedMessageSystemTest.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/UserDefinedMessageSystemTest.java
new file mode 100644
index 000000000..9cbbdf38a
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/UserDefinedMessageSystemTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers;
+
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.CLIENT_ID;
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.EVENTBUS_NAME;
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.MESSAGE_BODY;
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.MESSAGE_ID;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
+
+import com.google.common.eventbus.EventBus;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.contrib.messaging.wrappers.model.Message;
+import io.opentelemetry.contrib.messaging.wrappers.model.MessageListener;
+import io.opentelemetry.contrib.messaging.wrappers.model.MessageTextMapSetter;
+import io.opentelemetry.contrib.messaging.wrappers.semconv.DefaultMessageTextMapGetter;
+import io.opentelemetry.contrib.messaging.wrappers.semconv.DefaultMessagingAttributesGetter;
+import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
+import io.opentelemetry.contrib.messaging.wrappers.testing.AbstractBaseTest;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import org.assertj.core.api.AbstractAssert;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@SuppressWarnings("OtelInternalJavadoc")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class UserDefinedMessageSystemTest extends AbstractBaseTest {
+
+ private OpenTelemetry otel;
+
+ private Tracer tracer;
+
+ private EventBus eventBus;
+
+ @BeforeAll
+ void setupClass() {
+ otel = GlobalOpenTelemetry.get();
+ tracer = otel.getTracer("test-tracer", "1.0.0");
+ MessagingProcessWrapper wrapper =
+ MessagingProcessWrapper.defaultBuilder()
+ .openTelemetry(otel)
+ .textMapGetter(DefaultMessageTextMapGetter.INSTANCE)
+ .spanNameExtractor(
+ MessagingSpanNameExtractor.create(
+ DefaultMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS))
+ .attributesExtractors(
+ Collections.singletonList(
+ MessagingAttributesExtractor.create(
+ DefaultMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS)))
+ .build();
+
+ eventBus = new EventBus();
+
+ eventBus.register(MessageListener.create(tracer, wrapper));
+ }
+
+ @Test
+ void testSendAndConsume() {
+ sendWithParent(tracer);
+
+ assertTraces();
+ }
+
+ public void sendWithParent(Tracer tracer) {
+ // mock a send span
+ Span parent =
+ tracer.spanBuilder("publish " + EVENTBUS_NAME).setSpanKind(SpanKind.PRODUCER).startSpan();
+
+ try (Scope scope = parent.makeCurrent()) {
+ Message message = Message.create(new HashMap<>(), MESSAGE_ID, MESSAGE_BODY);
+ otel.getPropagators()
+ .getTextMapPropagator()
+ .inject(Context.current(), message, MessageTextMapSetter.create());
+ eventBus.post(message);
+ }
+
+ parent.end();
+ }
+
+ /**
+ * Copied from testing-common.
+ */
+ @SuppressWarnings("deprecation") // using deprecated semconv
+ public void assertTraces() {
+ waitAndAssertTraces(
+ sortByRootSpanName("parent", "producer callback"),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ // No need to verify the attribute here because it is generated by
+ // instrumentation library.
+ span.hasName("publish " + EVENTBUS_NAME)
+ .hasKind(SpanKind.PRODUCER)
+ .hasNoParent(),
+ span ->
+ span.hasName(EVENTBUS_NAME + " process")
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(trace.getSpan(0))
+ .hasAttributesSatisfyingExactly(
+ equalTo(MESSAGING_SYSTEM, "guava-eventbus"),
+ equalTo(MESSAGING_DESTINATION_NAME, EVENTBUS_NAME),
+ equalTo(
+ MESSAGING_MESSAGE_BODY_SIZE,
+ MESSAGE_BODY.getBytes(StandardCharsets.UTF_8).length),
+ // FIXME: We do have "messaging.client_id" in instrumentation but
+ // "messaging.client.id" in
+ // semconv library right now. It should be replaced after semconv
+ // release.
+ equalTo(AttributeKey.stringKey("messaging.client_id"), CLIENT_ID),
+ equalTo(MESSAGING_OPERATION, "process"),
+ satisfies(MESSAGING_MESSAGE_ID, AbstractAssert::isNotNull)),
+ span ->
+ span.hasName("process child")
+ .hasKind(SpanKind.INTERNAL)
+ .hasParent(trace.getSpan(1))));
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageRequest.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageRequest.java
new file mode 100644
index 000000000..8c42a852f
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/impl/MessageRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.impl;
+
+import io.opentelemetry.contrib.messaging.wrappers.model.Message;
+import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+
+public class MessageRequest implements MessagingProcessRequest {
+
+ private final Message message;
+
+ @Nullable private final String clientId;
+
+ @Nullable private final String eventBusName;
+
+ public static MessageRequest of(Message message) {
+ return of(message, null, null);
+ }
+
+ public static MessageRequest of(
+ Message message, @Nullable String clientId, @Nullable String eventBusName) {
+ return new MessageRequest(message, clientId, eventBusName);
+ }
+
+ @Override
+ public String getSystem() {
+ return "guava-eventbus";
+ }
+
+ @Nullable
+ @Override
+ public String getDestination() {
+ return eventBusName;
+ }
+
+ @Nullable
+ @Override
+ public String getDestinationTemplate() {
+ return null;
+ }
+
+ @Override
+ public boolean isTemporaryDestination() {
+ return false;
+ }
+
+ @Override
+ public boolean isAnonymousDestination() {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public String getConversationId() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageBodySize() {
+ return (long) message.getBody().getBytes(StandardCharsets.UTF_8).length;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageEnvelopeSize() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getMessageId() {
+ return message.getId();
+ }
+
+ @Nullable
+ @Override
+ public String getClientId() {
+ return clientId;
+ }
+
+ @Override
+ public List getMessageHeader(String name) {
+ return Collections.singletonList(message.getHeaders().get(name));
+ }
+
+ @Override
+ public Collection getAllMessageHeadersKey() {
+ return message.getHeaders().keySet();
+ }
+
+ private MessageRequest(
+ Message message, @Nullable String clientId, @Nullable String eventBusName) {
+ this.message = message;
+ this.clientId = clientId;
+ this.eventBusName = eventBusName;
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/Message.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/Message.java
new file mode 100644
index 000000000..e606e214e
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/Message.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.model;
+
+import java.util.Map;
+
+public class Message {
+
+ private Map headers;
+
+ private String id;
+
+ private String body;
+
+ public static Message create(Map headers, String id, String body) {
+ return new Message(headers, id, body);
+ }
+
+ private Message(Map headers, String id, String body) {
+ this.headers = headers;
+ this.id = id;
+ this.body = body;
+ }
+
+ public Map getHeaders() {
+ return headers;
+ }
+
+ public void setHeaders(Map headers) {
+ this.headers = headers;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public void setBody(String body) {
+ this.body = body;
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageListener.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageListener.java
new file mode 100644
index 000000000..fccd5c157
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageListener.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.model;
+
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.CLIENT_ID;
+import static io.opentelemetry.contrib.messaging.wrappers.TestConstants.EVENTBUS_NAME;
+
+import com.google.common.eventbus.Subscribe;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.contrib.messaging.wrappers.MessagingProcessWrapper;
+import io.opentelemetry.contrib.messaging.wrappers.impl.MessageRequest;
+import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(MessageListener.class);
+
+ private final Tracer tracer;
+
+ private final MessagingProcessWrapper wrapper;
+
+ public static MessageListener create(
+ Tracer tracer, MessagingProcessWrapper wrapper) {
+ return new MessageListener(tracer, wrapper);
+ }
+
+ @Subscribe
+ public void handleEvent(Message event) {
+ wrapper.doProcess(
+ MessageRequest.of(event, CLIENT_ID, EVENTBUS_NAME),
+ () -> {
+ Span span = tracer.spanBuilder("process child").startSpan();
+ logger.info("Received event from <" + EVENTBUS_NAME + ">: " + event.getId());
+ span.end();
+ });
+ }
+
+ private MessageListener(Tracer tracer, MessagingProcessWrapper wrapper) {
+ this.tracer = tracer;
+ this.wrapper = wrapper;
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageTextMapSetter.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageTextMapSetter.java
new file mode 100644
index 000000000..77e5a56f4
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/model/MessageTextMapSetter.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.model;
+
+import io.opentelemetry.context.propagation.TextMapSetter;
+import javax.annotation.Nullable;
+
+public class MessageTextMapSetter implements TextMapSetter {
+
+ public static TextMapSetter create() {
+ return new MessageTextMapSetter();
+ }
+
+ @Override
+ public void set(@Nullable Message carrier, String key, String value) {
+ if (carrier == null) {
+ return;
+ }
+ carrier.getHeaders().put(key, value);
+ }
+}
diff --git a/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java
new file mode 100644
index 000000000..502b96ca6
--- /dev/null
+++ b/messaging-wrappers/api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/package-info.java
@@ -0,0 +1,2 @@
+/** OpenTelemetry messaging wrappers extension. */
+package io.opentelemetry.contrib.messaging.wrappers;
diff --git a/messaging-wrappers/kafka-clients/build.gradle.kts b/messaging-wrappers/kafka-clients/build.gradle.kts
new file mode 100644
index 000000000..ec07abd94
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/build.gradle.kts
@@ -0,0 +1,46 @@
+plugins {
+ id("otel.java-conventions")
+
+ id("otel.publish-conventions")
+}
+
+description = "OpenTelemetry Messaging Wrappers - kafka-clients implementation"
+otelJava.moduleName.set("io.opentelemetry.contrib.messaging.wrappers.kafka")
+
+dependencies {
+ api(project(":messaging-wrappers:api"))
+
+ // FIXME: We shouldn't depend on the library "opentelemetry-kafka-clients-common" directly because the api in this
+ // package could be mutable, unless the components were maintained in "opentelemetry-java-instrumentation" project.
+ // implementation("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common:2.13.3-alpha")
+
+ compileOnly("org.apache.kafka:kafka-clients:0.11.0.0")
+
+ testImplementation("org.apache.kafka:kafka-clients:0.11.0.0")
+ testImplementation("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-2.6")
+ testImplementation(project(":messaging-wrappers:testing"))
+
+ testAnnotationProcessor("com.google.auto.service:auto-service")
+ testCompileOnly("com.google.auto.service:auto-service-annotations")
+ testImplementation("org.testcontainers:kafka")
+ testImplementation("org.testcontainers:junit-jupiter")
+}
+
+tasks {
+ withType().configureEach {
+ jvmArgs("-Dotel.java.global-autoconfigure.enabled=true")
+ // TODO: According to https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#message-creation-context-as-parent-of-process-span,
+ // process span should be the child of receive span. However, we couldn't access the trace context with receive span
+ // in wrappers, unless we add a generic accessor for that.
+ jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
+ jvmArgs("-Dotel.traces.exporter=logging")
+ jvmArgs("-Dotel.metrics.exporter=logging")
+ jvmArgs("-Dotel.logs.exporter=logging")
+ }
+}
+
+configurations.all {
+ resolutionStrategy {
+ force("org.apache.kafka:kafka-clients:0.11.0.0")
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/gradle.properties b/messaging-wrappers/kafka-clients/gradle.properties
new file mode 100644
index 000000000..a0402e1e2
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/gradle.properties
@@ -0,0 +1,2 @@
+# TODO: uncomment when ready to mark as stable
+# otel.stable=true
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaHelper.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaHelper.java
new file mode 100644
index 000000000..10896cae3
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaHelper.java
@@ -0,0 +1,15 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+public final class KafkaHelper {
+
+ public static KafkaProcessWrapperBuilder processWrapperBuilder() {
+ return new KafkaProcessWrapperBuilder();
+ }
+
+ private KafkaHelper() {}
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaProcessWrapperBuilder.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaProcessWrapperBuilder.java
new file mode 100644
index 000000000..52735c180
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaProcessWrapperBuilder.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import io.opentelemetry.contrib.messaging.wrappers.MessagingProcessWrapperBuilder;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaConsumerAttributesExtractor;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaConsumerAttributesGetter;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaProcessRequest;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
+import java.util.ArrayList;
+
+public class KafkaProcessWrapperBuilder
+ extends MessagingProcessWrapperBuilder {
+
+ KafkaProcessWrapperBuilder() {
+ super();
+ super.textMapGetter = KafkaTextMapGetter.create();
+ super.spanNameExtractor =
+ MessagingSpanNameExtractor.create(
+ KafkaConsumerAttributesGetter.INSTANCE, MessageOperation.PROCESS);
+ super.attributesExtractors = new ArrayList<>();
+ super.attributesExtractors.add(
+ MessagingAttributesExtractor.create(
+ KafkaConsumerAttributesGetter.INSTANCE, MessageOperation.PROCESS));
+ super.attributesExtractors.add(KafkaConsumerAttributesExtractor.INSTANCE);
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaTextMapGetter.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaTextMapGetter.java
new file mode 100644
index 000000000..820b7bfca
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaTextMapGetter.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaProcessRequest;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.kafka.common.header.Header;
+
+/**
+ * Copied from KafkaConsumerRecordGetter.
+ */
+public class KafkaTextMapGetter implements TextMapGetter {
+
+ public static TextMapGetter create() {
+ return new KafkaTextMapGetter();
+ }
+
+ @Override
+ public Iterable keys(@Nullable KafkaProcessRequest carrier) {
+ if (carrier == null || carrier.getRecord() == null) {
+ return Collections.emptyList();
+ }
+ return StreamSupport.stream(carrier.getRecord().headers().spliterator(), false)
+ .map(Header::key)
+ .collect(Collectors.toList());
+ }
+
+ @Nullable
+ @Override
+ public String get(@Nullable KafkaProcessRequest carrier, String key) {
+ if (carrier == null || carrier.getRecord() == null) {
+ return null;
+ }
+ Header header = carrier.getRecord().headers().lastHeader(key);
+ if (header == null) {
+ return null;
+ }
+ byte[] value = header.value();
+ if (value == null) {
+ return null;
+ }
+ return new String(value, StandardCharsets.UTF_8);
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/package-info.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/package-info.java
new file mode 100644
index 000000000..7e988a5b6
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/package-info.java
@@ -0,0 +1,2 @@
+/** OpenTelemetry messaging wrappers extension - kafka implementation. */
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesExtractor.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesExtractor.java
new file mode 100644
index 000000000..f037427c1
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesExtractor.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka.semconv;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Copied from KafkaConsumerAttributesExtractor.
+ */
+public enum KafkaConsumerAttributesExtractor
+ implements AttributesExtractor {
+ INSTANCE;
+
+ // copied from MessagingIncubatingAttributes
+ private static final AttributeKey MESSAGING_DESTINATION_PARTITION_ID =
+ AttributeKey.stringKey("messaging.destination.partition.id");
+ private static final AttributeKey MESSAGING_KAFKA_CONSUMER_GROUP =
+ AttributeKey.stringKey("messaging.kafka.consumer.group");
+ private static final AttributeKey MESSAGING_KAFKA_MESSAGE_KEY =
+ AttributeKey.stringKey("messaging.kafka.message.key");
+ private static final AttributeKey MESSAGING_KAFKA_MESSAGE_OFFSET =
+ AttributeKey.longKey("messaging.kafka.message.offset");
+ private static final AttributeKey MESSAGING_KAFKA_MESSAGE_TOMBSTONE =
+ AttributeKey.booleanKey("messaging.kafka.message.tombstone");
+
+ @Override
+ public void onStart(
+ AttributesBuilder attributes, Context parentContext, KafkaProcessRequest request) {
+
+ ConsumerRecord, ?> record = request.getRecord();
+
+ attributes.put(MESSAGING_DESTINATION_PARTITION_ID, String.valueOf(record.partition()));
+ attributes.put(MESSAGING_KAFKA_MESSAGE_OFFSET, record.offset());
+
+ Object key = record.key();
+ if (key != null && canSerialize(key.getClass())) {
+ attributes.put(MESSAGING_KAFKA_MESSAGE_KEY, key.toString());
+ }
+ if (record.value() == null) {
+ attributes.put(MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true);
+ }
+
+ String consumerGroup = request.getConsumerGroup();
+ if (consumerGroup != null) {
+ attributes.put(MESSAGING_KAFKA_CONSUMER_GROUP, consumerGroup);
+ }
+ }
+
+ private static boolean canSerialize(Class> keyClass) {
+ // we make a simple assumption here that we can serialize keys by simply calling toString()
+ // and that does not work for byte[] or ByteBuffer
+ return !(keyClass.isArray() || keyClass == ByteBuffer.class);
+ }
+
+ @Override
+ public void onEnd(
+ AttributesBuilder attributes,
+ Context context,
+ KafkaProcessRequest request,
+ @Nullable Void unused,
+ @Nullable Throwable error) {}
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesGetter.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesGetter.java
new file mode 100644
index 000000000..5797c7cd1
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesGetter.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka.semconv;
+
+import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+
+public enum KafkaConsumerAttributesGetter
+ implements MessagingAttributesGetter {
+ INSTANCE;
+
+ @Override
+ public String getSystem(KafkaProcessRequest request) {
+ return "kafka";
+ }
+
+ @Override
+ public String getDestination(KafkaProcessRequest request) {
+ return request.getRecord().topic();
+ }
+
+ @Nullable
+ @Override
+ public String getDestinationTemplate(KafkaProcessRequest request) {
+ return null;
+ }
+
+ @Override
+ public boolean isTemporaryDestination(KafkaProcessRequest request) {
+ return false;
+ }
+
+ @Override
+ public boolean isAnonymousDestination(KafkaProcessRequest request) {
+ return false;
+ }
+
+ @Override
+ @Nullable
+ public String getConversationId(KafkaProcessRequest request) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageBodySize(KafkaProcessRequest request) {
+ long size = request.getRecord().serializedValueSize();
+ return size >= 0 ? size : null;
+ }
+
+ @Nullable
+ @Override
+ public Long getMessageEnvelopeSize(KafkaProcessRequest request) {
+ return null;
+ }
+
+ @Override
+ @Nullable
+ public String getMessageId(KafkaProcessRequest request, @Nullable Void unused) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getClientId(KafkaProcessRequest request) {
+ return request.getClientId();
+ }
+
+ @Nullable
+ @Override
+ public Long getBatchMessageCount(KafkaProcessRequest request, @Nullable Void unused) {
+ return null;
+ }
+
+ @Override
+ public List getMessageHeader(KafkaProcessRequest request, String name) {
+ return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false)
+ .map(header -> new String(header.value(), StandardCharsets.UTF_8))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaProcessRequest.java b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaProcessRequest.java
new file mode 100644
index 000000000..5990a8aee
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaProcessRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka.semconv;
+
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaProcessRequest {
+
+ private final ConsumerRecord, ?> consumerRecord;
+
+ @Nullable private final String clientId;
+
+ @Nullable private final String consumerGroup;
+
+ public static KafkaProcessRequest of(ConsumerRecord, ?> consumerRecord) {
+ return of(consumerRecord, null, null);
+ }
+
+ public static KafkaProcessRequest of(
+ ConsumerRecord, ?> consumerRecord,
+ @Nullable String consumerGroup,
+ @Nullable String clientId) {
+ return new KafkaProcessRequest(consumerRecord, consumerGroup, clientId);
+ }
+
+ public ConsumerRecord, ?> getRecord() {
+ return consumerRecord;
+ }
+
+ @Nullable
+ public String getConsumerGroup() {
+ return this.consumerGroup;
+ }
+
+ @Nullable
+ public String getClientId() {
+ return this.clientId;
+ }
+
+ private KafkaProcessRequest(
+ ConsumerRecord, ?> consumerRecord,
+ @Nullable String consumerGroup,
+ @Nullable String clientId) {
+ this.consumerRecord = consumerRecord;
+ this.consumerGroup = consumerGroup;
+ this.clientId = clientId;
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientBaseTest.java b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientBaseTest.java
new file mode 100644
index 000000000..21138dc14
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientBaseTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import io.opentelemetry.contrib.messaging.wrappers.testing.AbstractBaseTest;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Copied from KafkaClientBaseTest.
+ */
+@SuppressWarnings("OtelInternalJavadoc")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class KafkaClientBaseTest extends AbstractBaseTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaClientBaseTest.class);
+
+ protected static final String SHARED_TOPIC = "shared.topic";
+
+ private KafkaContainer kafka;
+ protected Producer producer;
+ protected Consumer consumer;
+ private final CountDownLatch consumerReady = new CountDownLatch(1);
+
+ public static final int partition = 0;
+ public static final TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, partition);
+
+ @BeforeAll
+ void setupClass() throws ExecutionException, InterruptedException, TimeoutException {
+ kafka =
+ new KafkaContainer(DockerImageName.parse("apache/kafka:3.8.0"))
+ .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m")
+ .withLogConsumer(new Slf4jLogConsumer(logger))
+ .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1))
+ .withStartupTimeout(Duration.ofMinutes(1));
+ kafka.start();
+
+ // create test topic
+ HashMap adminProps = new HashMap<>();
+ adminProps.put("bootstrap.servers", kafka.getBootstrapServers());
+
+ try (AdminClient admin = AdminClient.create(adminProps)) {
+ admin
+ .createTopics(Collections.singletonList(new NewTopic(SHARED_TOPIC, 1, (short) 1)))
+ .all()
+ .get(30, TimeUnit.SECONDS);
+ }
+
+ producer = new KafkaProducer<>(producerProps());
+
+ consumer = new KafkaConsumer<>(consumerProps());
+
+ consumer.subscribe(
+ Collections.singletonList(SHARED_TOPIC),
+ new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection collection) {}
+
+ @Override
+ public void onPartitionsAssigned(Collection collection) {
+ consumerReady.countDown();
+ }
+ });
+ }
+
+ public Map consumerProps() {
+ HashMap props = new HashMap<>();
+ props.put("bootstrap.servers", kafka.getBootstrapServers());
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", 10);
+ props.put("session.timeout.ms", "30000");
+ props.put("key.deserializer", IntegerDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+ return props;
+ }
+
+ public Map producerProps() {
+ HashMap props = new HashMap<>();
+ props.put("bootstrap.servers", kafka.getBootstrapServers());
+ props.put("retries", 0);
+ props.put("batch.size", "16384");
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", "33554432");
+ props.put("key.serializer", IntegerSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ return props;
+ }
+
+ @AfterAll
+ void cleanupClass() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (consumer != null) {
+ consumer.close();
+ }
+ kafka.stop();
+ }
+
+ @SuppressWarnings("PreferJavaTimeOverload")
+ public void awaitUntilConsumerIsReady() throws InterruptedException {
+ if (consumerReady.await(0, TimeUnit.SECONDS)) {
+ return;
+ }
+ for (int i = 0; i < 60; i++) {
+ consumer.poll(0L);
+ if (consumerReady.await(3, TimeUnit.SECONDS)) {
+ break;
+ }
+ logger.info("Consumer has not been ready for {} time(s).", i);
+ }
+ if (consumerReady.getCount() != 0) {
+ throw new AssertionError("Consumer wasn't assigned any partitions!");
+ }
+ consumer.seekToBeginning(Collections.emptyList());
+ }
+}
diff --git a/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientTest.java b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientTest.java
new file mode 100644
index 000000000..329663a07
--- /dev/null
+++ b/messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientTest.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.kafka;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
+import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.contrib.messaging.wrappers.MessagingProcessWrapper;
+import io.opentelemetry.contrib.messaging.wrappers.kafka.semconv.KafkaProcessRequest;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.assertj.core.api.AbstractAssert;
+import org.junit.jupiter.api.Test;
+
+public class KafkaClientTest extends KafkaClientBaseTest {
+
+ static final String greeting = "Hello Kafka!";
+
+ static final String clientId = "test-consumer-1";
+
+ static final String groupId = "test";
+
+ @Override
+ public Map producerProps() {
+ Map props = super.producerProps();
+ props.put(
+ ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
+ return props;
+ }
+
+ @Override
+ public Map consumerProps() {
+ Map props = super.consumerProps();
+ props.put(
+ ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ return props;
+ }
+
+ @Test
+ void testInterceptors() throws InterruptedException {
+ OpenTelemetry otel = GlobalOpenTelemetry.get();
+ Tracer tracer = otel.getTracer("test-tracer", "1.0.0");
+ MessagingProcessWrapper wrapper =
+ KafkaHelper.processWrapperBuilder().openTelemetry(otel).build();
+
+ sendWithParent(tracer);
+
+ awaitUntilConsumerIsReady();
+
+ consumeWithChild(tracer, wrapper);
+
+ assertTraces();
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void sendWithParent(Tracer tracer) {
+ Span parent = tracer.spanBuilder("parent").startSpan();
+ try (Scope scope = parent.makeCurrent()) {
+ producer.send(
+ new ProducerRecord<>(SHARED_TOPIC, greeting),
+ (meta, ex) -> {
+ if (ex == null) {
+ tracer.spanBuilder("producer callback").startSpan().end();
+ } else {
+ tracer.spanBuilder("producer exception: " + ex).startSpan().end();
+ }
+ });
+ }
+ parent.end();
+ }
+
+ public void consumeWithChild(
+ Tracer tracer, MessagingProcessWrapper wrapper) {
+ // check that the message was received
+ ConsumerRecords, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
+ assertThat(records.count()).isEqualTo(1);
+ ConsumerRecord, ?> record = records.iterator().next();
+ assertThat(record.value()).isEqualTo(greeting);
+ assertThat(record.key()).isNull();
+
+ wrapper.doProcess(
+ KafkaProcessRequest.of(record, groupId, clientId),
+ () -> {
+ tracer.spanBuilder("process child").startSpan().end();
+ });
+ }
+
+ /**
+ * Copied from testing-common.
+ */
+ @SuppressWarnings("deprecation") // using deprecated semconv
+ public void assertTraces() {
+ waitAndAssertTraces(
+ sortByRootSpanName("parent", "producer callback"),
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+ span ->
+ // No need to verify the attribute here because it is generated by
+ // instrumentation library.
+ span.hasName(SHARED_TOPIC + " publish")
+ .hasKind(SpanKind.PRODUCER)
+ .hasParent(trace.getSpan(0)),
+ span ->
+ span.hasName(SHARED_TOPIC + " process")
+ .hasKind(SpanKind.CONSUMER)
+ .hasParent(trace.getSpan(1))
+ .hasAttributesSatisfyingExactly(
+ equalTo(MESSAGING_SYSTEM, "kafka"),
+ equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
+ equalTo(
+ MESSAGING_MESSAGE_BODY_SIZE,
+ greeting.getBytes(StandardCharsets.UTF_8).length),
+ satisfies(
+ MESSAGING_DESTINATION_PARTITION_ID,
+ org.assertj.core.api.AbstractStringAssert::isNotEmpty),
+ // FIXME: We do have "messaging.client_id" in instrumentation but
+ // "messaging.client.id" in
+ // semconv library right now. It should be replaced after semconv
+ // release.
+ equalTo(
+ AttributeKey.stringKey("messaging.client_id"), "test-consumer-1"),
+ satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractAssert::isNotNull),
+ equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
+ equalTo(MESSAGING_OPERATION, "process")),
+ span ->
+ span.hasName("process child")
+ .hasKind(SpanKind.INTERNAL)
+ .hasParent(trace.getSpan(2))),
+ // ideally we'd want producer callback to be part of the main trace, we just aren't able to
+ // instrument that
+ trace ->
+ trace.hasSpansSatisfyingExactly(
+ span ->
+ span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
+ }
+}
diff --git a/messaging-wrappers/testing/build.gradle.kts b/messaging-wrappers/testing/build.gradle.kts
new file mode 100644
index 000000000..6c0983923
--- /dev/null
+++ b/messaging-wrappers/testing/build.gradle.kts
@@ -0,0 +1,20 @@
+plugins {
+ id("otel.java-conventions")
+}
+
+description = "OpenTelemetry Messaging Wrappers testing"
+
+dependencies {
+ annotationProcessor("com.google.auto.service:auto-service")
+ compileOnly("com.google.auto.service:auto-service-annotations")
+
+ api("org.junit.jupiter:junit-jupiter-api")
+ api("org.junit.jupiter:junit-jupiter-params")
+ api("io.opentelemetry:opentelemetry-sdk-testing")
+
+ implementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
+ implementation("io.opentelemetry:opentelemetry-sdk-trace")
+ implementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
+ implementation("io.opentelemetry:opentelemetry-exporter-logging")
+ implementation("io.opentelemetry.javaagent:opentelemetry-testing-common")
+}
diff --git a/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/AbstractBaseTest.java b/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/AbstractBaseTest.java
new file mode 100644
index 000000000..9ed951bce
--- /dev/null
+++ b/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/AbstractBaseTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.testing;
+
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
+import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.waitForTraces;
+import static org.awaitility.Awaitility.await;
+
+import io.opentelemetry.contrib.messaging.wrappers.testing.internal.AutoConfiguredDataCapture;
+import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil;
+import io.opentelemetry.sdk.testing.assertj.TraceAssert;
+import io.opentelemetry.sdk.testing.assertj.TracesAssert;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.awaitility.core.ConditionTimeoutException;
+
+public abstract class AbstractBaseTest {
+
+ public static Comparator> sortByRootSpanName(String... names) {
+ return orderByRootSpanName(names);
+ }
+
+ @SafeVarargs
+ @SuppressWarnings("varargs")
+ public static void waitAndAssertTraces(
+ @Nullable Comparator> traceComparator, Consumer... assertions) {
+ List> assertionsList = new ArrayList<>(Arrays.asList(assertions));
+ try {
+ await()
+ .untilAsserted(
+ () ->
+ doAssertTraces(
+ traceComparator, AutoConfiguredDataCapture::getSpans, assertionsList));
+ } catch (Throwable t) {
+ // awaitility is doing a jmx call that is not implemented in GraalVM:
+ // call:
+ // https://github.com/awaitility/awaitility/blob/fbe16add874b4260dd240108304d5c0be84eabc8/awaitility/src/main/java/org/awaitility/core/ConditionAwaiter.java#L157
+ // see https://github.com/oracle/graal/issues/6101 (spring boot graal native image)
+ if (t.getClass().getName().equals("com.oracle.svm.core.jdk.UnsupportedFeatureError")
+ || t instanceof ConditionTimeoutException) {
+ // Don't throw this failure since the stack is the awaitility thread, causing confusion.
+ // Instead, just assert one more time on the test thread, which will fail with a better
+ // stack trace.
+ // TODO(anuraaga): There is probably a better way to do this.
+ doAssertTraces(traceComparator, AutoConfiguredDataCapture::getSpans, assertionsList);
+ } else {
+ throw t;
+ }
+ }
+ }
+
+ public static void doAssertTraces(
+ @Nullable Comparator> traceComparator,
+ Supplier> supplier,
+ List> assertionsList) {
+ try {
+ List> traces = waitForTraces(supplier, assertionsList.size());
+ TelemetryDataUtil.assertScopeVersion(traces);
+ if (traceComparator != null) {
+ traces.sort(traceComparator);
+ }
+ TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList);
+ } catch (InterruptedException | TimeoutException e) {
+ throw new AssertionError("Error waiting for " + assertionsList.size() + " traces", e);
+ }
+ }
+}
diff --git a/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/internal/AutoConfiguredDataCapture.java b/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/internal/AutoConfiguredDataCapture.java
new file mode 100644
index 000000000..9d25fe83f
--- /dev/null
+++ b/messaging-wrappers/testing/src/main/java/io/opentelemetry/contrib/messaging/wrappers/testing/internal/AutoConfiguredDataCapture.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.messaging.wrappers.testing.internal;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.exporter.logging.LoggingSpanExporter;
+import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
+import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SpanExporter;
+import java.util.List;
+
+@AutoService(AutoConfigurationCustomizerProvider.class)
+public class AutoConfiguredDataCapture implements AutoConfigurationCustomizerProvider {
+
+ private static final InMemorySpanExporter inMemorySpanExporter = InMemorySpanExporter.create();
+
+ /*
+ Returns the spans which have been exported by the autoconfigured global OpenTelemetry SDK.
+ */
+ public static List getSpans() {
+ return inMemorySpanExporter.getFinishedSpanItems();
+ }
+
+ @Override
+ public void customize(AutoConfigurationCustomizer autoConfiguration) {
+ autoConfiguration.addSpanExporterCustomizer(
+ (spanExporter, config) -> {
+ // we piggy-back onto the autoconfigured logging exporter for now,
+ // because that one uses a SimpleSpanProcessor which does not impose a batching delay
+ if (spanExporter instanceof LoggingSpanExporter) {
+ inMemorySpanExporter.reset();
+ return SpanExporter.composite(inMemorySpanExporter, spanExporter);
+ }
+ return spanExporter;
+ });
+ }
+
+ @Override
+ public int order() {
+ // There might be other autoconfigurations wrapping SpanExporters,
+ // which can result in us failing to detect it
+ // We avoid this by ensuring that we run first
+ return Integer.MIN_VALUE;
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 27db34c41..45b417279 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -53,6 +53,10 @@ include(":jmx-scraper")
include(":jmx-scraper:test-app")
include(":jmx-scraper:test-webapp")
include(":maven-extension")
+include(":messaging-wrappers:aliyun-mns-sdk")
+include(":messaging-wrappers:api")
+include(":messaging-wrappers:kafka-clients")
+include(":messaging-wrappers:testing")
include(":micrometer-meter-provider")
include(":noop-api")
include(":processors")