Skip to content

Commit 0b8da23

Browse files
committed
feat: integrate OpenTelemetry observability
Signed-off-by: chenhuan <[email protected]>
1 parent 5dddefb commit 0b8da23

File tree

4 files changed

+129
-9
lines changed

4 files changed

+129
-9
lines changed

opengemini-client/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,17 @@
7272
<version>${vertx.version}</version>
7373
</dependency>
7474
</dependencies>
75+
<build>
76+
<plugins>
77+
<plugin>
78+
<groupId>org.apache.maven.plugins</groupId>
79+
<artifactId>maven-compiler-plugin</artifactId>
80+
<configuration>
81+
<source>9</source>
82+
<target>9</target>
83+
</configuration>
84+
</plugin>
85+
</plugins>
86+
</build>
7587

7688
</project>

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,50 @@
3232
import io.opengemini.client.api.QueryResult;
3333
import io.opengemini.client.api.RetentionPolicy;
3434
import io.opengemini.client.api.RpConfig;
35+
import io.opengemini.client.api.Write;
3536
import io.opengemini.client.common.BaseClient;
3637
import io.opengemini.client.common.CommandFactory;
3738
import io.opengemini.client.common.HeaderConst;
3839
import io.opengemini.client.common.JacksonService;
3940
import io.opengemini.client.common.ResultMapper;
41+
import io.opengemini.client.interceptor.Interceptor;
4042
import org.apache.commons.lang3.StringUtils;
4143
import org.jetbrains.annotations.NotNull;
4244

4345
import java.io.IOException;
4446
import java.nio.charset.StandardCharsets;
47+
import java.util.ArrayList;
48+
import java.util.Collections;
4549
import java.util.List;
4650
import java.util.Optional;
4751
import java.util.StringJoiner;
4852
import java.util.concurrent.CompletableFuture;
4953

5054
public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
55+
private final List<Interceptor> interceptors = new ArrayList<>();
5156
protected final Configuration conf;
52-
5357
private final HttpClient client;
5458

5559
public OpenGeminiClient(@NotNull Configuration conf) {
5660
super(conf);
5761
this.conf = conf;
5862
AuthConfig authConfig = conf.getAuthConfig();
5963
HttpClientConfig httpConfig = conf.getHttpConfig();
64+
if (httpConfig == null) {
65+
httpConfig = new HttpClientConfig.Builder().build();
66+
conf.setHttpConfig(httpConfig);
67+
}
6068
if (authConfig != null && authConfig.getAuthType().equals(AuthType.PASSWORD)) {
6169
httpConfig.addRequestFilter(
6270
new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword())));
6371
}
6472
this.client = HttpClientFactory.createHttpClient(httpConfig);
6573
}
6674

75+
public void addInterceptors(Interceptor... interceptors) {
76+
Collections.addAll(this.interceptors, interceptors);
77+
}
78+
6779
/**
6880
* {@inheritDoc}
6981
*/
@@ -195,9 +207,21 @@ public CompletableFuture<Pong> ping() {
195207
*
196208
* @param query the query to execute.
197209
*/
198-
protected CompletableFuture<QueryResult> executeQuery(Query query) {
199-
String queryUrl = getQueryUrl(query);
200-
return get(queryUrl).thenCompose(response -> convertResponse(response, QueryResult.class));
210+
public CompletableFuture<QueryResult> executeQuery(Query query) {
211+
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
212+
interceptors.stream()
213+
.map(interceptor -> interceptor.queryBefore(query))
214+
.toArray(CompletableFuture[]::new)
215+
);
216+
217+
return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> {
218+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
219+
interceptors.stream()
220+
.map(interceptor -> interceptor.queryAfter(query, response))
221+
.toArray(CompletableFuture[]::new)
222+
);
223+
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
224+
}));
201225
}
202226

203227
/**
@@ -217,9 +241,32 @@ protected CompletableFuture<QueryResult> executePostQuery(Query query) {
217241
* @param retentionPolicy the name of the retention policy.
218242
* @param lineProtocol the line protocol string to write.
219243
*/
220-
protected CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
221-
String writeUrl = getWriteUrl(database, retentionPolicy);
222-
return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class));
244+
public CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
245+
Write write = new Write(
246+
database,
247+
retentionPolicy,
248+
"default_measurement",
249+
lineProtocol,
250+
"ns"
251+
);
252+
253+
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
254+
interceptors.stream()
255+
.map(interceptor -> interceptor.writeBefore(write))
256+
.toArray(CompletableFuture[]::new)
257+
);
258+
259+
return beforeFutures.thenCompose(voidResult ->
260+
executeHttpWrite(write).thenCompose(response -> {
261+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
262+
interceptors.stream()
263+
.map(interceptor -> interceptor.writeAfter(write, response))
264+
.toArray(CompletableFuture[]::new)
265+
);
266+
return afterFutures.thenCompose(voidResult2 ->
267+
convertResponse(response, Void.class));
268+
})
269+
);
223270
}
224271

225272
/**
@@ -258,7 +305,7 @@ private CompletableFuture<HttpResponse> get(String url) {
258305

259306
private CompletableFuture<HttpResponse> post(String url, String body) {
260307
return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8),
261-
headers);
308+
headers);
262309
}
263310

264311
@Override
@@ -270,4 +317,14 @@ public void close() throws IOException {
270317
public String toString() {
271318
return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}';
272319
}
320+
321+
private CompletableFuture<HttpResponse> executeHttpQuery(Query query) {
322+
String queryUrl = getQueryUrl(query);
323+
return get(queryUrl);
324+
}
325+
326+
private CompletableFuture<HttpResponse> executeHttpWrite(Write write) {
327+
String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy());
328+
return post(writeUrl, write.getLineProtocol());
329+
}
273330
}

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClientFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
import io.opengemini.client.api.BatchConfig;
2222
import io.opengemini.client.api.Configuration;
2323
import io.opengemini.client.api.OpenGeminiException;
24+
import io.opengemini.client.interceptor.Interceptor;
25+
import io.opengemini.client.interceptor.OtelInterceptor;
2426
import org.jetbrains.annotations.NotNull;
2527

28+
import java.util.List;
29+
2630
public class OpenGeminiClientFactory {
2731
public static OpenGeminiClient create(@NotNull Configuration configuration) throws OpenGeminiException {
2832
if (configuration.getAddresses() == null || configuration.getAddresses().isEmpty()) {
@@ -54,4 +58,13 @@ public static OpenGeminiClient create(@NotNull Configuration configuration) thro
5458
}
5559
return new OpenGeminiClient(configuration);
5660
}
61+
62+
public static OpenGeminiClient createClientWithInterceptors(Configuration config) throws OpenGeminiException {
63+
OpenGeminiClient client = create(config);
64+
List<Interceptor> interceptors = List.of(
65+
new OtelInterceptor()
66+
);
67+
client.addInterceptors(interceptors.toArray(new Interceptor[0]));
68+
return client;
69+
}
5770
}

pom.xml

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,39 @@
117117
</dependencyManagement>
118118

119119
<dependencies>
120+
<!-- OpenTelemetry 核心依赖 -->
121+
<dependency>
122+
<groupId>io.opentelemetry</groupId>
123+
<artifactId>opentelemetry-api</artifactId>
124+
<version>1.28.0</version>
125+
</dependency>
126+
<dependency>
127+
<groupId>io.opentelemetry</groupId>
128+
<artifactId>opentelemetry-sdk</artifactId>
129+
<version>1.28.0</version>
130+
</dependency>
131+
<dependency>
132+
<groupId>io.opentelemetry</groupId>
133+
<artifactId>opentelemetry-exporter-jaeger</artifactId>
134+
<version>1.28.0</version>
135+
</dependency>
136+
<dependency>
137+
<groupId>io.opentelemetry</groupId>
138+
<artifactId>opentelemetry-exporter-otlp</artifactId>
139+
<version>1.32.0</version>
140+
</dependency>
141+
142+
<!-- HTTP 客户端 -->
143+
<dependency>
144+
<groupId>com.squareup.okhttp3</groupId>
145+
<artifactId>okhttp</artifactId>
146+
<version>${okhttp.version}</version>
147+
</dependency>
148+
<dependency>
149+
<groupId>io.opentelemetry</groupId>
150+
<artifactId>opentelemetry-semconv</artifactId>
151+
<version>1.28.0-alpha</version>
152+
</dependency>
120153
<dependency>
121154
<groupId>io.opentelemetry</groupId>
122155
<artifactId>opentelemetry-api</artifactId>
@@ -229,6 +262,7 @@
229262
<artifactId>protobuf-maven-plugin</artifactId>
230263
<version>${maven-protobuf-maven-plugin}</version>
231264
<configuration>
265+
<!--suppress UnresolvedMavenProperty -->
232266
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
233267
</protocArtifact>
234268
</configuration>
@@ -242,6 +276,7 @@
242276
</goals>
243277
<configuration>
244278
<pluginId>grpc-java</pluginId>
279+
<!--suppress UnresolvedMavenProperty -->
245280
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
246281
</pluginArtifact>
247282
</configuration>
@@ -255,6 +290,7 @@
255290
</goals>
256291
<configuration>
257292
<pluginId>grpc-vertx</pluginId>
293+
<!--suppress UnresolvedMavenProperty -->
258294
<pluginArtifact>
259295
io.vertx:vertx-grpc-protoc-plugin:${vertx.version}:exe:${os.detected.classifier}
260296
</pluginArtifact>
@@ -342,6 +378,7 @@
342378
<localCheckout>true</localCheckout>
343379
<pushChanges>false</pushChanges>
344380
<mavenExecutorId>forked-path</mavenExecutorId>
381+
<!--suppress UnresolvedMavenProperty -->
345382
<arguments>-Dgpg.passphrase=${gpg.passphrase}</arguments>
346383
</configuration>
347384
<dependencies>
@@ -389,7 +426,7 @@
389426
<extensions>true</extensions>
390427
<configuration>
391428
<publishingServerId>central</publishingServerId>
392-
<tokenAuth>true</tokenAuth>
429+
<!-- <tokenAuth>true</tokenAuth>-->
393430
</configuration>
394431
</plugin>
395432
</plugins>
@@ -449,6 +486,7 @@
449486
</execution>
450487
</executions>
451488
<configuration>
489+
<!--suppress UnresolvedMavenProperty -->
452490
<passphraseServerId>${gpg.passphrase}</passphraseServerId>
453491
<gpgArguments>
454492
<arg>--pinentry-mode</arg>

0 commit comments

Comments
 (0)