1+ /*
2+ * Copyright 2024 openGemini Authors
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package io .opengemini .client .interceptor ;
18+
19+ import io .github .openfacade .http .HttpResponse ;
20+ import io .opengemini .client .api .Query ;
21+ import io .opengemini .client .api .Write ;
22+ import io .opengemini .client .impl .OpenTelemetryConfig ;
23+ import io .opentelemetry .api .trace .Span ;
24+ import io .opentelemetry .api .trace .SpanKind ;
25+ import io .opentelemetry .api .trace .StatusCode ;
26+ import io .opentelemetry .context .Scope ;
27+
28+ import java .util .concurrent .CompletableFuture ;
29+
30+ public class OtelInterceptor implements Interceptor {
31+ private static final io .opentelemetry .api .trace .Tracer tracer = OpenTelemetryConfig .getTracer ();
32+
33+ @ Override
34+ public CompletableFuture <Void > queryBefore (Query query ) {
35+ return CompletableFuture .runAsync (() -> {
36+ Span querySpan = tracer .spanBuilder ("query" )
37+ .setSpanKind (SpanKind .CLIENT )
38+ .setAttribute ("database" , query .getDatabase () != null ? query .getDatabase () : "unknown" )
39+ .setAttribute ("command" , query .getCommand ())
40+ .startSpan ();
41+
42+ Span queryBeforeSpan = tracer .spanBuilder ("queryBefore" )
43+ .setParent (io .opentelemetry .context .Context .current ().with (querySpan ))
44+ .setSpanKind (SpanKind .INTERNAL )
45+ .startSpan ();
46+
47+ try (Scope scope = queryBeforeSpan .makeCurrent ()) {
48+ query .setAttribute ("querySpan" , querySpan );
49+ query .setAttribute ("queryBeforeSpan" , queryBeforeSpan );
50+ } finally {
51+ queryBeforeSpan .end ();
52+ }
53+ });
54+ }
55+
56+ @ Override
57+ public CompletableFuture <Void > queryAfter (Query query , HttpResponse response ) {
58+ return CompletableFuture .runAsync (() -> {
59+ Span querySpan = (Span ) query .getAttribute ("querySpan" );
60+ if (querySpan != null ) {
61+ Span queryAfterSpan = tracer .spanBuilder ("queryAfter" )
62+ .setParent (io .opentelemetry .context .Context .current ().with (querySpan ))
63+ .setSpanKind (SpanKind .INTERNAL )
64+ .startSpan ();
65+
66+ try (Scope scope = queryAfterSpan .makeCurrent ()) {
67+ int statusCode = response .statusCode ();
68+ queryAfterSpan .setAttribute ("status_code" , statusCode );
69+ if (statusCode >= 400 ) {
70+ String errorBody = response .bodyAsString ();
71+ queryAfterSpan .setStatus (StatusCode .ERROR , "HTTP error: " + statusCode );
72+ queryAfterSpan .setAttribute ("error.message" , errorBody );
73+ querySpan .setStatus (StatusCode .ERROR , "Query failed: " + errorBody );
74+ } else {
75+ queryAfterSpan .setStatus (StatusCode .OK );
76+ querySpan .setStatus (StatusCode .OK );
77+ }
78+ } finally {
79+ queryAfterSpan .end ();
80+ querySpan .end ();
81+ }
82+ }
83+ });
84+ }
85+
86+ @ Override
87+ public CompletableFuture <Void > writeBefore (Write write ) {
88+ return CompletableFuture .runAsync (() -> {
89+ Span writeSpan = tracer .spanBuilder ("write" )
90+ .setSpanKind (SpanKind .CLIENT )
91+ .setAttribute ("database" , write .getDatabase ())
92+ .setAttribute ("retention_policy" , write .getRetentionPolicy ())
93+ .setAttribute ("measurement" , write .getMeasurement ())
94+ .startSpan ();
95+
96+ Span writeBeforeSpan = tracer .spanBuilder ("writeBefore" )
97+ .setParent (io .opentelemetry .context .Context .current ().with (writeSpan ))
98+ .setSpanKind (SpanKind .INTERNAL )
99+ .startSpan ();
100+
101+ try (Scope scope = writeBeforeSpan .makeCurrent ()) {
102+ write .setAttribute ("writeSpan" , writeSpan );
103+ write .setAttribute ("writeBeforeSpan" , writeBeforeSpan );
104+ } finally {
105+ writeBeforeSpan .end ();
106+ }
107+ });
108+ }
109+
110+ @ Override
111+ public CompletableFuture <Void > writeAfter (Write write , HttpResponse response ) {
112+ return CompletableFuture .runAsync (() -> {
113+ Span writeSpan = (Span ) write .getAttribute ("writeSpan" );
114+ if (writeSpan != null ) {
115+ Span writeAfterSpan = tracer .spanBuilder ("writeAfter" )
116+ .setParent (io .opentelemetry .context .Context .current ().with (writeSpan ))
117+ .setSpanKind (SpanKind .INTERNAL )
118+ .startSpan ();
119+
120+ try (Scope scope = writeAfterSpan .makeCurrent ()) {
121+ int statusCode = response .statusCode ();
122+ writeAfterSpan .setAttribute ("status_code" , statusCode );
123+
124+ if (statusCode >= 400 ) {
125+ String errorBody = response .bodyAsString ();
126+ writeAfterSpan .setStatus (StatusCode .ERROR , "HTTP error: " + statusCode );
127+ writeAfterSpan .setAttribute ("error.message" , errorBody );
128+ writeSpan .setStatus (StatusCode .ERROR , "Write failed: " + errorBody );
129+ } else {
130+ writeAfterSpan .setStatus (StatusCode .OK );
131+ writeSpan .setStatus (StatusCode .OK );
132+ }
133+ } finally {
134+ writeAfterSpan .end ();
135+ writeSpan .end ();
136+ }
137+ }
138+ });
139+ }
140+ }
0 commit comments