diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseAsyncClient.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseAsyncClient.java deleted file mode 100644 index 60cf32a4..00000000 --- a/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseAsyncClient.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Copyright 2024 openGemini Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.opengemini.client.common; - -import io.opengemini.client.api.Configuration; -import io.opengemini.client.api.OpenGeminiAsyncClient; -import io.opengemini.client.api.Point; -import io.opengemini.client.api.Pong; -import io.opengemini.client.api.Query; -import io.opengemini.client.api.QueryResult; -import io.opengemini.client.api.RetentionPolicy; -import io.opengemini.client.api.RpConfig; -import org.apache.commons.lang3.StringUtils; - -import java.util.List; -import java.util.StringJoiner; -import java.util.concurrent.CompletableFuture; - -public abstract class BaseAsyncClient extends BaseClient implements OpenGeminiAsyncClient { - - public BaseAsyncClient(Configuration conf) { - super(conf); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture createDatabase(String database) { - String command = CommandFactory.createDatabase(database); - Query query = new Query(command); - return executePostQuery(query).thenApply(rsp -> null); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture dropDatabase(String database) { - String command = CommandFactory.dropDatabase(database); - Query query = new Query(command); - return executePostQuery(query).thenApply(rsp -> null); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture> showDatabases() { - String command = CommandFactory.showDatabases(); - Query query = new Query(command); - return executeQuery(query).thenApply(ResultMapper::toDatabases); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture createRetentionPolicy(String database, RpConfig rpConfig, boolean isDefault) { - String command = CommandFactory.createRetentionPolicy(database, rpConfig, isDefault); - Query query = new Query(command); - return executePostQuery(query).thenApply(rsp -> null); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture> showRetentionPolicies(String database) { - if (StringUtils.isBlank(database)) { - return null; - } - - String command = CommandFactory.showRetentionPolicies(database); - Query query = new Query(command); - query.setDatabase(database); - return executeQuery(query).thenApply(ResultMapper::toRetentionPolicies); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture dropRetentionPolicy(String database, String retentionPolicy) { - String command = CommandFactory.dropRetentionPolicy(database, retentionPolicy); - Query query = new Query(command); - return executePostQuery(query).thenApply(rsp -> null); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture query(Query query) { - return executeQuery(query); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture write(String database, Point point) { - return write(database, null, point); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture write(String database, List points) { - return write(database, null, points); - } - - @Override - public CompletableFuture write(String database, String retentionPolicy, Point point) { - String body = point.lineProtocol(); - if (StringUtils.isEmpty(body)) { - return CompletableFuture.completedFuture(null); - } - return executeWrite(database, retentionPolicy, body); - } - - @Override - public CompletableFuture write(String database, String retentionPolicy, List points) { - if (points.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - StringJoiner sj = new StringJoiner("\n"); - for (Point point : points) { - String lineProtocol = point.lineProtocol(); - if (StringUtils.isEmpty(lineProtocol)) { - continue; - } - sj.add(lineProtocol); - } - String body = sj.toString(); - if (StringUtils.isEmpty(body)) { - return CompletableFuture.completedFuture(null); - } - return executeWrite(database, retentionPolicy, body); - } - - /** - * {@inheritDoc} - */ - @Override - public CompletableFuture ping() { - return executePing(); - } - - /** - * The implementation class needs to implement this method to execute a GET query call. - * - * @param query the query to execute. - */ - protected abstract CompletableFuture executeQuery(Query query); - - /** - * The implementation class needs to implement this method to execute a POST query call. - * - * @param query the query to execute. - */ - protected abstract CompletableFuture executePostQuery(Query query); - - /** - * The implementation class needs to implement this method to execute a write call. - * - * @param database the name of the database. - * @param retentionPolicy the name of the retention policy. - * @param lineProtocol the line protocol string to write. - */ - protected abstract CompletableFuture executeWrite(String database, - String retentionPolicy, - String lineProtocol); - - /** - * The implementation class needs to implement this method to execute a ping call. - */ - protected abstract CompletableFuture executePing(); - -} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java index 1957257c..7621b907 100644 --- a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java @@ -24,21 +24,30 @@ import io.opengemini.client.api.AuthConfig; import io.opengemini.client.api.AuthType; import io.opengemini.client.api.Configuration; +import io.opengemini.client.api.OpenGeminiAsyncClient; import io.opengemini.client.api.OpenGeminiException; +import io.opengemini.client.api.Point; import io.opengemini.client.api.Pong; import io.opengemini.client.api.Query; import io.opengemini.client.api.QueryResult; -import io.opengemini.client.common.BaseAsyncClient; +import io.opengemini.client.api.RetentionPolicy; +import io.opengemini.client.api.RpConfig; +import io.opengemini.client.common.BaseClient; +import io.opengemini.client.common.CommandFactory; import io.opengemini.client.common.HeaderConst; import io.opengemini.client.common.JacksonService; +import io.opengemini.client.common.ResultMapper; +import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Optional; +import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; -public class OpenGeminiClient extends BaseAsyncClient { +public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient { protected final Configuration conf; private final HttpClient client; @@ -55,12 +64,137 @@ public OpenGeminiClient(@NotNull Configuration conf) { this.client = HttpClientFactory.createHttpClient(httpConfig); } + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture createDatabase(String database) { + String command = CommandFactory.createDatabase(database); + Query query = new Query(command); + return executePostQuery(query).thenApply(rsp -> null); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture dropDatabase(String database) { + String command = CommandFactory.dropDatabase(database); + Query query = new Query(command); + return executePostQuery(query).thenApply(rsp -> null); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture> showDatabases() { + String command = CommandFactory.showDatabases(); + Query query = new Query(command); + return executeQuery(query).thenApply(ResultMapper::toDatabases); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture createRetentionPolicy(String database, RpConfig rpConfig, boolean isDefault) { + String command = CommandFactory.createRetentionPolicy(database, rpConfig, isDefault); + Query query = new Query(command); + return executePostQuery(query).thenApply(rsp -> null); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture> showRetentionPolicies(String database) { + if (StringUtils.isBlank(database)) { + return null; + } + + String command = CommandFactory.showRetentionPolicies(database); + Query query = new Query(command); + query.setDatabase(database); + return executeQuery(query).thenApply(ResultMapper::toRetentionPolicies); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture dropRetentionPolicy(String database, String retentionPolicy) { + String command = CommandFactory.dropRetentionPolicy(database, retentionPolicy); + Query query = new Query(command); + return executePostQuery(query).thenApply(rsp -> null); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture query(Query query) { + return executeQuery(query); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture write(String database, Point point) { + return write(database, null, point); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture write(String database, List points) { + return write(database, null, points); + } + + @Override + public CompletableFuture write(String database, String retentionPolicy, Point point) { + String body = point.lineProtocol(); + if (StringUtils.isEmpty(body)) { + return CompletableFuture.completedFuture(null); + } + return executeWrite(database, retentionPolicy, body); + } + + @Override + public CompletableFuture write(String database, String retentionPolicy, List points) { + if (points.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + StringJoiner sj = new StringJoiner("\n"); + for (Point point : points) { + String lineProtocol = point.lineProtocol(); + if (StringUtils.isEmpty(lineProtocol)) { + continue; + } + sj.add(lineProtocol); + } + String body = sj.toString(); + if (StringUtils.isEmpty(body)) { + return CompletableFuture.completedFuture(null); + } + return executeWrite(database, retentionPolicy, body); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture ping() { + return executePing(); + } + /** * Execute a GET query call with java HttpClient. * * @param query the query to execute. */ - @Override protected CompletableFuture executeQuery(Query query) { String queryUrl = getQueryUrl(query); return get(queryUrl).thenCompose(response -> convertResponse(response, QueryResult.class)); @@ -71,7 +205,6 @@ protected CompletableFuture executeQuery(Query query) { * * @param query the query to execute. */ - @Override protected CompletableFuture executePostQuery(Query query) { String queryUrl = getQueryUrl(query); return post(queryUrl, null).thenCompose(response -> convertResponse(response, QueryResult.class)); @@ -84,7 +217,6 @@ protected CompletableFuture executePostQuery(Query query) { * @param retentionPolicy the name of the retention policy. * @param lineProtocol the line protocol string to write. */ - @Override protected CompletableFuture executeWrite(String database, String retentionPolicy, String lineProtocol) { String writeUrl = getWriteUrl(database, retentionPolicy); return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class)); @@ -93,7 +225,6 @@ protected CompletableFuture executeWrite(String database, String retention /** * Execute a ping call with java HttpClient. */ - @Override protected CompletableFuture executePing() { String pingUrl = getPingUrl(); return get(pingUrl).thenApply(response -> Optional.ofNullable(response.headers().get(HeaderConst.VERSION)) @@ -121,11 +252,11 @@ protected CompletableFuture executePing() { } } - public CompletableFuture get(String url) { + private CompletableFuture get(String url) { return client.get(buildUriWithPrefix(url), headers); } - public CompletableFuture post(String url, String body) { + private CompletableFuture post(String url, String body) { return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8), headers); }