Skip to content

Commit 58b9003

Browse files
mznetlaurit
andauthored
OpenSearch Transport v3.0 Implementation (#14823)
Co-authored-by: Lauri Tulmin <[email protected]>
1 parent 31b8a88 commit 58b9003

File tree

13 files changed

+909
-0
lines changed

13 files changed

+909
-0
lines changed

.fossa.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,9 @@ targets:
757757
- type: gradle
758758
path: ./
759759
target: ':instrumentation:openai:openai-java-1.1:library'
760+
- type: gradle
761+
path: ./
762+
target: ':instrumentation:opensearch:opensearch-java-3.0:javaagent'
760763
- type: gradle
761764
path: ./
762765
target: ':instrumentation:opensearch:opensearch-rest-1.0:javaagent'
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("org.opensearch.client")
8+
module.set("opensearch-java")
9+
versions.set("[3.0,)")
10+
}
11+
}
12+
13+
otelJava {
14+
minJavaVersionSupported.set(JavaVersion.VERSION_11)
15+
}
16+
17+
dependencies {
18+
library("org.opensearch.client:opensearch-java:3.0.0")
19+
compileOnly("com.google.auto.value:auto-value-annotations")
20+
annotationProcessor("com.google.auto.value:auto-value")
21+
22+
testImplementation("org.opensearch.client:opensearch-rest-client:3.0.0")
23+
testImplementation(project(":instrumentation:opensearch:opensearch-rest-common:testing"))
24+
testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-5.0:javaagent"))
25+
26+
// For testing AwsSdk2Transport
27+
testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent"))
28+
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
29+
testImplementation("software.amazon.awssdk:auth:2.22.0")
30+
testImplementation("software.amazon.awssdk:identity-spi:2.22.0")
31+
testImplementation("software.amazon.awssdk:apache-client:2.22.0")
32+
testImplementation("software.amazon.awssdk:netty-nio-client:2.22.0")
33+
testImplementation("software.amazon.awssdk:regions:2.22.0")
34+
}
35+
36+
tasks {
37+
test {
38+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
39+
}
40+
41+
val testStableSemconv by registering(Test::class) {
42+
testClassesDirs = sourceSets.test.get().output.classesDirs
43+
classpath = sourceSets.test.get().runtimeClasspath
44+
jvmArgs("-Dotel.semconv-stability.opt-in=database")
45+
}
46+
47+
check {
48+
dependsOn(testStableSemconv)
49+
}
50+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;
7+
8+
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesGetter;
9+
import io.opentelemetry.semconv.incubating.DbIncubatingAttributes;
10+
import javax.annotation.Nullable;
11+
12+
final class OpenSearchAttributesGetter
13+
implements DbClientAttributesGetter<OpenSearchRequest, Void> {
14+
15+
@Override
16+
public String getDbSystem(OpenSearchRequest request) {
17+
return DbIncubatingAttributes.DbSystemNameIncubatingValues.OPENSEARCH;
18+
}
19+
20+
@Override
21+
@Nullable
22+
public String getDbNamespace(OpenSearchRequest request) {
23+
return null;
24+
}
25+
26+
@Override
27+
@Nullable
28+
public String getDbQueryText(OpenSearchRequest request) {
29+
return request.getMethod() + " " + request.getOperation();
30+
}
31+
32+
@Override
33+
@Nullable
34+
public String getDbOperationName(OpenSearchRequest request) {
35+
return request.getMethod();
36+
}
37+
38+
@Nullable
39+
@Override
40+
public String getResponseStatus(@Nullable Void response, @Nullable Throwable error) {
41+
return null; // Response status is handled by HTTP instrumentation
42+
}
43+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;
7+
8+
import static java.util.Collections.singletonList;
9+
10+
import com.google.auto.service.AutoService;
11+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import java.util.List;
14+
15+
@AutoService(InstrumentationModule.class)
16+
public class OpenSearchInstrumentationModule extends InstrumentationModule {
17+
public OpenSearchInstrumentationModule() {
18+
super("opensearch-java", "opensearch-java-3.0", "opensearch");
19+
}
20+
21+
@Override
22+
public List<TypeInstrumentation> typeInstrumentations() {
23+
return singletonList(new OpenSearchTransportInstrumentation());
24+
}
25+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;
7+
8+
import com.google.auto.value.AutoValue;
9+
10+
@AutoValue
11+
public abstract class OpenSearchRequest {
12+
13+
public static OpenSearchRequest create(String method, String endpoint) {
14+
return new AutoValue_OpenSearchRequest(method, endpoint);
15+
}
16+
17+
public abstract String getMethod();
18+
19+
public abstract String getOperation();
20+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.opensearch.v3_0.OpenSearchSingletons.instrumenter;
9+
10+
import io.opentelemetry.context.Context;
11+
import java.util.function.BiConsumer;
12+
13+
public final class OpenSearchResponseHandler implements BiConsumer<Object, Throwable> {
14+
private final Context context;
15+
private final OpenSearchRequest otelRequest;
16+
17+
public OpenSearchResponseHandler(Context context, OpenSearchRequest otelRequest) {
18+
this.context = context;
19+
this.otelRequest = otelRequest;
20+
}
21+
22+
@Override
23+
public void accept(Object response, Throwable error) {
24+
// OpenSearch responses don't provide response information, so the span is closed with null.
25+
instrumenter().end(context, otelRequest, null, error);
26+
}
27+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;
7+
8+
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesExtractor;
10+
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientMetrics;
11+
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientSpanNameExtractor;
12+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
13+
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
14+
15+
public final class OpenSearchSingletons {
16+
private static final Instrumenter<OpenSearchRequest, Void> INSTRUMENTER = createInstrumenter();
17+
18+
public static Instrumenter<OpenSearchRequest, Void> instrumenter() {
19+
return INSTRUMENTER;
20+
}
21+
22+
private static Instrumenter<OpenSearchRequest, Void> createInstrumenter() {
23+
OpenSearchAttributesGetter dbClientAttributesGetter = new OpenSearchAttributesGetter();
24+
25+
return Instrumenter.<OpenSearchRequest, Void>builder(
26+
GlobalOpenTelemetry.get(),
27+
"io.opentelemetry.opensearch-java-3.0",
28+
DbClientSpanNameExtractor.create(dbClientAttributesGetter))
29+
.addAttributesExtractor(DbClientAttributesExtractor.create(dbClientAttributesGetter))
30+
.addOperationMetrics(DbClientMetrics.get())
31+
.buildInstrumenter(SpanKindExtractor.alwaysClient());
32+
}
33+
34+
private OpenSearchSingletons() {}
35+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;
7+
8+
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
9+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
10+
import static io.opentelemetry.javaagent.instrumentation.opensearch.v3_0.OpenSearchSingletons.instrumenter;
11+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
12+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
13+
import static net.bytebuddy.matcher.ElementMatchers.named;
14+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
15+
16+
import io.opentelemetry.context.Context;
17+
import io.opentelemetry.context.Scope;
18+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
19+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
20+
import java.util.concurrent.CompletableFuture;
21+
import net.bytebuddy.asm.Advice;
22+
import net.bytebuddy.description.type.TypeDescription;
23+
import net.bytebuddy.matcher.ElementMatcher;
24+
import org.opensearch.client.transport.Endpoint;
25+
26+
public class OpenSearchTransportInstrumentation implements TypeInstrumentation {
27+
@Override
28+
public ElementMatcher<TypeDescription> typeMatcher() {
29+
return implementsInterface(named("org.opensearch.client.transport.OpenSearchTransport"));
30+
}
31+
32+
@Override
33+
public void transform(TypeTransformer transformer) {
34+
transformer.applyAdviceToMethod(
35+
isMethod()
36+
.and(isPublic())
37+
.and(named("performRequest"))
38+
.and(takesArgument(0, Object.class))
39+
.and(takesArgument(1, named("org.opensearch.client.transport.Endpoint"))),
40+
this.getClass().getName() + "$PerformRequestAdvice");
41+
42+
transformer.applyAdviceToMethod(
43+
isMethod()
44+
.and(isPublic())
45+
.and(named("performRequestAsync"))
46+
.and(takesArgument(0, Object.class))
47+
.and(takesArgument(1, named("org.opensearch.client.transport.Endpoint"))),
48+
this.getClass().getName() + "$PerformRequestAsyncAdvice");
49+
}
50+
51+
@SuppressWarnings("unused")
52+
public static class PerformRequestAdvice {
53+
54+
@Advice.OnMethodEnter(suppress = Throwable.class)
55+
public static void onEnter(
56+
@Advice.Argument(0) Object request,
57+
@Advice.Argument(1) Endpoint<Object, Object, Object> endpoint,
58+
@Advice.Local("otelRequest") OpenSearchRequest otelRequest,
59+
@Advice.Local("otelContext") Context context,
60+
@Advice.Local("otelScope") Scope scope) {
61+
62+
Context parentContext = currentContext();
63+
otelRequest =
64+
OpenSearchRequest.create(endpoint.method(request), endpoint.requestUrl(request));
65+
if (!instrumenter().shouldStart(parentContext, otelRequest)) {
66+
return;
67+
}
68+
69+
context = instrumenter().start(parentContext, otelRequest);
70+
scope = context.makeCurrent();
71+
}
72+
73+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
74+
public static void stopSpan(
75+
@Advice.Thrown Throwable throwable,
76+
@Advice.Local("otelRequest") OpenSearchRequest otelRequest,
77+
@Advice.Local("otelContext") Context context,
78+
@Advice.Local("otelScope") Scope scope) {
79+
80+
if (scope == null) {
81+
return;
82+
}
83+
scope.close();
84+
85+
instrumenter().end(context, otelRequest, null, throwable);
86+
}
87+
}
88+
89+
@SuppressWarnings("unused")
90+
public static class PerformRequestAsyncAdvice {
91+
92+
@Advice.OnMethodEnter(suppress = Throwable.class)
93+
public static void onEnter(
94+
@Advice.Argument(0) Object request,
95+
@Advice.Argument(value = 1, readOnly = false) Endpoint<Object, Object, Object> endpoint,
96+
@Advice.Local("otelRequest") OpenSearchRequest otelRequest,
97+
@Advice.Local("otelContext") Context context,
98+
@Advice.Local("otelScope") Scope scope) {
99+
100+
Context parentContext = currentContext();
101+
otelRequest =
102+
OpenSearchRequest.create(endpoint.method(request), endpoint.requestUrl(request));
103+
if (!instrumenter().shouldStart(parentContext, otelRequest)) {
104+
return;
105+
}
106+
107+
context = instrumenter().start(parentContext, otelRequest);
108+
scope = context.makeCurrent();
109+
}
110+
111+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
112+
public static void stopSpan(
113+
@Advice.Thrown Throwable throwable,
114+
@Advice.Return(readOnly = false) CompletableFuture<Object> future,
115+
@Advice.Local("otelRequest") OpenSearchRequest otelRequest,
116+
@Advice.Local("otelContext") Context context,
117+
@Advice.Local("otelScope") Scope scope) {
118+
119+
if (scope == null) {
120+
return;
121+
}
122+
scope.close();
123+
124+
if (throwable != null) {
125+
instrumenter().end(context, otelRequest, null, throwable);
126+
}
127+
128+
future.whenComplete(new OpenSearchResponseHandler(context, otelRequest));
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)