Skip to content

Commit bbe38bb

Browse files
committed
feat: add unit tests to otel-integration
Signed-off-by: chenhuan <[email protected]>
1 parent 7594713 commit bbe38bb

File tree

11 files changed

+226
-167
lines changed

11 files changed

+226
-167
lines changed

opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.opengemini.client.api;
1818

19-
import io.opentelemetry.api.trace.Span;
2019
import lombok.AllArgsConstructor;
2120
import lombok.Getter;
2221
import lombok.Setter;
@@ -74,4 +73,4 @@ public void setAttribute(String key, Object value) {
7473
public Object getAttribute(String key) {
7574
return attributes.get(key);
7675
}
77-
}
76+
}

opengemini-client-api/src/main/java/io/opengemini/client/api/Write.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,4 @@ public void setAttribute(String key, Object value) {
4949
public Object getAttribute(String key) {
5050
return attributes.get(key);
5151
}
52-
}
52+
}

opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,6 @@ public BaseClient(Configuration conf) {
5858
}
5959
String httpPrefix;
6060
HttpClientConfig httpConfig = conf.getHttpConfig();
61-
if (httpConfig != null) {
62-
httpConfig = new HttpClientConfig.Builder().build();
63-
conf.setHttpConfig(httpConfig);
64-
}
6561
if (httpConfig != null && httpConfig.tlsConfig() != null) {
6662
httpPrefix = "https://";
6763
}else {

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java

Lines changed: 27 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,7 @@
3838

3939
public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
4040
private final List<Interceptor> interceptors = new ArrayList<>();
41-
42-
/**
43-
* Add interceptors to the client.
44-
*/
45-
public void addInterceptors(Interceptor... interceptors) {
46-
Collections.addAll(this.interceptors, interceptors);
47-
}
48-
49-
50-
5141
protected final Configuration conf;
52-
5342
private final HttpClient client;
5443

5544
public OpenGeminiClient(@NotNull Configuration conf) {
@@ -68,6 +57,10 @@ public OpenGeminiClient(@NotNull Configuration conf) {
6857
this.client = HttpClientFactory.createHttpClient(httpConfig);
6958
}
7059

60+
public void addInterceptors(Interceptor... interceptors) {
61+
Collections.addAll(this.interceptors, interceptors);
62+
}
63+
7164
/**
7265
* {@inheritDoc}
7366
*/
@@ -163,7 +156,7 @@ public CompletableFuture<Void> write(String database, String retentionPolicy, Po
163156
if (StringUtils.isEmpty(body)) {
164157
return CompletableFuture.completedFuture(null);
165158
}
166-
return executeWrite(database, retentionPolicy, body );
159+
return executeWrite(database, retentionPolicy, body);
167160
}
168161

169162
@Override
@@ -201,25 +194,20 @@ public CompletableFuture<Pong> ping() {
201194
*/
202195
public CompletableFuture<QueryResult> executeQuery(Query query) {
203196
String queryUrl = getQueryUrl(query);
204-
205-
// 执行所有queryBefore拦截器
206197
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
207198
interceptors.stream()
208199
.map(interceptor -> interceptor.queryBefore(query))
209200
.toArray(CompletableFuture[]::new)
210201
);
211202

212-
return beforeFutures.thenCompose(voidResult -> {
213-
return executeHttpQuery(query).thenCompose(response -> {
214-
// 执行所有queryAfter拦截器
215-
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
216-
interceptors.stream()
217-
.map(interceptor -> interceptor.queryAfter(query, response))
218-
.toArray(CompletableFuture[]::new)
219-
);
220-
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
221-
});
222-
});
203+
return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> {
204+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
205+
interceptors.stream()
206+
.map(interceptor -> interceptor.queryAfter(query, response))
207+
.toArray(CompletableFuture[]::new)
208+
);
209+
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
210+
}));
223211
}
224212

225213
/**
@@ -244,28 +232,28 @@ public CompletableFuture<Void> executeWrite(String database, String retentionPol
244232
Write write = new Write(
245233
database,
246234
retentionPolicy,
247-
"default_measurement", // Default measurement name
235+
"default_measurement",
248236
lineProtocol,
249-
"ns" // Default precision
237+
"ns"
250238
);
251239

252-
// Execute all writeBefore interceptors
253240
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
254241
interceptors.stream()
255242
.map(interceptor -> interceptor.writeBefore(write))
256243
.toArray(CompletableFuture[]::new)
257244
);
258245

259-
return beforeFutures.thenCompose(voidResult -> {
260-
return executeHttpWrite(write).thenCompose(response -> { // response 是 io.github.openfacade.http.HttpResponse
261-
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
262-
interceptors.stream()
263-
.map(interceptor -> interceptor.writeAfter(write, response)) // 传递正确的类型
264-
.toArray(CompletableFuture[]::new)
265-
);
266-
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, Void.class));
267-
});
268-
});
246+
return beforeFutures.thenCompose(voidResult ->
247+
executeHttpWrite(write).thenCompose(response -> {
248+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
249+
interceptors.stream()
250+
.map(interceptor -> interceptor.writeAfter(write, response))
251+
.toArray(CompletableFuture[]::new)
252+
);
253+
return afterFutures.thenCompose(voidResult2 ->
254+
convertResponse(response, Void.class));
255+
})
256+
);
269257
}
270258

271259
/**
@@ -317,19 +305,13 @@ public String toString() {
317305
return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}';
318306
}
319307

320-
/**
321-
* 执行 HTTP 查询请求
322-
*/
323308
private CompletableFuture<HttpResponse> executeHttpQuery(Query query) {
324309
String queryUrl = getQueryUrl(query);
325310
return get(queryUrl);
326311
}
327312

328-
/**
329-
* 执行 HTTP 写入请求
330-
*/
331313
private CompletableFuture<HttpResponse> executeHttpWrite(Write write) {
332314
String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy());
333315
return post(writeUrl, write.getLineProtocol());
334316
}
335-
}
317+
}

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClientFactory.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.opengemini.client.interceptor.OtelInterceptor;
2626
import org.jetbrains.annotations.NotNull;
2727

28-
import java.util.Arrays;
2928
import java.util.List;
3029

3130
public class OpenGeminiClientFactory {
@@ -61,16 +60,11 @@ public static OpenGeminiClient create(@NotNull Configuration configuration) thro
6160
}
6261

6362
public static OpenGeminiClient createClientWithInterceptors(Configuration config) throws OpenGeminiException {
64-
6563
OpenGeminiClient client = create(config);
66-
67-
// Add all interceptors at once
68-
List<Interceptor> interceptors = Arrays.asList(
64+
List<Interceptor> interceptors = List.of(
6965
new OtelInterceptor()
70-
// Other interceptors can be added here
7166
);
72-
7367
client.addInterceptors(interceptors.toArray(new Interceptor[0]));
7468
return client;
7569
}
76-
}
70+
}

opengemini-client/src/main/java/io/opengemini/client/impl/OpenTelemetryConfig.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
* Configuration for OpenTelemetry tracing.
3131
*/
3232
public class OpenTelemetryConfig {
33-
3433
private static volatile OpenTelemetry openTelemetry;
3534
private static volatile Tracer tracer;
3635

@@ -73,4 +72,4 @@ public static Tracer getTracer() {
7372
}
7473
return tracer;
7574
}
76-
}
75+
}

opengemini-client/src/main/java/io/opengemini/client/interceptor/Interceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,4 @@ public interface Interceptor {
6161
* @return a CompletableFuture that completes when the interceptor logic is done
6262
*/
6363
CompletableFuture<Void> writeAfter(Write write, HttpResponse response);
64-
}
64+
}

opengemini-client/src/main/java/io/opengemini/client/interceptor/OtelInterceptor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616

1717
package io.opengemini.client.interceptor;
1818

19+
import io.github.openfacade.http.HttpResponse;
1920
import io.opengemini.client.api.Query;
2021
import io.opengemini.client.api.Write;
2122
import io.opengemini.client.impl.OpenTelemetryConfig;
22-
import io.github.openfacade.http.HttpResponse;
2323
import io.opentelemetry.api.trace.Span;
2424
import io.opentelemetry.api.trace.SpanKind;
2525
import io.opentelemetry.api.trace.StatusCode;
@@ -120,6 +120,7 @@ public CompletableFuture<Void> writeAfter(Write write, HttpResponse response) {
120120
try (Scope scope = writeAfterSpan.makeCurrent()) {
121121
int statusCode = response.statusCode();
122122
writeAfterSpan.setAttribute("status_code", statusCode);
123+
123124
if (statusCode >= 400) {
124125
String errorBody = response.bodyAsString();
125126
writeAfterSpan.setStatus(StatusCode.ERROR, "HTTP error: " + statusCode);
@@ -136,4 +137,4 @@ public CompletableFuture<Void> writeAfter(Write write, HttpResponse response) {
136137
}
137138
});
138139
}
139-
}
140+
}

opengemini-client/src/test/java/io/opengemini/client/interceptor/OpenGeminiExample.java

Lines changed: 0 additions & 101 deletions
This file was deleted.

0 commit comments

Comments
 (0)