Skip to content

Commit 79734bf

Browse files
committed
feat: add OpenTelemetry Tracing with Interceptors for Query and Write Operations
Signed-off-by: chenhuan <[email protected]>
1 parent 1a2f51f commit 79734bf

File tree

8 files changed

+379
-8
lines changed

8 files changed

+379
-8
lines changed

opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import lombok.Getter;
2121
import lombok.Setter;
2222

23+
import java.util.HashMap;
24+
import java.util.Map;
25+
2326
@AllArgsConstructor
2427
@Getter
2528
@Setter
@@ -44,6 +47,8 @@ public class Query {
4447
*/
4548
private Precision precision;
4649

50+
private Map<String, Object> attributes = new HashMap<>();
51+
4752
public Query(String command) {
4853
this.command = command;
4954
}
@@ -53,4 +58,19 @@ public Query(String command, String database, String retentionPolicy) {
5358
this.database = database;
5459
this.retentionPolicy = retentionPolicy;
5560
}
61+
62+
public Query(String command, String database, String retentionPolicy, Precision precision) {
63+
this.command = command;
64+
this.database = database;
65+
this.retentionPolicy = retentionPolicy;
66+
this.precision = precision;
67+
}
68+
69+
public void setAttribute(String key, Object value) {
70+
attributes.put(key, value);
71+
}
72+
73+
public Object getAttribute(String key) {
74+
return attributes.get(key);
75+
}
5676
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2025 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.api;
18+
19+
import lombok.Getter;
20+
import lombok.Setter;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
@Getter
26+
@Setter
27+
public class Write {
28+
private String database;
29+
private String retentionPolicy;
30+
private String measurement;
31+
private String lineProtocol;
32+
private String precision;
33+
34+
private Map<String, Object> attributes = new HashMap<>();
35+
36+
public Write(String database, String retentionPolicy, String measurement,
37+
String lineProtocol, String precision) {
38+
this.database = database;
39+
this.retentionPolicy = retentionPolicy;
40+
this.measurement = measurement;
41+
this.lineProtocol = lineProtocol;
42+
this.precision = precision;
43+
}
44+
45+
public void setAttribute(String key, Object value) {
46+
attributes.put(key, value);
47+
}
48+
49+
public Object getAttribute(String key) {
50+
return attributes.get(key);
51+
}
52+
}

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,50 @@
3232
import io.opengemini.client.api.QueryResult;
3333
import io.opengemini.client.api.RetentionPolicy;
3434
import io.opengemini.client.api.RpConfig;
35+
import io.opengemini.client.api.Write;
3536
import io.opengemini.client.common.BaseClient;
3637
import io.opengemini.client.common.CommandFactory;
3738
import io.opengemini.client.common.HeaderConst;
3839
import io.opengemini.client.common.JacksonService;
3940
import io.opengemini.client.common.ResultMapper;
41+
import io.opengemini.client.interceptor.Interceptor;
4042
import org.apache.commons.lang3.StringUtils;
4143
import org.jetbrains.annotations.NotNull;
4244

4345
import java.io.IOException;
4446
import java.nio.charset.StandardCharsets;
47+
import java.util.ArrayList;
48+
import java.util.Collections;
4549
import java.util.List;
4650
import java.util.Optional;
4751
import java.util.StringJoiner;
4852
import java.util.concurrent.CompletableFuture;
4953

5054
public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
55+
private final List<Interceptor> interceptors = new ArrayList<>();
5156
protected final Configuration conf;
52-
5357
private final HttpClient client;
5458

5559
public OpenGeminiClient(@NotNull Configuration conf) {
5660
super(conf);
5761
this.conf = conf;
5862
AuthConfig authConfig = conf.getAuthConfig();
5963
HttpClientConfig httpConfig = conf.getHttpConfig();
64+
if (httpConfig == null) {
65+
httpConfig = new HttpClientConfig.Builder().build();
66+
conf.setHttpConfig(httpConfig);
67+
}
6068
if (authConfig != null && authConfig.getAuthType().equals(AuthType.PASSWORD)) {
6169
httpConfig.addRequestFilter(
6270
new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword())));
6371
}
6472
this.client = HttpClientFactory.createHttpClient(httpConfig);
6573
}
6674

75+
public void addInterceptors(Interceptor... interceptors) {
76+
Collections.addAll(this.interceptors, interceptors);
77+
}
78+
6779
/**
6880
* {@inheritDoc}
6981
*/
@@ -195,9 +207,21 @@ public CompletableFuture<Pong> ping() {
195207
*
196208
* @param query the query to execute.
197209
*/
198-
protected CompletableFuture<QueryResult> executeQuery(Query query) {
199-
String queryUrl = getQueryUrl(query);
200-
return get(queryUrl).thenCompose(response -> convertResponse(response, QueryResult.class));
210+
public CompletableFuture<QueryResult> executeQuery(Query query) {
211+
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
212+
interceptors.stream()
213+
.map(interceptor -> interceptor.queryBefore(query))
214+
.toArray(CompletableFuture[]::new)
215+
);
216+
217+
return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> {
218+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
219+
interceptors.stream()
220+
.map(interceptor -> interceptor.queryAfter(query, response))
221+
.toArray(CompletableFuture[]::new)
222+
);
223+
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
224+
}));
201225
}
202226

203227
/**
@@ -217,9 +241,32 @@ protected CompletableFuture<QueryResult> executePostQuery(Query query) {
217241
* @param retentionPolicy the name of the retention policy.
218242
* @param lineProtocol the line protocol string to write.
219243
*/
220-
protected CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
221-
String writeUrl = getWriteUrl(database, retentionPolicy);
222-
return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class));
244+
public CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
245+
Write write = new Write(
246+
database,
247+
retentionPolicy,
248+
"default_measurement",
249+
lineProtocol,
250+
"ns"
251+
);
252+
253+
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
254+
interceptors.stream()
255+
.map(interceptor -> interceptor.writeBefore(write))
256+
.toArray(CompletableFuture[]::new)
257+
);
258+
259+
return beforeFutures.thenCompose(voidResult ->
260+
executeHttpWrite(write).thenCompose(response -> {
261+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
262+
interceptors.stream()
263+
.map(interceptor -> interceptor.writeAfter(write, response))
264+
.toArray(CompletableFuture[]::new)
265+
);
266+
return afterFutures.thenCompose(voidResult2 ->
267+
convertResponse(response, Void.class));
268+
})
269+
);
223270
}
224271

225272
/**
@@ -258,7 +305,7 @@ private CompletableFuture<HttpResponse> get(String url) {
258305

259306
private CompletableFuture<HttpResponse> post(String url, String body) {
260307
return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8),
261-
headers);
308+
headers);
262309
}
263310

264311
@Override
@@ -270,4 +317,14 @@ public void close() throws IOException {
270317
public String toString() {
271318
return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}';
272319
}
320+
321+
private CompletableFuture<HttpResponse> executeHttpQuery(Query query) {
322+
String queryUrl = getQueryUrl(query);
323+
return get(queryUrl);
324+
}
325+
326+
private CompletableFuture<HttpResponse> executeHttpWrite(Write write) {
327+
String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy());
328+
return post(writeUrl, write.getLineProtocol());
329+
}
273330
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2025 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.impl;
18+
19+
import io.opentelemetry.api.trace.Tracer;
20+
import lombok.Getter;
21+
import lombok.Setter;
22+
23+
/**
24+
* Configuration for OpenTelemetry tracing.
25+
*/
26+
@Getter
27+
@Setter
28+
public class OpenTelemetryConfig {
29+
private volatile Tracer tracer;
30+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2025 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.interceptor;
18+
19+
import io.github.openfacade.http.HttpResponse;
20+
import io.opengemini.client.api.Query;
21+
import io.opengemini.client.api.Write;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
25+
/**
26+
* Interceptor interface for OpenGemini client operations.
27+
* Allows custom logic to be executed before and after query and write operations.
28+
*/
29+
public interface Interceptor {
30+
31+
/**
32+
* Executes before a query operation.
33+
*
34+
* @param query the query to be executed
35+
* @return a CompletableFuture that completes when the interceptor logic is done
36+
*/
37+
CompletableFuture<Void> queryBefore(Query query);
38+
39+
/**
40+
* Executes after a query operation.
41+
*
42+
* @param query the query that was executed
43+
* @param response the HTTP response from the query
44+
* @return a CompletableFuture that completes when the interceptor logic is done
45+
*/
46+
CompletableFuture<Void> queryAfter(Query query, HttpResponse response);
47+
48+
/**
49+
* Executes before a write operation.
50+
*
51+
* @param write the write operation to be executed
52+
* @return a CompletableFuture that completes when the interceptor logic is done
53+
*/
54+
CompletableFuture<Void> writeBefore(Write write);
55+
56+
/**
57+
* Executes after a write operation.
58+
*
59+
* @param write the write operation that was executed
60+
* @param response the HTTP response from the write
61+
* @return a CompletableFuture that completes when the interceptor logic is done
62+
*/
63+
CompletableFuture<Void> writeAfter(Write write, HttpResponse response);
64+
}

0 commit comments

Comments
 (0)