Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
244b912
feat: add grpc dependency management
castlighting Oct 28, 2024
60abedc
Merge pull request #1 from openGemini/main
castlighting Oct 28, 2024
e5e9be8
feat: add grpc dependencies
castlighting Oct 28, 2024
d4ed300
Merge branch 'main' into new-protocol-write
castlighting Oct 28, 2024
8c6a7e4
feat: Add RPC Client Config
castlighting Oct 28, 2024
d22ca7b
refactor: add target address and remove host and port field
castlighting Oct 30, 2024
99c5509
feat: grpc module
castlighting Oct 30, 2024
991c846
feat: Add write proto file
castlighting Oct 30, 2024
8d47ad1
Init project
castlighting Oct 30, 2024
e68016d
refactor: Grpc protoc generate executions
castlighting Oct 30, 2024
d8a93e8
refactor: GRPC protobuf protoc generate executions
castlighting Oct 30, 2024
40bd70a
feat: Add failsafe dependecy
castlighting Oct 30, 2024
3b5e2f5
feat: Add grpc-netty denpendency
castlighting Oct 30, 2024
cd4989c
feat: Add RPC waitForReady property
castlighting Oct 30, 2024
943c2bc
feat: RpcClientSupplier
castlighting Oct 30, 2024
ac20d02
feat: RpcClientConnectionManager add newStub and close
castlighting Oct 30, 2024
05e449f
feat: Add Common Service Impl
castlighting Oct 30, 2024
975a1a3
feat: WriteService Impl
castlighting Oct 30, 2024
963260e
feat: RpcClient
castlighting Oct 30, 2024
76d2ac9
feat: Write GRPC Client
castlighting Oct 31, 2024
183392c
refactor: Remove target fileds and add host,port fields
castlighting Oct 31, 2024
f6e5bc7
test: Init grpc test env
castlighting Oct 31, 2024
2f34472
feat: RPC write interface
castlighting Oct 31, 2024
24f650c
feat: RpcClient add writeRows
castlighting Oct 31, 2024
14d685c
feat: add Record to byte block
castlighting Nov 7, 2024
42d1fef
fix: Client dependency
castlighting Nov 7, 2024
c57f927
test: refactor test data
castlighting Nov 7, 2024
4bb8df3
doc: record binary structure
castlighting Nov 7, 2024
0c8c936
refactor: Remove the lineProtocol argument and replace it with the po…
castlighting Nov 12, 2024
7e44b4c
feat: PontConvert add extractColVlas and extractSchema methods
castlighting Nov 12, 2024
9fde4a6
Update WriteService.java
castlighting Nov 12, 2024
55229fb
Update RpcClientTest.java
castlighting Nov 12, 2024
6dcedad
feat: Add tag to colVals
castlighting Nov 13, 2024
fc3de7e
fix: Add tag and time field process colval
castlighting Nov 13, 2024
e83a3c4
fix: Colval set offset byte array
castlighting Nov 13, 2024
fb6eecc
fix: ColVal marshal
castlighting Nov 13, 2024
aa302dd
fix: Field marshal
castlighting Nov 13, 2024
ad3bbc1
fix: Field marshal
castlighting Nov 13, 2024
2d93237
fix: Field marshal length
castlighting Nov 13, 2024
da777d4
test: Add setTime
castlighting Nov 13, 2024
4e89875
fix: Write long zigzag
castlighting Nov 13, 2024
492e0c5
fix: Value write LE
castlighting Nov 14, 2024
0a4aeea
feat: WriteByRpc method add measurement argument
castlighting Nov 14, 2024
b5a78fc
feat: Add setMeasurement
castlighting Nov 14, 2024
f277e26
Update BaseAsyncClient.java
castlighting Nov 14, 2024
fb3465e
fix: Time field last column
castlighting Nov 14, 2024
b71c616
feat: add grpc ssl options
castlighting Nov 18, 2024
a0343f7
refactor: remove recMeta field
castlighting Nov 18, 2024
136c5a8
refactor: remove recMeta
castlighting Nov 18, 2024
5dc4299
Update WriteService.java
castlighting Nov 18, 2024
1982315
Update WriteService.java
castlighting Nov 18, 2024
f3b990a
Update RpcClientTest.java
castlighting Nov 18, 2024
b09d846
Update PointConverter.java
castlighting Nov 18, 2024
8f1d845
fix: release buffer
castlighting Nov 18, 2024
c0f4d91
refactor: remove unse method
castlighting Nov 18, 2024
1bdcbcd
doc: add license comment
castlighting Nov 18, 2024
a72da69
refactor: license comment style
castlighting Nov 18, 2024
c7b348a
doc: remove chinese annotation
castlighting Nov 19, 2024
67fca68
doc: Remove chinese annotations
castlighting Nov 19, 2024
ef6094d
remove: blank line
castlighting Nov 19, 2024
75b926e
fix: add RpcClient close
castlighting Nov 19, 2024
dac1935
refactor: Rename execution name grpc-vertx to vertx-grpc
castlighting Nov 23, 2024
914dfc3
refactor: Remove failsafe dependecy
castlighting Nov 23, 2024
cb1be18
refactor: RpcClientConfig rename to GrpcConfig
castlighting Nov 27, 2024
ebcded9
refactor: Rename executeWriteByRpc to executeWriteByGrpc
castlighting Nov 27, 2024
c2224d4
refactor: rename ssl config
castlighting Nov 27, 2024
51abc86
refactor: Rename RpcClient to GrpcClient
castlighting Nov 28, 2024
a5b4d50
refactor: remove import *
castlighting Dec 7, 2024
cd246ca
Merge branch 'new-protocol-write' of https://github.com/castlighting/…
castlighting Dec 7, 2024
35a859f
refactor: write proto file
castlighting Dec 10, 2024
9e6221a
refactor: remove measurement arg
castlighting Dec 10, 2024
2cb6680
fix: rename proto package
castlighting Dec 10, 2024
d8406bc
refactor: Remove grpc module
castlighting Dec 17, 2024
490bfd3
refactor: ip
castlighting Dec 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ public class Configuration {
boolean gzipEnabled;

HttpClientConfig httpConfig;

GrpcConfig rpcConfig;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.api;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
@Builder
public class GrpcConfig {
private String host;
private Integer port;
private String username;
private String password;
private boolean useSSL = false;
private boolean waitForReady = true;
private String caCertPath;
private String clientCertPath;
private String clientKeyPath;
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ public interface OpenGeminiAsyncClient extends AutoCloseable {
*/
CompletableFuture<Void> write(String database, String retentionPolicy, Point point);

/**
* Writing via GRPC points to the database.
*
* @param database the name of the database.
* @param points the points to write.
*/
CompletableFuture<Void> writeByGrpc(String database, List<Point> points);

/**
* Write points to the database.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,22 @@ public CompletableFuture<Void> write(String database, String retentionPolicy, Li
if (points.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
String body = toLineProtocol(points);
if (StringUtils.isEmpty(body)) {
return CompletableFuture.completedFuture(null);
}
return executeWrite(database, retentionPolicy, body);
}

@Override
public CompletableFuture<Void> writeByGrpc(String database, List<Point> points) {
if (points.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return executeWriteByGrpc(database, points);
}

private static String toLineProtocol(List<Point> points) {
StringJoiner sj = new StringJoiner("\n");
for (Point point : points) {
String lineProtocol = point.lineProtocol();
Expand All @@ -147,11 +163,7 @@ public CompletableFuture<Void> write(String database, String retentionPolicy, Li
}
sj.add(lineProtocol);
}
String body = sj.toString();
if (StringUtils.isEmpty(body)) {
return CompletableFuture.completedFuture(null);
}
return executeWrite(database, retentionPolicy, body);
return sj.toString();
}

/**
Expand Down Expand Up @@ -187,6 +199,15 @@ protected abstract CompletableFuture<Void> executeWrite(String database,
String retentionPolicy,
String lineProtocol);

/**
* The implementation class needs to implement this method to execute a write operation via an RPC call.
*
* @param database the name of the database.
* @param points the points to write.
*/
protected abstract CompletableFuture<Void> executeWriteByGrpc(String database,
List<Point> points);

/**
* The implementation class needs to implement this method to execute a ping call.
*/
Expand Down
35 changes: 35 additions & 0 deletions opengemini-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,41 @@
<version>${okhttp.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc</artifactId>
<version>${vertx-grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<version>${vertx-grpc.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@
import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.HttpClientFactory;
import io.github.openfacade.http.HttpResponse;
import io.opengemini.client.api.AuthConfig;
import io.opengemini.client.api.AuthType;
import io.opengemini.client.api.Configuration;
import io.opengemini.client.api.OpenGeminiException;
import io.opengemini.client.api.Pong;
import io.opengemini.client.api.Query;
import io.opengemini.client.api.QueryResult;
import io.opengemini.client.api.*;
import io.opengemini.client.common.BaseAsyncClient;
import io.opengemini.client.common.HeaderConst;
import io.opengemini.client.common.JacksonService;
import io.opengemini.client.impl.grpc.GrpcClient;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand All @@ -43,6 +39,8 @@ public class OpenGeminiClient extends BaseAsyncClient {

private final HttpClient client;

private final GrpcClient grpcClient;

public OpenGeminiClient(@NotNull Configuration conf) {
super(conf);
this.conf = conf;
Expand All @@ -53,6 +51,12 @@ public OpenGeminiClient(@NotNull Configuration conf) {
new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword())));
}
this.client = HttpClientFactory.createHttpClient(httpConfig);

if (conf.getRpcConfig() != null) {
grpcClient = GrpcClient.create(conf.getRpcConfig());
} else {
grpcClient = null;
}
}

/**
Expand Down Expand Up @@ -90,6 +94,20 @@ protected CompletableFuture<Void> executeWrite(String database, String retention
return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class));
}

/**
* Execute a write call with RPC client
*
* @param database the name of the database.
* @param points the points to write.
*/
@Override
protected CompletableFuture<Void> executeWriteByGrpc(String database, List<Point> points) {
if (grpcClient == null) {
throw new IllegalStateException("RPC client not initialized");
}
return grpcClient.getWriteClient().writeRows(database, points);
}

/**
* Execute a ping call with java HttpClient.
*/
Expand Down Expand Up @@ -127,12 +145,15 @@ public CompletableFuture<HttpResponse> get(String url) {

public CompletableFuture<HttpResponse> post(String url, String body) {
return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8),
headers);
headers);
}

@Override
public void close() throws IOException {
this.client.close();
if (this.grpcClient != null) {
this.grpcClient.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.impl.grpc;

import io.opengemini.client.api.GrpcConfig;
import io.opengemini.client.impl.grpc.service.WriteService;
import io.opengemini.client.impl.grpc.support.RpcClientSupplier;

public class GrpcClient {
private final GrpcConfig config;
private final RpcClientConnectionManager connectionManager;

private final RpcClientSupplier<WriteService> writeClient;

public static GrpcClient create(final GrpcConfig config) {
return new GrpcClient(config);
}

private GrpcClient(GrpcConfig config) {
this.config = config;
this.connectionManager = new RpcClientConnectionManager(config);
this.writeClient = new RpcClientSupplier<>(() -> {
try {
return new WriteService(this.connectionManager);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}


public WriteService getWriteClient() {
return writeClient.get();
}

public synchronized void close() {
writeClient.close();
connectionManager.close();
}
}
Loading