Skip to content

Commit 4baa694

Browse files
authored
Add instrumentation for hibernate reactive (#9304)
1 parent 575627d commit 4baa694

File tree

15 files changed

+356
-71
lines changed

15 files changed

+356
-71
lines changed

docs/supported-libraries.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ or [contributing](../CONTRIBUTING.md).
1818
These are the supported libraries and frameworks:
1919

2020
| Library/Framework | Auto-instrumented versions | Standalone Library Instrumentation [1] | Semantic Conventions |
21-
| ------------------------------------------------------------------------------------------------------------------------------------------- |-------------------------------| --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------- |
21+
|---------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------| --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------- |
2222
| [Akka Actors](https://doc.akka.io/docs/akka/current/typed/index.html) | 2.5+ | N/A | Context propagation |
2323
| [Akka HTTP](https://doc.akka.io/docs/akka-http/current/index.html) | 10.0+ | N/A | [HTTP Client Spans], [HTTP Client Metrics], [HTTP Server Spans], [HTTP Server Metrics] |
2424
| [Apache Axis2](https://axis.apache.org/axis2/java/core/) | 1.6+ | N/A | Provides `http.route` [2], Controller Spans [3] |
@@ -65,6 +65,7 @@ These are the supported libraries and frameworks:
6565
| [Guava ListenableFuture](https://guava.dev/releases/snapshot/api/docs/com/google/common/util/concurrent/ListenableFuture.html) | 10.0+ | [opentelemetry-guava-10.0](../instrumentation/guava-10.0/library) | Context propagation |
6666
| [GWT](http://www.gwtproject.org/) | 2.0+ | N/A | [RPC Server Spans] |
6767
| [Hibernate](https://github.com/hibernate/hibernate-orm) | 3.3+ | N/A | none |
68+
| [Hibernate Reactive](https://hibernate.org/reactive) | 1.0+ | N/A | none |
6869
| [HikariCP](https://github.com/brettwooldridge/HikariCP) | 3.0+ | [opentelemetry-hikaricp-3.0](../instrumentation/hikaricp-3.0/library) | [Database Pool Metrics] |
6970
| [HttpURLConnection](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/net/HttpURLConnection.html) | Java 8+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] |
7071
| [Hystrix](https://github.com/Netflix/Hystrix) | 1.4+ | N/A | none |
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("org.hibernate.reactive")
8+
module.set("hibernate-reactive-core")
9+
versions.set("(,)")
10+
assertInverse.set(true)
11+
}
12+
}
13+
14+
dependencies {
15+
compileOnly("org.hibernate.reactive:hibernate-reactive-core:1.0.0.Final")
16+
17+
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
18+
testInstrumentation(project(":instrumentation:vertx:vertx-sql-client-4.0:javaagent"))
19+
20+
library("io.vertx:vertx-sql-client:4.4.2")
21+
compileOnly("io.vertx:vertx-codegen:4.4.2")
22+
23+
testLibrary("io.vertx:vertx-pg-client:4.4.2")
24+
testLibrary("io.vertx:vertx-codegen:4.4.2")
25+
}
26+
27+
val latestDepTest = findProperty("testLatestDeps") as Boolean
28+
29+
testing {
30+
suites {
31+
val hibernateReactive1Test by registering(JvmTestSuite::class) {
32+
dependencies {
33+
implementation("org.testcontainers:testcontainers")
34+
if (latestDepTest) {
35+
implementation("org.hibernate.reactive:hibernate-reactive-core:1.+")
36+
implementation("io.vertx:vertx-pg-client:+")
37+
} else {
38+
implementation("org.hibernate.reactive:hibernate-reactive-core:1.0.0.Final")
39+
implementation("io.vertx:vertx-pg-client:4.1.5")
40+
}
41+
}
42+
}
43+
44+
val hibernateReactive2Test by registering(JvmTestSuite::class) {
45+
dependencies {
46+
implementation("org.testcontainers:testcontainers")
47+
if (latestDepTest) {
48+
implementation("org.hibernate.reactive:hibernate-reactive-core:2.+")
49+
implementation("io.vertx:vertx-pg-client:+")
50+
} else {
51+
implementation("org.hibernate.reactive:hibernate-reactive-core:2.0.0.Final")
52+
implementation("io.vertx:vertx-pg-client:4.4.2")
53+
}
54+
}
55+
}
56+
}
57+
}
58+
59+
tasks {
60+
withType<Test>().configureEach {
61+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
62+
}
63+
named("compileHibernateReactive2TestJava", JavaCompile::class).configure {
64+
options.release.set(11)
65+
}
66+
val testJavaVersion =
67+
gradle.startParameter.projectProperties.get("testJavaVersion")?.let(JavaVersion::toVersion)
68+
?: JavaVersion.current()
69+
if (testJavaVersion.isJava8) {
70+
named("hibernateReactive2Test", Test::class).configure {
71+
enabled = false
72+
}
73+
if (latestDepTest) {
74+
named("hibernateReactive1Test", Test::class).configure {
75+
enabled = false
76+
}
77+
}
78+
}
79+
80+
check {
81+
dependsOn(testing.suites)
82+
}
83+
}
Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_NAME;
1515
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_PORT;
1616

17+
import io.opentelemetry.api.trace.Span;
1718
import io.opentelemetry.api.trace.SpanKind;
1819
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1920
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
@@ -98,10 +99,15 @@ void testMutiny() {
9899
() -> {
99100
mutinySessionFactory
100101
.withSession(
101-
session ->
102-
session
103-
.find(Value.class, 1L)
104-
.invoke(value -> testing.runWithSpan("callback", () -> {})))
102+
session -> {
103+
if (!Span.current().getSpanContext().isValid()) {
104+
throw new IllegalStateException("missing parent span");
105+
}
106+
107+
return session
108+
.find(Value.class, 1L)
109+
.invoke(value -> testing.runWithSpan("callback", () -> {}));
110+
})
105111
.await()
106112
.atMost(Duration.ofSeconds(30));
107113
});
@@ -117,10 +123,15 @@ void testStage() throws Exception {
117123
() ->
118124
stageSessionFactory
119125
.withSession(
120-
session ->
121-
session
122-
.find(Value.class, 1L)
123-
.thenAccept(value -> testing.runWithSpan("callback", () -> {})))
126+
session -> {
127+
if (!Span.current().getSpanContext().isValid()) {
128+
throw new IllegalStateException("missing parent span");
129+
}
130+
131+
return session
132+
.find(Value.class, 1L)
133+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
134+
})
124135
.toCompletableFuture())
125136
.get(30, TimeUnit.SECONDS);
126137

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_NAME;
1515
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_PORT;
1616

17+
import io.opentelemetry.api.trace.Span;
1718
import io.opentelemetry.api.trace.SpanKind;
1819
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1920
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
@@ -98,10 +99,15 @@ void testMutiny() {
9899
() -> {
99100
mutinySessionFactory
100101
.withSession(
101-
session ->
102-
session
103-
.find(Value.class, 1L)
104-
.invoke(value -> testing.runWithSpan("callback", () -> {})))
102+
session -> {
103+
if (!Span.current().getSpanContext().isValid()) {
104+
throw new IllegalStateException("missing parent span");
105+
}
106+
107+
return session
108+
.find(Value.class, 1L)
109+
.invoke(value -> testing.runWithSpan("callback", () -> {}));
110+
})
105111
.await()
106112
.atMost(Duration.ofSeconds(30));
107113
});
@@ -117,10 +123,15 @@ void testStage() throws Exception {
117123
() ->
118124
stageSessionFactory
119125
.withSession(
120-
session ->
121-
session
122-
.find(Value.class, 1L)
123-
.thenAccept(value -> testing.runWithSpan("callback", () -> {})))
126+
session -> {
127+
if (!Span.current().getSpanContext().isValid()) {
128+
throw new IllegalStateException("missing parent span");
129+
}
130+
131+
return session
132+
.find(Value.class, 1L)
133+
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
134+
})
124135
.toCompletableFuture())
125136
.get(30, TimeUnit.SECONDS);
126137

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.hibernate.reactive.v1_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import io.smallrye.mutiny.Uni;
11+
import io.smallrye.mutiny.operators.UniOperator;
12+
import io.smallrye.mutiny.subscription.UniSubscriber;
13+
import io.smallrye.mutiny.subscription.UniSubscription;
14+
15+
public final class ContextOperator<T> extends UniOperator<T, T> {
16+
private final Context context;
17+
18+
public ContextOperator(Uni<? extends T> upstream, Context context) {
19+
super(upstream);
20+
this.context = context;
21+
}
22+
23+
public static <T> Uni<T> plug(Uni<T> uni) {
24+
if (uni instanceof ContextOperator) {
25+
return uni;
26+
}
27+
Context parentContext = Context.current();
28+
if (parentContext == Context.root()) {
29+
return uni;
30+
}
31+
32+
return uni.plug(u -> new ContextOperator<>(u, parentContext));
33+
}
34+
35+
@Override
36+
public void subscribe(UniSubscriber<? super T> downstream) {
37+
try (Scope ignore = context.makeCurrent()) {
38+
upstream().subscribe().withSubscriber(new ContextSubscriber<>(downstream, context));
39+
}
40+
}
41+
42+
private static class ContextSubscriber<T> implements UniSubscriber<T> {
43+
private final UniSubscriber<? super T> downstream;
44+
private final Context context;
45+
46+
private ContextSubscriber(UniSubscriber<? super T> downstream, Context context) {
47+
this.downstream = downstream;
48+
this.context = context;
49+
}
50+
51+
@Override
52+
public void onSubscribe(UniSubscription uniSubscription) {
53+
try (Scope ignore = context.makeCurrent()) {
54+
downstream.onSubscribe(uniSubscription);
55+
}
56+
}
57+
58+
@Override
59+
public void onItem(T t) {
60+
try (Scope ignore = context.makeCurrent()) {
61+
downstream.onItem(t);
62+
}
63+
}
64+
65+
@Override
66+
public void onFailure(Throwable throwable) {
67+
try (Scope ignore = context.makeCurrent()) {
68+
downstream.onFailure(throwable);
69+
}
70+
}
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.hibernate.reactive.v1_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import java.util.function.Function;
11+
12+
public final class FunctionWrapper<T, R> implements Function<T, R> {
13+
private final Function<T, R> delegate;
14+
private final Context context;
15+
16+
private FunctionWrapper(Function<T, R> delegate, Context context) {
17+
this.delegate = delegate;
18+
this.context = context;
19+
}
20+
21+
public static <T, R> Function<T, R> wrap(Function<T, R> function) {
22+
if (function instanceof FunctionWrapper) {
23+
return function;
24+
}
25+
Context context = Context.current();
26+
if (context == Context.root()) {
27+
return function;
28+
}
29+
30+
return new FunctionWrapper<>(function, context);
31+
}
32+
33+
@Override
34+
public R apply(T t) {
35+
try (Scope ignore = context.makeCurrent()) {
36+
return delegate.apply(t);
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)