diff --git a/opengemini-client-api/pom.xml b/opengemini-client-api/pom.xml index f1a57824..637a7b57 100644 --- a/opengemini-client-api/pom.xml +++ b/opengemini-client-api/pom.xml @@ -37,4 +37,16 @@ ${annotations.version} + + + + org.apache.maven.plugins + maven-compiler-plugin + + 11 + 11 + + + + diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/Point.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/Point.java index 3ae246a4..6dadc20e 100644 --- a/opengemini-client-api/src/main/java/io/opengemini/client/api/Point.java +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/Point.java @@ -20,6 +20,7 @@ import lombok.Setter; import java.math.BigDecimal; +import java.util.HashMap; import java.util.Map; @Getter @@ -40,6 +41,36 @@ public class Point { private Map tags; private Map fields; + public Point() { + this.fields = new HashMap<>(); + this.tags = new HashMap<>(); + } + + public Point measurement(String measurement) { + this.measurement = measurement; + return this; + } + + public Point addTag(String key, String value) { + if (key != null && value != null) { + this.tags.put(key, value); + } + return this; + } + + public Point addField(String key, Object value) { + if (key != null && value != null) { + this.fields.put(key, value); + } + return this; + } + + public Point time(long time, Precision precision) { + this.time = time; + this.precision = precision; + return this; + } + /** * Calculate the line protocol string for this point * diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java index d6beb138..c30efb75 100644 --- a/opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/Query.java @@ -20,6 +20,9 @@ import lombok.Getter; import lombok.Setter; +import java.util.HashMap; +import java.util.Map; + @AllArgsConstructor @Getter @Setter @@ -44,6 +47,8 @@ public class Query { */ private Precision precision; + private Map attributes = new HashMap<>(); + public Query(String command) { this.command = command; } @@ -53,4 +58,19 @@ public Query(String command, String database, String retentionPolicy) { this.database = database; this.retentionPolicy = retentionPolicy; } + + public Query(String command, String database, String retentionPolicy, Precision precision) { + this.command = command; + this.database = database; + this.retentionPolicy = retentionPolicy; + this.precision = precision; + } + + public void setAttribute(String key, Object value) { + attributes.put(key, value); + } + + public Object getAttribute(String key) { + return attributes.get(key); + } } diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/Write.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/Write.java new file mode 100644 index 00000000..72758d73 --- /dev/null +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/Write.java @@ -0,0 +1,52 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.api; + +import lombok.Getter; +import lombok.Setter; + +import java.util.HashMap; +import java.util.Map; + +@Getter +@Setter +public class Write { + private String database; + private String retentionPolicy; + private String measurement; + private String lineProtocol; + private String precision; + + private Map attributes = new HashMap<>(); + + public Write(String database, String retentionPolicy, String measurement, + String lineProtocol, String precision) { + this.database = database; + this.retentionPolicy = retentionPolicy; + this.measurement = measurement; + this.lineProtocol = lineProtocol; + this.precision = precision; + } + + public void setAttribute(String key, Object value) { + attributes.put(key, value); + } + + public Object getAttribute(String key) { + return attributes.get(key); + } +} diff --git a/opengemini-client/pom.xml b/opengemini-client/pom.xml index 5a92c691..09135cf7 100644 --- a/opengemini-client/pom.xml +++ b/opengemini-client/pom.xml @@ -72,5 +72,17 @@ ${vertx.version} + + + + org.apache.maven.plugins + maven-compiler-plugin + + 11 + 11 + + + + diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java index 7621b907..f98de502 100644 --- a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java @@ -21,35 +21,24 @@ import io.github.openfacade.http.HttpClientConfig; import io.github.openfacade.http.HttpClientFactory; import io.github.openfacade.http.HttpResponse; -import io.opengemini.client.api.AuthConfig; -import io.opengemini.client.api.AuthType; -import io.opengemini.client.api.Configuration; -import io.opengemini.client.api.OpenGeminiAsyncClient; -import io.opengemini.client.api.OpenGeminiException; -import io.opengemini.client.api.Point; -import io.opengemini.client.api.Pong; -import io.opengemini.client.api.Query; -import io.opengemini.client.api.QueryResult; -import io.opengemini.client.api.RetentionPolicy; -import io.opengemini.client.api.RpConfig; +import io.opengemini.client.api.*; import io.opengemini.client.common.BaseClient; import io.opengemini.client.common.CommandFactory; import io.opengemini.client.common.HeaderConst; import io.opengemini.client.common.JacksonService; import io.opengemini.client.common.ResultMapper; +import io.opengemini.client.interceptor.Interceptor; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Optional; -import java.util.StringJoiner; +import java.util.*; import java.util.concurrent.CompletableFuture; public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient { + private final List interceptors = new ArrayList<>(); protected final Configuration conf; - private final HttpClient client; public OpenGeminiClient(@NotNull Configuration conf) { @@ -57,6 +46,10 @@ public OpenGeminiClient(@NotNull Configuration conf) { this.conf = conf; AuthConfig authConfig = conf.getAuthConfig(); HttpClientConfig httpConfig = conf.getHttpConfig(); + if (httpConfig == null) { + httpConfig = new HttpClientConfig.Builder().build(); + conf.setHttpConfig(httpConfig); + } if (authConfig != null && authConfig.getAuthType().equals(AuthType.PASSWORD)) { httpConfig.addRequestFilter( new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword()))); @@ -64,6 +57,10 @@ public OpenGeminiClient(@NotNull Configuration conf) { this.client = HttpClientFactory.createHttpClient(httpConfig); } + public void addInterceptors(Interceptor... interceptors) { + Collections.addAll(this.interceptors, interceptors); + } + /** * {@inheritDoc} */ @@ -195,9 +192,22 @@ public CompletableFuture ping() { * * @param query the query to execute. */ - protected CompletableFuture executeQuery(Query query) { + public CompletableFuture executeQuery(Query query) { String queryUrl = getQueryUrl(query); - return get(queryUrl).thenCompose(response -> convertResponse(response, QueryResult.class)); + CompletableFuture beforeFutures = CompletableFuture.allOf( + interceptors.stream() + .map(interceptor -> interceptor.queryBefore(query)) + .toArray(CompletableFuture[]::new) + ); + + return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> { + CompletableFuture afterFutures = CompletableFuture.allOf( + interceptors.stream() + .map(interceptor -> interceptor.queryAfter(query, response)) + .toArray(CompletableFuture[]::new) + ); + return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class)); + })); } /** @@ -217,9 +227,33 @@ protected CompletableFuture executePostQuery(Query query) { * @param retentionPolicy the name of the retention policy. * @param lineProtocol the line protocol string to write. */ - protected CompletableFuture executeWrite(String database, String retentionPolicy, String lineProtocol) { + public CompletableFuture executeWrite(String database, String retentionPolicy, String lineProtocol) { String writeUrl = getWriteUrl(database, retentionPolicy); - return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class)); + Write write = new Write( + database, + retentionPolicy, + "default_measurement", + lineProtocol, + "ns" + ); + + CompletableFuture beforeFutures = CompletableFuture.allOf( + interceptors.stream() + .map(interceptor -> interceptor.writeBefore(write)) + .toArray(CompletableFuture[]::new) + ); + + return beforeFutures.thenCompose(voidResult -> + executeHttpWrite(write).thenCompose(response -> { + CompletableFuture afterFutures = CompletableFuture.allOf( + interceptors.stream() + .map(interceptor -> interceptor.writeAfter(write, response)) + .toArray(CompletableFuture[]::new) + ); + return afterFutures.thenCompose(voidResult2 -> + convertResponse(response, Void.class)); + }) + ); } /** @@ -258,7 +292,7 @@ private CompletableFuture get(String url) { private CompletableFuture post(String url, String body) { return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8), - headers); + headers); } @Override @@ -270,4 +304,14 @@ public void close() throws IOException { public String toString() { return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}'; } + + private CompletableFuture executeHttpQuery(Query query) { + String queryUrl = getQueryUrl(query); + return get(queryUrl); + } + + private CompletableFuture executeHttpWrite(Write write) { + String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy()); + return post(writeUrl, write.getLineProtocol()); + } } diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClientFactory.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClientFactory.java index 1a12862d..92b43d78 100644 --- a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClientFactory.java +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClientFactory.java @@ -21,8 +21,12 @@ import io.opengemini.client.api.BatchConfig; import io.opengemini.client.api.Configuration; import io.opengemini.client.api.OpenGeminiException; +import io.opengemini.client.interceptor.Interceptor; +import io.opengemini.client.interceptor.OtelInterceptor; import org.jetbrains.annotations.NotNull; +import java.util.List; + public class OpenGeminiClientFactory { public static OpenGeminiClient create(@NotNull Configuration configuration) throws OpenGeminiException { if (configuration.getAddresses() == null || configuration.getAddresses().isEmpty()) { @@ -54,4 +58,13 @@ public static OpenGeminiClient create(@NotNull Configuration configuration) thro } return new OpenGeminiClient(configuration); } + + public static OpenGeminiClient createClientWithInterceptors(Configuration config) throws OpenGeminiException { + OpenGeminiClient client = create(config); + List interceptors = List.of( + new OtelInterceptor() + ); + client.addInterceptors(interceptors.toArray(new Interceptor[0])); + return client; + } } diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenTelemetryConfig.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenTelemetryConfig.java new file mode 100644 index 00000000..e7b4f9bd --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenTelemetryConfig.java @@ -0,0 +1,75 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; + +/** + * Configuration for OpenTelemetry tracing. + */ +public class OpenTelemetryConfig { + private static volatile OpenTelemetry openTelemetry; + private static volatile Tracer tracer; + + public static synchronized void initialize() { + if (openTelemetry != null) { + return; + } + + try { + JaegerGrpcSpanExporter jaegerExporter = JaegerGrpcSpanExporter.builder() + .setEndpoint("http://localhost:14250") + .build(); + + BatchSpanProcessor spanProcessor = BatchSpanProcessor.builder(jaegerExporter) + .setScheduleDelay(100, java.util.concurrent.TimeUnit.MILLISECONDS) + .build(); + + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(spanProcessor) + .setResource(Resource.create( + Attributes.of(ResourceAttributes.SERVICE_NAME, "opengemini-client-java") + )) + .build(); + + openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .build(); + + tracer = openTelemetry.getTracer("opengemini-client-java"); + } catch (Exception e) { + // Fallback to no-op implementation + openTelemetry = OpenTelemetry.noop(); + tracer = openTelemetry.getTracer("opengemini-client-java"); + } + } + + public static Tracer getTracer() { + if (tracer == null) { + initialize(); + } + return tracer; + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/interceptor/Interceptor.java b/opengemini-client/src/main/java/io/opengemini/client/interceptor/Interceptor.java new file mode 100644 index 00000000..24b28a96 --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/interceptor/Interceptor.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.interceptor; + +import io.github.openfacade.http.HttpResponse; +import io.opengemini.client.api.Query; +import io.opengemini.client.api.Write; + +import java.util.concurrent.CompletableFuture; + +/** + * Interceptor interface for OpenGemini client operations. + * Allows custom logic to be executed before and after query and write operations. + */ +public interface Interceptor { + + /** + * Executes before a query operation. + * + * @param query the query to be executed + * @return a CompletableFuture that completes when the interceptor logic is done + */ + CompletableFuture queryBefore(Query query); + + /** + * Executes after a query operation. + * + * @param query the query that was executed + * @param response the HTTP response from the query + * @return a CompletableFuture that completes when the interceptor logic is done + */ + CompletableFuture queryAfter(Query query, HttpResponse response); + + /** + * Executes before a write operation. + * + * @param write the write operation to be executed + * @return a CompletableFuture that completes when the interceptor logic is done + */ + CompletableFuture writeBefore(Write write); + + /** + * Executes after a write operation. + * + * @param write the write operation that was executed + * @param response the HTTP response from the write + * @return a CompletableFuture that completes when the interceptor logic is done + */ + CompletableFuture writeAfter(Write write, HttpResponse response); +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/interceptor/OtelInterceptor.java b/opengemini-client/src/main/java/io/opengemini/client/interceptor/OtelInterceptor.java new file mode 100644 index 00000000..4e11c010 --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/interceptor/OtelInterceptor.java @@ -0,0 +1,140 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.interceptor; + +import io.github.openfacade.http.HttpResponse; +import io.opengemini.client.api.Query; +import io.opengemini.client.api.Write; +import io.opengemini.client.impl.OpenTelemetryConfig; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; + +import java.util.concurrent.CompletableFuture; + +public class OtelInterceptor implements Interceptor { + private static final io.opentelemetry.api.trace.Tracer tracer = OpenTelemetryConfig.getTracer(); + + @Override + public CompletableFuture queryBefore(Query query) { + return CompletableFuture.runAsync(() -> { + Span querySpan = tracer.spanBuilder("query") + .setSpanKind(SpanKind.CLIENT) + .setAttribute("database", query.getDatabase() != null ? query.getDatabase() : "unknown") + .setAttribute("command", query.getCommand()) + .startSpan(); + + Span queryBeforeSpan = tracer.spanBuilder("queryBefore") + .setParent(io.opentelemetry.context.Context.current().with(querySpan)) + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + + try (Scope scope = queryBeforeSpan.makeCurrent()) { + query.setAttribute("querySpan", querySpan); + query.setAttribute("queryBeforeSpan", queryBeforeSpan); + } finally { + queryBeforeSpan.end(); + } + }); + } + + @Override + public CompletableFuture queryAfter(Query query, HttpResponse response) { + return CompletableFuture.runAsync(() -> { + Span querySpan = (Span) query.getAttribute("querySpan"); + if (querySpan != null) { + Span queryAfterSpan = tracer.spanBuilder("queryAfter") + .setParent(io.opentelemetry.context.Context.current().with(querySpan)) + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + + try (Scope scope = queryAfterSpan.makeCurrent()) { + int statusCode = response.statusCode(); + queryAfterSpan.setAttribute("status_code", statusCode); + if (statusCode >= 400) { + String errorBody = response.bodyAsString(); + queryAfterSpan.setStatus(StatusCode.ERROR, "HTTP error: " + statusCode); + queryAfterSpan.setAttribute("error.message", errorBody); + querySpan.setStatus(StatusCode.ERROR, "Query failed: " + errorBody); + } else { + queryAfterSpan.setStatus(StatusCode.OK); + querySpan.setStatus(StatusCode.OK); + } + } finally { + queryAfterSpan.end(); + querySpan.end(); + } + } + }); + } + + @Override + public CompletableFuture writeBefore(Write write) { + return CompletableFuture.runAsync(() -> { + Span writeSpan = tracer.spanBuilder("write") + .setSpanKind(SpanKind.CLIENT) + .setAttribute("database", write.getDatabase()) + .setAttribute("retention_policy", write.getRetentionPolicy()) + .setAttribute("measurement", write.getMeasurement()) + .startSpan(); + + Span writeBeforeSpan = tracer.spanBuilder("writeBefore") + .setParent(io.opentelemetry.context.Context.current().with(writeSpan)) + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + + try (Scope scope = writeBeforeSpan.makeCurrent()) { + write.setAttribute("writeSpan", writeSpan); + write.setAttribute("writeBeforeSpan", writeBeforeSpan); + } finally { + writeBeforeSpan.end(); + } + }); + } + + @Override + public CompletableFuture writeAfter(Write write, HttpResponse response) { + return CompletableFuture.runAsync(() -> { + Span writeSpan = (Span) write.getAttribute("writeSpan"); + if (writeSpan != null) { + Span writeAfterSpan = tracer.spanBuilder("writeAfter") + .setParent(io.opentelemetry.context.Context.current().with(writeSpan)) + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + + try (Scope scope = writeAfterSpan.makeCurrent()) { + int statusCode = response.statusCode(); + writeAfterSpan.setAttribute("status_code", statusCode); + + if (statusCode >= 400) { + String errorBody = response.bodyAsString(); + writeAfterSpan.setStatus(StatusCode.ERROR, "HTTP error: " + statusCode); + writeAfterSpan.setAttribute("error.message", errorBody); + writeSpan.setStatus(StatusCode.ERROR, "Write failed: " + errorBody); + } else { + writeAfterSpan.setStatus(StatusCode.OK); + writeSpan.setStatus(StatusCode.OK); + } + } finally { + writeAfterSpan.end(); + writeSpan.end(); + } + } + }); + } +} diff --git a/opengemini-client/src/test/java/io/opengemini/client/interceptor/TracingIntegrationTest.java b/opengemini-client/src/test/java/io/opengemini/client/interceptor/TracingIntegrationTest.java new file mode 100644 index 00000000..c9f6110e --- /dev/null +++ b/opengemini-client/src/test/java/io/opengemini/client/interceptor/TracingIntegrationTest.java @@ -0,0 +1,189 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.interceptor; + +import io.github.openfacade.http.HttpClientConfig; +import io.opengemini.client.api.*; +import io.opengemini.client.impl.OpenGeminiClient; +import io.opengemini.client.impl.OpenGeminiClientFactory; +import io.opengemini.client.impl.OpenTelemetryConfig; +import lombok.Getter; +import lombok.Setter; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * Example demonstrating OpenGemini client usage with interceptors. + */ + +@Getter +public class TracingIntegrationTest { + @Setter + private String database; + @Setter + private String retentionPolicy; + @Setter + private String lineProtocol; + private static final Logger LOG = LoggerFactory.getLogger(TracingIntegrationTest.class); + + private OpenGeminiClient openGeminiClient; + + @BeforeEach + void setUp() { + HttpClientConfig httpConfig = new HttpClientConfig.Builder() + .connectTimeout(Duration.ofSeconds(3)) + .timeout(Duration.ofSeconds(3)) + .build(); + Configuration configuration = Configuration.builder() + .addresses(Collections.singletonList(new Address("127.0.0.1", 8086))) + .httpConfig(httpConfig) + .authConfig(new AuthConfig(AuthType.PASSWORD, "test", "testPwd123@".toCharArray(), null)) + .gzipEnabled(false) + .build(); + this.openGeminiClient = new OpenGeminiClient(configuration); + } + + @Test + void testClientCreation() { + Configuration config = new Configuration(); + config.setAddresses(java.util.Collections.singletonList(new io.opengemini.client.api.Address("localhost", 8086))); + if (config.getHttpConfig() == null) { + config.setHttpConfig(new HttpClientConfig.Builder().build()); + } + + assertDoesNotThrow(() -> { + OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config); + assertNotNull(client, "OpenGeminiClient should be created successfully"); + client.close(); + }, "Client creation should not throw an exception"); + } + + @Test + void testDatabaseCreation() { + Configuration config = new Configuration(); + config.setAddresses(java.util.Collections.singletonList(new io.opengemini.client.api.Address("localhost", 8086))); + if (config.getHttpConfig() == null) { + config.setHttpConfig(new HttpClientConfig.Builder().build()); + } + + assertDoesNotThrow(() -> { + try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) { + Query createDbQuery = new Query("CREATE DATABASE test_db"); + client.query(createDbQuery).get(10, TimeUnit.SECONDS); + } + }, "Database creation should not throw an exception"); + } + + @Test + void testQueryOperation() { + Configuration config = new Configuration(); + config.setAddresses(java.util.Collections.singletonList(new Address("localhost", 8086))); + if (config.getHttpConfig() == null) { + config.setHttpConfig(new HttpClientConfig.Builder().build()); + } + + assertDoesNotThrow(() -> { + try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) { + Query createDbQuery = new Query("CREATE DATABASE test_db"); + client.query(createDbQuery).get(10, TimeUnit.SECONDS); + + Query showDbQuery = new Query("SHOW DATABASES"); + QueryResult result = client.query(showDbQuery).get(10, TimeUnit.SECONDS); + assertNotNull(result, "Query result should not be null"); + } + }, "Query operation should not throw an exception"); + } + + @BeforeAll + static void initializeTracing() { + OpenTelemetryConfig.initialize(); + } + + @Test + void testWriteOperation() { + Configuration config = new Configuration(); + config.setAddresses(java.util.Collections.singletonList( + new Address("localhost", 8086))); + + if (config.getHttpConfig() == null) { + config.setHttpConfig(new HttpClientConfig.Builder().build()); + } + + assertDoesNotThrow(() -> { + try (OpenGeminiClient client = OpenGeminiClientFactory.createClientWithInterceptors(config)) { + Query createDbQuery = new Query("CREATE DATABASE test_db"); + client.query(createDbQuery).get(10, TimeUnit.SECONDS); + + Thread.sleep(1000); + + Write write = new Write( + "test_db", + "autogen", + "temperature", + "temperature,location=room1 value=25.5 " + System.currentTimeMillis(), + "ns" + ); + + client.executeWrite( + write.getDatabase(), + write.getRetentionPolicy(), + write.getLineProtocol() + ).get(10, TimeUnit.SECONDS); + } + }, "Write operation should not throw an exception"); + } + + @Test + void testTracingIntegration() throws ExecutionException, InterruptedException { + String databaseTestName = "tracing_test_db"; + CompletableFuture createdb = openGeminiClient.createDatabase(databaseTestName); + createdb.get(); + + assertDoesNotThrow(() -> { + + Write write = new Write( + "tracing_test_db", + "autogen", + "tracing_measurement", + "tracing_measurement,tag=test value=8 " + System.currentTimeMillis(), + "ns" + ); + + openGeminiClient.executeWrite( + write.getDatabase(), + write.getRetentionPolicy(), + write.getLineProtocol() + ).get(10, TimeUnit.SECONDS); + + Query query = new Query("SELECT * FROM tracing_measurement"); + openGeminiClient.query(query).get(10, TimeUnit.SECONDS); + + }, "Tracing integration should not throw an exception"); + } +} diff --git a/pom.xml b/pom.xml index 031041c6..9c2f756e 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,39 @@ + + + io.opentelemetry + opentelemetry-api + 1.28.0 + + + io.opentelemetry + opentelemetry-sdk + 1.28.0 + + + io.opentelemetry + opentelemetry-exporter-jaeger + 1.28.0 + + + io.opentelemetry + opentelemetry-exporter-otlp + 1.32.0 + + + + + com.squareup.okhttp3 + okhttp + ${okhttp.version} + + + io.opentelemetry + opentelemetry-semconv + 1.28.0-alpha + org.projectlombok lombok @@ -193,6 +226,7 @@ protobuf-maven-plugin ${maven-protobuf-maven-plugin} + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} @@ -206,6 +240,7 @@ grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} @@ -219,6 +254,7 @@ grpc-vertx + io.vertx:vertx-grpc-protoc-plugin:${vertx.version}:exe:${os.detected.classifier} @@ -306,6 +342,7 @@ true false forked-path + -Dgpg.passphrase=${gpg.passphrase} @@ -353,7 +390,7 @@ true central - true + @@ -413,6 +450,7 @@ + ${gpg.passphrase} --pinentry-mode