Skip to content

Commit 34424be

Browse files
raphasilozangunalp
andauthored
Add OpenTelemetry support to PubSub configuration (#2946)
* Bump google-cloud libraries-bom version to 26.55.0 and add smallrye-reactive-messaging-otel dependency * Add OpenTelemetry support to PubSub configuration and connectors * Tests for enabling OTel for gcp pubsub --------- Co-authored-by: Ozan Gunalp <ozangunalp@gmail.com>
1 parent 92ab181 commit 34424be

File tree

7 files changed

+423
-35
lines changed

7 files changed

+423
-35
lines changed

smallrye-reactive-messaging-gcp-pubsub/pom.xml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
<dependency>
2020
<groupId>com.google.cloud</groupId>
2121
<artifactId>libraries-bom</artifactId>
22-
<version>16.3.0</version>
22+
<version>26.55.0</version>
2323
<scope>import</scope>
2424
<type>pom</type>
2525
</dependency>
@@ -30,6 +30,11 @@
3030
<groupId>com.google.cloud</groupId>
3131
<artifactId>google-cloud-pubsub</artifactId>
3232
</dependency>
33+
<dependency>
34+
<groupId>io.smallrye.reactive</groupId>
35+
<artifactId>smallrye-reactive-messaging-otel</artifactId>
36+
<version>${project.version}</version>
37+
</dependency>
3338
<dependency>
3439
<groupId>io.smallrye.reactive</groupId>
3540
<artifactId>smallrye-reactive-messaging-provider</artifactId>
@@ -56,6 +61,16 @@
5661
<artifactId>testcontainers</artifactId>
5762
<scope>test</scope>
5863
</dependency>
64+
<dependency>
65+
<groupId>io.opentelemetry</groupId>
66+
<artifactId>opentelemetry-sdk-trace</artifactId>
67+
<scope>test</scope>
68+
</dependency>
69+
<dependency>
70+
<groupId>io.opentelemetry</groupId>
71+
<artifactId>opentelemetry-sdk-testing</artifactId>
72+
<scope>test</scope>
73+
</dependency>
5974
<dependency>
6075
<groupId>io.smallrye.reactive</groupId>
6176
<artifactId>smallrye-connector-attribute-processor</artifactId>

smallrye-reactive-messaging-gcp-pubsub/src/main/java/io/smallrye/reactive/messaging/gcp/pubsub/PubSubConfig.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,30 @@ public class PubSubConfig {
1818
private final String host;
1919
private final Integer port;
2020

21+
private final boolean otelEnabled;
22+
2123
public PubSubConfig(final String projectId, final String topic, final Path credentialPath, final boolean mockPubSubTopics,
22-
final String host, final Integer port) {
24+
final String host, final Integer port, boolean otelEnabled) {
2325
this.projectId = Objects.requireNonNull(projectId, msg.mustNotBeNull("projectId"));
2426
this.topic = Objects.requireNonNull(topic, msg.mustNotBeNull("topic"));
2527
this.credentialPath = credentialPath;
2628
this.subscription = null;
2729
this.mockPubSubTopics = mockPubSubTopics;
2830
this.host = host;
2931
this.port = port;
32+
this.otelEnabled = otelEnabled;
3033
}
3134

3235
public PubSubConfig(final String projectId, final String topic, final Path credentialPath, final String subscription,
33-
final boolean mockPubSubTopics, final String host, final Integer port) {
36+
final boolean mockPubSubTopics, final String host, final Integer port, boolean otelEnabled) {
3437
this.projectId = Objects.requireNonNull(projectId, msg.mustNotBeNull("projectId"));
3538
this.topic = Objects.requireNonNull(topic, msg.mustNotBeNull("topic"));
3639
this.credentialPath = credentialPath;
3740
this.subscription = subscription;
3841
this.mockPubSubTopics = mockPubSubTopics;
3942
this.host = host;
4043
this.port = port;
44+
this.otelEnabled = otelEnabled;
4145
}
4246

4347
public String getProjectId() {
@@ -68,6 +72,10 @@ public Integer getPort() {
6872
return port;
6973
}
7074

75+
public boolean isOtelEnabled() {
76+
return otelEnabled;
77+
}
78+
7179
@Override
7280
public boolean equals(final Object o) {
7381
if (this == o) {

smallrye-reactive-messaging-gcp-pubsub/src/main/java/io/smallrye/reactive/messaging/gcp/pubsub/PubSubConnector.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public class PubSubConnector implements InboundConnector, OutboundConnector {
5959
@ConfigProperty(name = "mock-pubsub-port")
6060
private Optional<Integer> port;
6161

62+
@Inject
63+
@ConfigProperty(name = "gcp-pubsub-otel-enabled", defaultValue = "false")
64+
private boolean otelEnabled;
65+
6266
@Inject
6367
private PubSubManager pubSubManager;
6468

@@ -81,7 +85,7 @@ public void destroy(@Observes @Destroyed(ApplicationScoped.class) final Object c
8185
@Override
8286
public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
8387
final PubSubConfig pubSubConfig = new PubSubConfig(getProjectId(config), getTopic(config), getCredentialPath(config),
84-
getSubscription(config), mockPubSubTopics, host.orElse(null), port.orElse(null));
88+
getSubscription(config), mockPubSubTopics, host.orElse(null), port.orElse(null), getOtelEnabled(config));
8589

8690
return Multi.createFrom().uni(Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> {
8791
if (isUseAdminClient(config)) {
@@ -97,7 +101,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
97101
@Override
98102
public Flow.Subscriber<? extends Message<?>> getSubscriber(final Config config) {
99103
final PubSubConfig pubSubConfig = new PubSubConfig(getProjectId(config), getTopic(config), getCredentialPath(config),
100-
mockPubSubTopics, host.orElse(null), port.orElse(null));
104+
mockPubSubTopics, host.orElse(null), port.orElse(null), getOtelEnabled(config));
101105

102106
return MultiUtils.via(m -> m.onItem()
103107
.transformToUniAndConcatenate(message -> Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> {
@@ -114,6 +118,11 @@ private String getProjectId(Config config) {
114118
.orElse(projectId);
115119
}
116120

121+
private boolean getOtelEnabled(Config config) {
122+
return config.getOptionalValue("otel-enabled", Boolean.class)
123+
.orElse(otelEnabled);
124+
}
125+
117126
boolean isUseAdminClient(Config config) {
118127
return config.getOptionalValue("use-admin-client", Boolean.class).orElse(true);
119128
}

smallrye-reactive-messaging-gcp-pubsub/src/main/java/io/smallrye/reactive/messaging/gcp/pubsub/PubSubManager.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import jakarta.annotation.PreDestroy;
1616
import jakarta.enterprise.context.ApplicationScoped;
17+
import jakarta.enterprise.inject.Instance;
18+
import jakarta.inject.Inject;
1719

1820
import org.eclipse.microprofile.reactive.messaging.Message;
1921

@@ -36,7 +38,9 @@
3638

3739
import io.grpc.ManagedChannel;
3840
import io.grpc.ManagedChannelBuilder;
41+
import io.opentelemetry.api.OpenTelemetry;
3942
import io.smallrye.mutiny.subscription.MultiEmitter;
43+
import io.smallrye.reactive.messaging.tracing.TracingUtils;
4044

4145
@ApplicationScoped
4246
public class PubSubManager {
@@ -48,6 +52,9 @@ public class PubSubManager {
4852
private final List<MultiEmitter<? super Message<?>>> emitters = new CopyOnWriteArrayList<>();
4953
private final List<ManagedChannel> channels = new CopyOnWriteArrayList<>();
5054

55+
@Inject
56+
private Instance<OpenTelemetry> openTelemetryInstance;
57+
5158
public Publisher publisher(final PubSubConfig config) {
5259
return publishers.computeIfAbsent(config, this::buildPublisher);
5360
}
@@ -142,6 +149,10 @@ private Publisher buildPublisher(final PubSubConfig config) {
142149
buildCredentialsProvider(config).ifPresent(publisherBuilder::setCredentialsProvider);
143150
buildTransportChannelProvider(config).ifPresent(publisherBuilder::setChannelProvider);
144151

152+
final var openTelemetry = TracingUtils.getOpenTelemetry(openTelemetryInstance);
153+
publisherBuilder.setOpenTelemetry(openTelemetry);
154+
publisherBuilder.setEnableOpenTelemetryTracing(config.isOtelEnabled());
155+
145156
return publisherBuilder.build();
146157
} catch (final IOException e) {
147158
throw ex.illegalStateUnableToBuildPublisher(e);
@@ -157,6 +168,10 @@ private Subscriber buildSubscriber(final PubSubConfig config, final PubSubMessag
157168
buildCredentialsProvider(config).ifPresent(subscriberBuilder::setCredentialsProvider);
158169
buildTransportChannelProvider(config).ifPresent(subscriberBuilder::setChannelProvider);
159170

171+
final var openTelemetry = TracingUtils.getOpenTelemetry(openTelemetryInstance);
172+
subscriberBuilder.setOpenTelemetry(openTelemetry);
173+
subscriberBuilder.setEnableOpenTelemetryTracing(config.isOtelEnabled());
174+
160175
return subscriberBuilder.build();
161176
}
162177

smallrye-reactive-messaging-gcp-pubsub/src/test/java/io/smallrye/reactive/messaging/gcp/pubsub/PubSubTest.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,65 +3,71 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.awaitility.Awaitility.await;
55

6-
import java.lang.annotation.Annotation;
6+
import java.lang.reflect.Method;
7+
import java.util.UUID;
78
import java.util.concurrent.Flow;
89

10+
import org.eclipse.microprofile.config.ConfigProvider;
911
import org.eclipse.microprofile.reactive.messaging.Message;
10-
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
12+
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral;
1113
import org.jboss.weld.environment.se.Weld;
1214
import org.jboss.weld.environment.se.WeldContainer;
1315
import org.junit.jupiter.api.AfterEach;
1416
import org.junit.jupiter.api.BeforeEach;
15-
import org.junit.jupiter.api.Disabled;
1617
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.TestInfo;
1719

1820
import com.google.pubsub.v1.ProjectTopicName;
1921
import com.google.pubsub.v1.TopicName;
2022

23+
import io.smallrye.config.SmallRyeConfigProviderResolver;
2124
import io.smallrye.mutiny.Multi;
2225
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
2326

2427
public class PubSubTest extends PubSubTestBase {
2528

2629
private WeldContainer container;
2730

28-
private static final String TOPIC = "pubsub-test";
31+
private String topic;
2932

3033
@BeforeEach
31-
public void initTest() {
32-
initConfiguration(TOPIC);
34+
public void initTest(TestInfo testInfo) {
35+
topic = testInfo.getTestMethod().map(Method::getName).orElse("") + "_" + UUID.randomUUID();
36+
initConfiguration(topic);
3337
final Weld weld = baseWeld();
3438
weld.addBeanClass(ConsumptionBean.class);
35-
addConfig(createSourceConfig(TOPIC, SUBSCRIPTION, PUBSUB_CONTAINER.getFirstMappedPort()));
39+
addConfig(createSourceConfig(topic, SUBSCRIPTION, PUBSUB_CONTAINER.getFirstMappedPort()));
3640
container = weld.initialize();
3741
}
3842

3943
@AfterEach
4044
public void afterEach() {
45+
if (container != null) {
46+
PubSubManager manager = container.select(PubSubManager.class).get();
47+
deleteTopicIfExists(manager, topic);
48+
container.shutdown();
49+
}
50+
SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig());
4151
clear();
42-
PubSubManager manager = container.select(PubSubManager.class).get();
43-
deleteTopicIfExists(manager, TOPIC);
44-
container.shutdown();
4552
}
4653

4754
@Test
48-
@Disabled("Failing on CI - to be investigated")
4955
public void testSourceAndSink() {
5056
final ConsumptionBean consumptionBean = container.select(ConsumptionBean.class).get();
5157

5258
// wait until the subscription is ready
5359
final PubSubManager manager = container.select(PubSubManager.class).get();
5460
await().until(() -> manager
5561
.topicAdminClient(config)
56-
.listTopicSubscriptions((TopicName) ProjectTopicName.of(PROJECT_ID, TOPIC))
62+
.listTopicSubscriptions((TopicName) ProjectTopicName.of(PROJECT_ID, topic))
5763
.getPage()
5864
.getPageElementCount() > 0);
5965

60-
send("Hello-0", TOPIC);
66+
send("Hello-0", topic);
6167
await().until(() -> consumptionBean.getMessages().size() == 1);
6268
assertThat(consumptionBean.getMessages().get(0)).isEqualTo("Hello-0");
6369
for (int i = 1; i < 11; i++) {
64-
send("Hello-" + i, TOPIC);
70+
send("Hello-" + i, topic);
6571
}
6672
await().until(() -> consumptionBean.getMessages().size() == 11);
6773
assertThat(consumptionBean.getMessages()).allSatisfy(s -> assertThat(s).startsWith("Hello-"));
@@ -84,17 +90,7 @@ private Flow.Subscriber<? extends Message<?>> createSinkSubscriber(final String
8490
}
8591

8692
private PubSubConnector getConnector() {
87-
return container.select(PubSubConnector.class, new Connector() {
88-
@Override
89-
public Class<? extends Annotation> annotationType() {
90-
return Connector.class;
91-
}
92-
93-
@Override
94-
public String value() {
95-
return PubSubConnector.CONNECTOR_NAME;
96-
}
97-
}).get();
93+
return container.select(PubSubConnector.class, ConnectorLiteral.of(PubSubConnector.CONNECTOR_NAME)).get();
9894
}
9995

10096
}

smallrye-reactive-messaging-gcp-pubsub/src/test/java/io/smallrye/reactive/messaging/gcp/pubsub/PubSubTestBase.java

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.smallrye.reactive.messaging.gcp.pubsub;
22

3+
import static io.smallrye.reactive.messaging.gcp.pubsub.i18n.PubSubLogging.log;
4+
import static org.awaitility.Awaitility.await;
5+
36
import java.io.File;
47
import java.io.IOException;
58
import java.nio.file.Files;
@@ -15,6 +18,8 @@
1518
import org.testcontainers.containers.output.Slf4jLogConsumer;
1619
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
1720

21+
import com.google.api.gax.rpc.AlreadyExistsException;
22+
import com.google.api.gax.rpc.NotFoundException;
1823
import com.google.pubsub.v1.TopicName;
1924

2025
import io.smallrye.config.inject.ConfigExtension;
@@ -27,6 +32,7 @@
2732
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
2833
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
2934
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
35+
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
3036
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
3137
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
3238
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
@@ -71,12 +77,32 @@ public static void stopPubSubContainer() {
7177

7278
public void initConfiguration(String topic) {
7379
config = new PubSubConfig(PROJECT_ID, topic, null, true, "localhost",
74-
PUBSUB_CONTAINER.getFirstMappedPort());
80+
PUBSUB_CONTAINER.getFirstMappedPort(), false);
81+
}
82+
83+
protected MapBasedConfig createSinkConfig(String channel, String topic, final int containerPort) {
84+
final String prefix = "mp.messaging.outgoing." + channel + ".";
85+
final Map<String, Object> config = new HashMap<>();
86+
config.put(prefix.concat("connector"), PubSubConnector.CONNECTOR_NAME);
87+
config.put(prefix.concat("topic"), topic);
88+
89+
// connector properties
90+
config.put("gcp-pubsub-project-id", PROJECT_ID);
91+
config.put("mock-pubsub-topics", true);
92+
config.put("mock-pubsub-host", "localhost");
93+
config.put("mock-pubsub-port", containerPort);
94+
95+
return new MapBasedConfig(config);
7596
}
7697

7798
protected MapBasedConfig createSourceConfig(final String topic, final String subscription,
7899
final int containerPort) {
79-
final String prefix = "mp.messaging.incoming.source.";
100+
return createSourceConfig("source", topic, subscription, containerPort);
101+
}
102+
103+
protected MapBasedConfig createSourceConfig(String channel, final String topic, final String subscription,
104+
final int containerPort) {
105+
final String prefix = "mp.messaging.incoming." + channel + ".";
80106
final Map<String, Object> config = new HashMap<>();
81107
config.put(prefix.concat("connector"), PubSubConnector.CONNECTOR_NAME);
82108
config.put(prefix.concat("topic"), topic);
@@ -100,6 +126,7 @@ static Weld baseWeld() {
100126
weld.addExtension(new ConfigExtension());
101127
weld.addBeanClass(MediatorFactory.class);
102128
weld.addBeanClass(MediatorManager.class);
129+
weld.addBeanClass(ConnectorFactories.class);
103130
weld.addBeanClass(InternalChannelRegistry.class);
104131
weld.addBeanClass(ConfiguredChannelFactory.class);
105132
weld.addBeanClass(ChannelProducer.class);
@@ -133,14 +160,41 @@ static void clear() {
133160
}
134161
}
135162

163+
public void createTopicIfNotExists(PubSubManager manager, String topic) {
164+
final TopicName topicName = TopicName.of(config.getProjectId(), topic);
165+
try (var client = manager.topicAdminClient(config)) {
166+
try {
167+
client.getTopic(topicName);
168+
} catch (final NotFoundException nf) {
169+
try {
170+
client.createTopic(topicName);
171+
} catch (final AlreadyExistsException ae) {
172+
log.topicExistAlready(topicName, ae);
173+
}
174+
}
175+
}
176+
}
177+
136178
void deleteTopicIfExists(PubSubManager manager, String topic) {
137179
System.out.println("Deleting topic " + TopicName.of(PROJECT_ID, topic));
138-
try {
139-
manager.topicAdminClient(config)
140-
.deleteTopic(TopicName.of(PROJECT_ID, topic));
180+
try (var client = manager.topicAdminClient(config)) {
181+
client.deleteTopic(TopicName.of(PROJECT_ID, topic));
141182
} catch (com.google.api.gax.rpc.NotFoundException notFoundException) {
142183
// The topic didn't exist.
143184
}
144185
}
145186

187+
void waitUntilSubscription(PubSubManager manager, String topic, String subscription) {
188+
try (var client = manager.topicAdminClient(this.config)) {
189+
await().until(() -> {
190+
for (String sub : client.listTopicSubscriptions(TopicName.of(PROJECT_ID, topic)).iterateAll()) {
191+
if (sub.endsWith(subscription)) {
192+
return true;
193+
}
194+
}
195+
return false;
196+
});
197+
}
198+
}
199+
146200
}

0 commit comments

Comments
 (0)