Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions opengemini-client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,16 @@
<version>${annotations.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.Setter;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

@Getter
Expand All @@ -40,6 +41,36 @@ public class Point {
private Map<String, String> tags;
private Map<String, Object> fields;

public Point() {
this.fields = new HashMap<>();
this.tags = new HashMap<>();
}

public Point measurement(String measurement) {
this.measurement = measurement;
return this;
}

public Point addTag(String key, String value) {
if (key != null && value != null) {
this.tags.put(key, value);
}
return this;
}

public Point addField(String key, Object value) {
if (key != null && value != null) {
this.fields.put(key, value);
}
return this;
}

public Point time(long time, Precision precision) {
this.time = time;
this.precision = precision;
return this;
}

/**
* Calculate the line protocol string for this point
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.Map;

@AllArgsConstructor
@Getter
@Setter
Expand All @@ -44,6 +47,8 @@ public class Query {
*/
private Precision precision;

private Map<String, Object> attributes = new HashMap<>();

public Query(String command) {
this.command = command;
}
Expand All @@ -53,4 +58,19 @@ public Query(String command, String database, String retentionPolicy) {
this.database = database;
this.retentionPolicy = retentionPolicy;
}

public Query(String command, String database, String retentionPolicy, Precision precision) {
this.command = command;
this.database = database;
this.retentionPolicy = retentionPolicy;
this.precision = precision;
}

public void setAttribute(String key, Object value) {
attributes.put(key, value);
}

public Object getAttribute(String key) {
return attributes.get(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.api;

import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.Map;

@Getter
@Setter
public class Write {
private String database;
private String retentionPolicy;
private String measurement;
private String lineProtocol;
private String precision;

private Map<String, Object> attributes = new HashMap<>();

public Write(String database, String retentionPolicy, String measurement,
String lineProtocol, String precision) {
this.database = database;
this.retentionPolicy = retentionPolicy;
this.measurement = measurement;
this.lineProtocol = lineProtocol;
this.precision = precision;
}

public void setAttribute(String key, Object value) {
attributes.put(key, value);
}

public Object getAttribute(String key) {
return attributes.get(key);
}
}
12 changes: 12 additions & 0 deletions opengemini-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,17 @@
<version>${vertx.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,46 @@
import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.HttpClientFactory;
import io.github.openfacade.http.HttpResponse;
import io.opengemini.client.api.AuthConfig;
import io.opengemini.client.api.AuthType;
import io.opengemini.client.api.Configuration;
import io.opengemini.client.api.OpenGeminiAsyncClient;
import io.opengemini.client.api.OpenGeminiException;
import io.opengemini.client.api.Point;
import io.opengemini.client.api.Pong;
import io.opengemini.client.api.Query;
import io.opengemini.client.api.QueryResult;
import io.opengemini.client.api.RetentionPolicy;
import io.opengemini.client.api.RpConfig;
import io.opengemini.client.api.*;
import io.opengemini.client.common.BaseClient;
import io.opengemini.client.common.CommandFactory;
import io.opengemini.client.common.HeaderConst;
import io.opengemini.client.common.JacksonService;
import io.opengemini.client.common.ResultMapper;
import io.opengemini.client.interceptor.Interceptor;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.*;
import java.util.concurrent.CompletableFuture;

public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
private final List<Interceptor> interceptors = new ArrayList<>();
protected final Configuration conf;

private final HttpClient client;

public OpenGeminiClient(@NotNull Configuration conf) {
super(conf);
this.conf = conf;
AuthConfig authConfig = conf.getAuthConfig();
HttpClientConfig httpConfig = conf.getHttpConfig();
if (httpConfig == null) {
httpConfig = new HttpClientConfig.Builder().build();
conf.setHttpConfig(httpConfig);
}
if (authConfig != null && authConfig.getAuthType().equals(AuthType.PASSWORD)) {
httpConfig.addRequestFilter(
new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword())));
}
this.client = HttpClientFactory.createHttpClient(httpConfig);
}

public void addInterceptors(Interceptor... interceptors) {
Collections.addAll(this.interceptors, interceptors);
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -195,9 +192,22 @@ public CompletableFuture<Pong> ping() {
*
* @param query the query to execute.
*/
protected CompletableFuture<QueryResult> executeQuery(Query query) {
public CompletableFuture<QueryResult> executeQuery(Query query) {
String queryUrl = getQueryUrl(query);
return get(queryUrl).thenCompose(response -> convertResponse(response, QueryResult.class));
CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
interceptors.stream()
.map(interceptor -> interceptor.queryBefore(query))
.toArray(CompletableFuture[]::new)
);

return beforeFutures.thenCompose(voidResult -> executeHttpQuery(query).thenCompose(response -> {
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
interceptors.stream()
.map(interceptor -> interceptor.queryAfter(query, response))
.toArray(CompletableFuture[]::new)
);
return afterFutures.thenCompose(voidResult2 -> convertResponse(response, QueryResult.class));
}));
}

/**
Expand All @@ -217,9 +227,33 @@ protected CompletableFuture<QueryResult> executePostQuery(Query query) {
* @param retentionPolicy the name of the retention policy.
* @param lineProtocol the line protocol string to write.
*/
protected CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
public CompletableFuture<Void> executeWrite(String database, String retentionPolicy, String lineProtocol) {
String writeUrl = getWriteUrl(database, retentionPolicy);
return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class));
Write write = new Write(
database,
retentionPolicy,
"default_measurement",
lineProtocol,
"ns"
);

CompletableFuture<Void> beforeFutures = CompletableFuture.allOf(
interceptors.stream()
.map(interceptor -> interceptor.writeBefore(write))
.toArray(CompletableFuture[]::new)
);

return beforeFutures.thenCompose(voidResult ->
executeHttpWrite(write).thenCompose(response -> {
CompletableFuture<Void> afterFutures = CompletableFuture.allOf(
interceptors.stream()
.map(interceptor -> interceptor.writeAfter(write, response))
.toArray(CompletableFuture[]::new)
);
return afterFutures.thenCompose(voidResult2 ->
convertResponse(response, Void.class));
})
);
}

/**
Expand Down Expand Up @@ -258,7 +292,7 @@ private CompletableFuture<HttpResponse> get(String url) {

private CompletableFuture<HttpResponse> post(String url, String body) {
return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8),
headers);
headers);
}

@Override
Expand All @@ -270,4 +304,14 @@ public void close() throws IOException {
public String toString() {
return "OpenGeminiClient{" + "httpEngine=" + conf.getHttpConfig().engine() + '}';
}

private CompletableFuture<HttpResponse> executeHttpQuery(Query query) {
String queryUrl = getQueryUrl(query);
return get(queryUrl);
}

private CompletableFuture<HttpResponse> executeHttpWrite(Write write) {
String writeUrl = getWriteUrl(write.getDatabase(), write.getRetentionPolicy());
return post(writeUrl, write.getLineProtocol());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import io.opengemini.client.api.BatchConfig;
import io.opengemini.client.api.Configuration;
import io.opengemini.client.api.OpenGeminiException;
import io.opengemini.client.interceptor.Interceptor;
import io.opengemini.client.interceptor.OtelInterceptor;
import org.jetbrains.annotations.NotNull;

import java.util.List;

public class OpenGeminiClientFactory {
public static OpenGeminiClient create(@NotNull Configuration configuration) throws OpenGeminiException {
if (configuration.getAddresses() == null || configuration.getAddresses().isEmpty()) {
Expand Down Expand Up @@ -54,4 +58,13 @@ public static OpenGeminiClient create(@NotNull Configuration configuration) thro
}
return new OpenGeminiClient(configuration);
}

public static OpenGeminiClient createClientWithInterceptors(Configuration config) throws OpenGeminiException {
OpenGeminiClient client = create(config);
List<Interceptor> interceptors = List.of(
new OtelInterceptor()
);
client.addInterceptors(interceptors.toArray(new Interceptor[0]));
return client;
}
}
Loading
Loading