Skip to content

Commit 403e133

Browse files
authored
Improve hibernate reactive instrumentation (#9486)
1 parent 331aa04 commit 403e133

File tree

7 files changed

+379
-18
lines changed

7 files changed

+379
-18
lines changed

instrumentation/hibernate/hibernate-reactive-1.0/javaagent/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ testing {
5252
implementation("org.hibernate.reactive:hibernate-reactive-core:2.0.0.Final")
5353
implementation("io.vertx:vertx-pg-client:4.4.2")
5454
}
55+
compileOnly("io.vertx:vertx-codegen:4.4.2")
5556
}
5657
}
5758
}

instrumentation/hibernate/hibernate-reactive-1.0/javaagent/src/hibernateReactive1Test/java/io/opentelemetry/javaagent/instrumentation/hibernate/reactive/v1_0/HibernateReactiveTest.java

Lines changed: 145 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -123,28 +123,156 @@ void testStage() throws Exception {
123123
testing.runWithSpan(
124124
"parent",
125125
() ->
126-
Vertx.vertx()
127-
.getOrCreateContext()
128-
.runOnContext(
129-
event ->
130-
stageSessionFactory
131-
.withSession(
132-
session -> {
133-
if (!Span.current().getSpanContext().isValid()) {
134-
throw new IllegalStateException("missing parent span");
135-
}
136-
137-
return session
138-
.find(Value.class, 1L)
139-
.thenAccept(
140-
value -> testing.runWithSpan("callback", () -> {}));
141-
})
142-
.thenAccept(unused -> latch.countDown())));
126+
runWithVertx(
127+
() ->
128+
stageSessionFactory
129+
.withSession(
130+
session -> {
131+
if (!Span.current().getSpanContext().isValid()) {
132+
throw new IllegalStateException("missing parent span");
133+
}
134+
135+
return session
136+
.find(Value.class, 1L)
137+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
138+
})
139+
.thenAccept(unused -> latch.countDown())));
140+
latch.await(30, TimeUnit.SECONDS);
141+
142+
assertTrace();
143+
}
144+
145+
@Test
146+
void testStageWithStatelessSession() throws Exception {
147+
CountDownLatch latch = new CountDownLatch(1);
148+
testing.runWithSpan(
149+
"parent",
150+
() ->
151+
runWithVertx(
152+
() ->
153+
stageSessionFactory
154+
.withStatelessSession(
155+
session -> {
156+
if (!Span.current().getSpanContext().isValid()) {
157+
throw new IllegalStateException("missing parent span");
158+
}
159+
160+
return session
161+
.get(Value.class, 1L)
162+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
163+
})
164+
.thenAccept(unused -> latch.countDown())));
165+
latch.await(30, TimeUnit.SECONDS);
166+
167+
assertTrace();
168+
}
169+
170+
@Test
171+
void testStageSessionWithTransaction() throws Exception {
172+
CountDownLatch latch = new CountDownLatch(1);
173+
testing.runWithSpan(
174+
"parent",
175+
() ->
176+
runWithVertx(
177+
() ->
178+
stageSessionFactory
179+
.withSession(
180+
session -> {
181+
if (!Span.current().getSpanContext().isValid()) {
182+
throw new IllegalStateException("missing parent span");
183+
}
184+
185+
return session
186+
.withTransaction(transaction -> session.find(Value.class, 1L))
187+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
188+
})
189+
.thenAccept(unused -> latch.countDown())));
190+
latch.await(30, TimeUnit.SECONDS);
191+
192+
assertTrace();
193+
}
194+
195+
@Test
196+
void testStageStatelessSessionWithTransaction() throws Exception {
197+
CountDownLatch latch = new CountDownLatch(1);
198+
testing.runWithSpan(
199+
"parent",
200+
() ->
201+
runWithVertx(
202+
() ->
203+
stageSessionFactory
204+
.withStatelessSession(
205+
session -> {
206+
if (!Span.current().getSpanContext().isValid()) {
207+
throw new IllegalStateException("missing parent span");
208+
}
209+
210+
return session
211+
.withTransaction(transaction -> session.get(Value.class, 1L))
212+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
213+
})
214+
.thenAccept(unused -> latch.countDown())));
143215
latch.await(30, TimeUnit.SECONDS);
144216

145217
assertTrace();
146218
}
147219

220+
@Test
221+
void testStageOpenSession() throws Exception {
222+
CountDownLatch latch = new CountDownLatch(1);
223+
testing.runWithSpan(
224+
"parent",
225+
() ->
226+
runWithVertx(
227+
() ->
228+
stageSessionFactory
229+
.openSession()
230+
.thenApply(
231+
session -> {
232+
if (!Span.current().getSpanContext().isValid()) {
233+
throw new IllegalStateException("missing parent span");
234+
}
235+
236+
return session
237+
.find(Value.class, 1L)
238+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
239+
})
240+
.thenAccept(unused -> latch.countDown())));
241+
latch.await(30, TimeUnit.SECONDS);
242+
243+
assertTrace();
244+
}
245+
246+
@Test
247+
void testStageOpenStatelessSession() throws Exception {
248+
CountDownLatch latch = new CountDownLatch(1);
249+
testing.runWithSpan(
250+
"parent",
251+
() ->
252+
runWithVertx(
253+
() ->
254+
stageSessionFactory
255+
.openStatelessSession()
256+
.thenApply(
257+
session -> {
258+
if (!Span.current().getSpanContext().isValid()) {
259+
throw new IllegalStateException("missing parent span");
260+
}
261+
262+
return session
263+
.get(Value.class, 1L)
264+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
265+
})
266+
.thenAccept(unused -> latch.countDown())));
267+
latch.await(30, TimeUnit.SECONDS);
268+
269+
assertTrace();
270+
}
271+
272+
private static void runWithVertx(Runnable runnable) {
273+
Vertx.vertx().getOrCreateContext().runOnContext(event -> runnable.run());
274+
}
275+
148276
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0
149277
private static void assertTrace() {
150278
testing.waitAndAssertTraces(

instrumentation/hibernate/hibernate-reactive-1.0/javaagent/src/hibernateReactive2Test/java/io/opentelemetry/javaagent/instrumentation/hibernate/reactive/v2_0/HibernateReactiveTest.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import io.opentelemetry.api.trace.SpanKind;
1919
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
2020
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
21+
import io.vertx.core.Vertx;
2122
import jakarta.persistence.EntityManagerFactory;
2223
import jakarta.persistence.Persistence;
2324
import java.time.Duration;
25+
import java.util.concurrent.CountDownLatch;
2426
import java.util.concurrent.TimeUnit;
2527
import org.hibernate.reactive.mutiny.Mutiny;
2628
import org.hibernate.reactive.stage.Stage;
@@ -138,6 +140,131 @@ void testStage() throws Exception {
138140
assertTrace();
139141
}
140142

143+
@Test
144+
void testStageWithStatelessSession() throws Exception {
145+
testing
146+
.runWithSpan(
147+
"parent",
148+
() ->
149+
stageSessionFactory
150+
.withStatelessSession(
151+
session -> {
152+
if (!Span.current().getSpanContext().isValid()) {
153+
throw new IllegalStateException("missing parent span");
154+
}
155+
156+
return session
157+
.get(Value.class, 1L)
158+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
159+
})
160+
.toCompletableFuture())
161+
.get(30, TimeUnit.SECONDS);
162+
163+
assertTrace();
164+
}
165+
166+
@Test
167+
void testStageSessionWithTransaction() throws Exception {
168+
testing
169+
.runWithSpan(
170+
"parent",
171+
() ->
172+
stageSessionFactory
173+
.withSession(
174+
session -> {
175+
if (!Span.current().getSpanContext().isValid()) {
176+
throw new IllegalStateException("missing parent span");
177+
}
178+
179+
return session
180+
.withTransaction(transaction -> session.find(Value.class, 1L))
181+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
182+
})
183+
.toCompletableFuture())
184+
.get(30, TimeUnit.SECONDS);
185+
186+
assertTrace();
187+
}
188+
189+
@Test
190+
void testStageStatelessSessionWithTransaction() throws Exception {
191+
testing
192+
.runWithSpan(
193+
"parent",
194+
() ->
195+
stageSessionFactory
196+
.withStatelessSession(
197+
session -> {
198+
if (!Span.current().getSpanContext().isValid()) {
199+
throw new IllegalStateException("missing parent span");
200+
}
201+
202+
return session
203+
.withTransaction(transaction -> session.get(Value.class, 1L))
204+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
205+
})
206+
.toCompletableFuture())
207+
.get(30, TimeUnit.SECONDS);
208+
209+
assertTrace();
210+
}
211+
212+
@Test
213+
void testStageOpenSession() throws Exception {
214+
CountDownLatch latch = new CountDownLatch(1);
215+
testing.runWithSpan(
216+
"parent",
217+
() ->
218+
runWithVertx(
219+
() ->
220+
stageSessionFactory
221+
.openSession()
222+
.thenApply(
223+
session -> {
224+
if (!Span.current().getSpanContext().isValid()) {
225+
throw new IllegalStateException("missing parent span");
226+
}
227+
228+
return session
229+
.find(Value.class, 1L)
230+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
231+
})
232+
.thenAccept(unused -> latch.countDown())));
233+
latch.await(30, TimeUnit.SECONDS);
234+
235+
assertTrace();
236+
}
237+
238+
@Test
239+
void testStageOpenStatelessSession() throws Exception {
240+
CountDownLatch latch = new CountDownLatch(1);
241+
testing.runWithSpan(
242+
"parent",
243+
() ->
244+
runWithVertx(
245+
() ->
246+
stageSessionFactory
247+
.openStatelessSession()
248+
.thenApply(
249+
session -> {
250+
if (!Span.current().getSpanContext().isValid()) {
251+
throw new IllegalStateException("missing parent span");
252+
}
253+
254+
return session
255+
.get(Value.class, 1L)
256+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
257+
})
258+
.thenAccept(unused -> latch.countDown())));
259+
latch.await(30, TimeUnit.SECONDS);
260+
261+
assertTrace();
262+
}
263+
264+
private static void runWithVertx(Runnable runnable) {
265+
Vertx.vertx().getOrCreateContext().runOnContext(event -> runnable.run());
266+
}
267+
141268
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0
142269
private static void assertTrace() {
143270
testing.waitAndAssertTraces(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.hibernate.reactive.v1_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.CompletionStage;
12+
13+
public final class CompletionStageWrapper {
14+
15+
private CompletionStageWrapper() {}
16+
17+
public static <T> CompletionStage<T> wrap(CompletionStage<T> future) {
18+
Context context = Context.current();
19+
if (context != Context.root()) {
20+
return wrap(future, context);
21+
}
22+
return future;
23+
}
24+
25+
private static <T> CompletionStage<T> wrap(CompletionStage<T> completionStage, Context context) {
26+
CompletableFuture<T> result = new CompletableFuture<>();
27+
completionStage.whenComplete(
28+
(T value, Throwable throwable) -> {
29+
try (Scope ignored = context.makeCurrent()) {
30+
if (throwable != null) {
31+
result.completeExceptionally(throwable);
32+
} else {
33+
result.complete(value);
34+
}
35+
}
36+
});
37+
38+
return result;
39+
}
40+
}

instrumentation/hibernate/hibernate-reactive-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/hibernate/reactive/v1_0/HibernateReactiveInstrumentationModule.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public HibernateReactiveInstrumentationModule() {
2222
@Override
2323
public List<TypeInstrumentation> typeInstrumentations() {
2424
return asList(
25-
new StageSessionFactoryInstrumentation(), new MutinySessionFactoryInstrumentation());
25+
new StageSessionFactoryInstrumentation(),
26+
new StageSessionImplInstrumentation(),
27+
new MutinySessionFactoryInstrumentation());
2628
}
2729
}

0 commit comments

Comments
 (0)