Skip to content

Commit 9c03e7e

Browse files
committed
feat: add unit tests to otel-integration
Signed-off-by: chenhuan <[email protected]>
1 parent 7a4348f commit 9c03e7e

File tree

9 files changed

+223
-54
lines changed

9 files changed

+223
-54
lines changed

opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.opengemini.client.api;
1818

19-
import io.opentelemetry.api.trace.Span;
2019
import lombok.AllArgsConstructor;
2120
import lombok.Getter;
2221
import lombok.Setter;
@@ -74,4 +73,4 @@ public void setAttribute(String key, Object value) {
7473
public Object getAttribute(String key) {
7574
return attributes.get(key);
7675
}
77-
}
76+
}

opengemini-client-api/src/main/java/io/opengemini/client/api/Write.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,4 @@ public void setAttribute(String key, Object value) {
4949
public Object getAttribute(String key) {
5050
return attributes.get(key);
5151
}
52-
}
52+
}

opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,4 +162,4 @@ protected String getQueryUrl(Query query) {
162162
public void close() throws IOException {
163163
scheduler.ifPresent(ExecutorService::shutdown);
164164
}
165-
}
165+
}

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java

Lines changed: 27 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,7 @@
3838

3939
public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
4040
private final List<Interceptor> interceptors = new ArrayList<>();
41-
42-
/**
43-
* Add interceptors to the client.
44-
*/
45-
public void addInterceptors(Interceptor... interceptors) {
46-
Collections.addAll(this.interceptors, interceptors);
47-
}
48-
49-
50-
5141
protected final Configuration conf;
52-
5342
private final HttpClient client;
5443

5544
public OpenGeminiClient(@NotNull Configuration conf) {
@@ -68,6 +57,10 @@ public OpenGeminiClient(@NotNull Configuration conf) {
6857
this.client = HttpClientFactory.createHttpClient(httpConfig);
6958
}
7059

60+
public void addInterceptors(Interceptor... interceptors) {
61+
Collections.addAll(this.interceptors, interceptors);
62+
}
63+
7164
/**
7265
* {@inheritDoc}
7366
*/
@@ -163,7 +156,7 @@ public CompletableFuture<Void> write(String database, String retentionPolicy, Po
163156
if (StringUtils.isEmpty(body)) {
164157
return CompletableFuture.completedFuture(null);
165158
}
166-
return executeWrite(database, retentionPolicy, body );
159+
return executeWrite(database, retentionPolicy, body);
167160
}
168161

169162
@Override
@@ -201,25 +194,20 @@ public CompletableFuture<Pong> ping() {
201194
*/
202195
public CompletableFuture<QueryResult> executeQuery(Query query) {
203196
String queryUrl = getQueryUrl(query);
204-
205-
// 执行所有queryBefore拦截器
206197
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
207198
interceptors.stream()
208199
.map(interceptor -> interceptor.queryBefore(query))
209200
.toArray(CompletableFuture[]::new)
210201
);
211202

212-
return beforeFutures.thenCompose(voidResult -> {
213-
return executeHttpQuery(query).thenCompose(response -> {
214-
// 执行所有queryAfter拦截器
215-
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
216-
interceptors.stream()
217-
.map(interceptor -> interceptor.queryAfter(query, response))
218-
.toArray(CompletableFuture[]::new)
219-
);
220-
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
221-
});
222-
});
203+
return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> {
204+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
205+
interceptors.stream()
206+
.map(interceptor -> interceptor.queryAfter(query, response))
207+
.toArray(CompletableFuture[]::new)
208+
);
209+
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
210+
}));
223211
}
224212

225213
/**
@@ -244,28 +232,28 @@ public CompletableFuture<Void> executeWrite(String database, String retentionPol
244232
Write write = new Write(
245233
database,
246234
retentionPolicy,
247-
"default_measurement", // Default measurement name
235+
"default_measurement",
248236
lineProtocol,
249-
"ns" // Default precision
237+
"ns"
250238
);
251239

252-
// Execute all writeBefore interceptors
253240
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
254241
interceptors.stream()
255242
.map(interceptor -> interceptor.writeBefore(write))
256243
.toArray(CompletableFuture[]::new)
257244
);
258245

259-
return beforeFutures.thenCompose(voidResult -> {
260-
return executeHttpWrite(write).thenCompose(response -> { // response 是 io.github.openfacade.http.HttpResponse
261-
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
262-
interceptors.stream()
263-
.map(interceptor -> interceptor.writeAfter(write, response)) // 传递正确的类型
264-
.toArray(CompletableFuture[]::new)
265-
);
266-
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, Void.class));
267-
});
268-
});
246+
return beforeFutures.thenCompose(voidResult ->
247+
executeHttpWrite(write).thenCompose(response -> {
248+
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
249+
interceptors.stream()
250+
.map(interceptor -> interceptor.writeAfter(write, response))
251+
.toArray(CompletableFuture[]::new)
252+
);
253+
return afterFutures.thenCompose(voidResult2 ->
254+
convertResponse(response, Void.class));
255+
})
256+
);
269257
}
270258

271259
/**
@@ -317,19 +305,13 @@ public String toString() {
317305
return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}';
318306
}
319307

320-
/**
321-
* 执行 HTTP 查询请求
322-
*/
323308
private CompletableFuture<HttpResponse> executeHttpQuery(Query query) {
324309
String queryUrl = getQueryUrl(query);
325310
return get(queryUrl);
326311
}
327312

328-
/**
329-
* 执行 HTTP 写入请求
330-
*/
331313
private CompletableFuture<HttpResponse> executeHttpWrite(Write write) {
332314
String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy());
333315
return post(writeUrl, write.getLineProtocol());
334316
}
335-
}
317+
}

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,4 @@ public static OpenGeminiClient createClientWithInterceptors(Configuration config
6767
client.addInterceptors(interceptors.toArray(new Interceptor[0]));
6868
return client;
6969
}
70-
}
70+
}

opengemini-client/src/main/java/io/opengemini/client/impl/OpenTelemetryConfig.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
* Configuration for OpenTelemetry tracing.
3131
*/
3232
public class OpenTelemetryConfig {
33-
3433
private static volatile OpenTelemetry openTelemetry;
3534
private static volatile Tracer tracer;
3635

@@ -73,4 +72,4 @@ public static Tracer getTracer() {
7372
}
7473
return tracer;
7574
}
76-
}
75+
}

opengemini-client/src/main/java/io/opengemini/client/interceptor/Interceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,4 @@ public interface Interceptor {
6161
* @return a CompletableFuture that completes when the interceptor logic is done
6262
*/
6363
CompletableFuture<Void> writeAfter(Write write, HttpResponse response);
64-
}
64+
}

opengemini-client/src/main/java/io/opengemini/client/interceptor/OtelInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,4 @@ public CompletableFuture<Void> writeAfter(Write write, HttpResponse response) {
137137
}
138138
});
139139
}
140-
}
140+
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright 2024 openGemini Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opengemini.client.interceptor;
18+
19+
import io.github.openfacade.http.HttpClientConfig;
20+
import io.opengemini.client.api.*;
21+
import io.opengemini.client.impl.OpenGeminiClient;
22+
import io.opengemini.client.impl.OpenGeminiClientFactory;
23+
import io.opengemini.client.impl.OpenTelemetryConfig;
24+
import lombok.Getter;
25+
import lombok.Setter;
26+
import org.junit.jupiter.api.BeforeAll;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.time.Duration;
33+
import java.util.Collections;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.TimeUnit;
37+
38+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
39+
import static org.junit.jupiter.api.Assertions.assertNotNull;
40+
41+
/**
42+
* Example demonstrating OpenGemini client usage with interceptors.
43+
*/
44+
45+
@Getter
46+
public class TracingIntegrationTest {
47+
@Setter
48+
private String database;
49+
@Setter
50+
private String retentionPolicy;
51+
@Setter
52+
private String lineProtocol;
53+
private static final Logger LOG = LoggerFactory.getLogger(TracingIntegrationTest.class);
54+
55+
private OpenGeminiClient openGeminiClient;
56+
57+
@BeforeEach
58+
void setUp() {
59+
HttpClientConfig httpConfig = new HttpClientConfig.Builder()
60+
.connectTimeout(Duration.ofSeconds(3))
61+
.timeout(Duration.ofSeconds(3))
62+
.build();
63+
Configuration configuration = Configuration.builder()
64+
.addresses(Collections.singletonList(new Address("127.0.0.1", 8086)))
65+
.httpConfig(httpConfig)
66+
.authConfig(new AuthConfig(AuthType.PASSWORD, "test", "testPwd123@".toCharArray(), null))
67+
.gzipEnabled(false)
68+
.build();
69+
this.openGeminiClient = new OpenGeminiClient(configuration);
70+
}
71+
72+
@Test
73+
void testClientCreation() {
74+
Configuration config = new Configuration();
75+
config.setAddresses(java.util.Collections.singletonList(new io.opengemini.client.api.Address("localhost", 8086)));
76+
if (config.getHttpConfig() == null) {
77+
config.setHttpConfig(new HttpClientConfig.Builder().build());
78+
}
79+
80+
assertDoesNotThrow(() -> {
81+
OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config);
82+
assertNotNull(client, "OpenGeminiClient should be created successfully");
83+
client.close();
84+
}, "Client creation should not throw an exception");
85+
}
86+
87+
@Test
88+
void testDatabaseCreation() {
89+
Configuration config = new Configuration();
90+
config.setAddresses(java.util.Collections.singletonList(new io.opengemini.client.api.Address("localhost", 8086)));
91+
if (config.getHttpConfig() == null) {
92+
config.setHttpConfig(new HttpClientConfig.Builder().build());
93+
}
94+
95+
assertDoesNotThrow(() -> {
96+
try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) {
97+
Query createDbQuery = new Query("CREATE DATABASE test_db");
98+
client.query(createDbQuery).get(10, TimeUnit.SECONDS);
99+
}
100+
}, "Database creation should not throw an exception");
101+
}
102+
103+
@Test
104+
void testQueryOperation() {
105+
Configuration config = new Configuration();
106+
config.setAddresses(java.util.Collections.singletonList(new Address("localhost", 8086)));
107+
if (config.getHttpConfig() == null) {
108+
config.setHttpConfig(new HttpClientConfig.Builder().build());
109+
}
110+
111+
assertDoesNotThrow(() -> {
112+
try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) {
113+
Query createDbQuery = new Query("CREATE DATABASE test_db");
114+
client.query(createDbQuery).get(10, TimeUnit.SECONDS);
115+
116+
Query showDbQuery = new Query("SHOW DATABASES");
117+
QueryResult result = client.query(showDbQuery).get(10, TimeUnit.SECONDS);
118+
assertNotNull(result, "Query result should not be null");
119+
}
120+
}, "Query operation should not throw an exception");
121+
}
122+
123+
@BeforeAll
124+
static void initializeTracing() {
125+
OpenTelemetryConfig.initialize();
126+
}
127+
128+
@Test
129+
void testWriteOperation() {
130+
Configuration config = new Configuration();
131+
config.setAddresses(java.util.Collections.singletonList(
132+
new Address("localhost", 8086)));
133+
134+
if (config.getHttpConfig() == null) {
135+
config.setHttpConfig(new HttpClientConfig.Builder().build());
136+
}
137+
138+
assertDoesNotThrow(() -> {
139+
try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) {
140+
Query createDbQuery = new Query("CREATE DATABASE test_db");
141+
client.query(createDbQuery).get(10, TimeUnit.SECONDS);
142+
143+
Thread.sleep(1000);
144+
145+
Write write = new Write(
146+
"test_db",
147+
"autogen",
148+
"temperature",
149+
"temperature,location=room1 value=25.5 " + System.currentTimeMillis(),
150+
"ns"
151+
);
152+
153+
client.executeWrite(
154+
write.getDatabase(),
155+
write.getRetentionPolicy(),
156+
write.getLineProtocol()
157+
).get(10, TimeUnit.SECONDS);
158+
}
159+
}, "Write operation should not throw an exception");
160+
}
161+
162+
@Test
163+
void testTracingIntegration() throws ExecutionException, InterruptedException {
164+
String databaseTestName = "tracing_test_db";
165+
CompletableFuture<Void> createdb = openGeminiClient.createDatabase(databaseTestName);
166+
createdb.get();
167+
168+
assertDoesNotThrow(() -> {
169+
170+
Write write = new Write(
171+
"tracing_test_db",
172+
"autogen",
173+
"tracing_measurement",
174+
"tracing_measurement,tag=test value=8 " + System.currentTimeMillis(),
175+
"ns"
176+
);
177+
178+
openGeminiClient.executeWrite(
179+
write.getDatabase(),
180+
write.getRetentionPolicy(),
181+
write.getLineProtocol()
182+
).get(10, TimeUnit.SECONDS);
183+
184+
Query query = new Query("SELECT * FROM tracing_measurement");
185+
openGeminiClient.query(query).get(10, TimeUnit.SECONDS);
186+
187+
}, "Tracing integration should not throw an exception");
188+
}
189+
}

0 commit comments

Comments
 (0)