diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java index daf22422..9b59eda0 100644 --- a/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java @@ -40,4 +40,6 @@ public class Configuration { boolean gzipEnabled; HttpClientConfig httpConfig; + + GrpcConfig rpcConfig; } diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/GrpcConfig.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/GrpcConfig.java new file mode 100644 index 00000000..fe1d3eb7 --- /dev/null +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/GrpcConfig.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.api; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@ToString +@Builder +public class GrpcConfig { + private String host; + private Integer port; + private String username; + private String password; + private boolean useSSL = false; + private boolean waitForReady = true; + private String caCertPath; + private String clientCertPath; + private String clientKeyPath; +} diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/OpenGeminiAsyncClient.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/OpenGeminiAsyncClient.java index cd87ecff..5a45a2c9 100644 --- a/opengemini-client-api/src/main/java/io/opengemini/client/api/OpenGeminiAsyncClient.java +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/OpenGeminiAsyncClient.java @@ -99,6 +99,14 @@ public interface OpenGeminiAsyncClient extends AutoCloseable { */ CompletableFuture write(String database, String retentionPolicy, Point point); + /** + * Writing via GRPC points to the database. + * + * @param database the name of the database. + * @param points the points to write. + */ + CompletableFuture writeByGrpc(String database, List points); + /** * Write points to the database. * diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseAsyncClient.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseAsyncClient.java index 60cf32a4..7a9a9e07 100644 --- a/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseAsyncClient.java +++ b/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseAsyncClient.java @@ -139,6 +139,22 @@ public CompletableFuture write(String database, String retentionPolicy, Li if (points.isEmpty()) { return CompletableFuture.completedFuture(null); } + String body = toLineProtocol(points); + if (StringUtils.isEmpty(body)) { + return CompletableFuture.completedFuture(null); + } + return executeWrite(database, retentionPolicy, body); + } + + @Override + public CompletableFuture writeByGrpc(String database, List points) { + if (points.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + return executeWriteByGrpc(database, points); + } + + private static String toLineProtocol(List points) { StringJoiner sj = new StringJoiner("\n"); for (Point point : points) { String lineProtocol = point.lineProtocol(); @@ -147,11 +163,7 @@ public CompletableFuture write(String database, String retentionPolicy, Li } sj.add(lineProtocol); } - String body = sj.toString(); - if (StringUtils.isEmpty(body)) { - return CompletableFuture.completedFuture(null); - } - return executeWrite(database, retentionPolicy, body); + return sj.toString(); } /** @@ -187,6 +199,15 @@ protected abstract CompletableFuture executeWrite(String database, String retentionPolicy, String lineProtocol); + /** + * The implementation class needs to implement this method to execute a write operation via an RPC call. + * + * @param database the name of the database. + * @param points the points to write. + */ + protected abstract CompletableFuture executeWriteByGrpc(String database, + List points); + /** * The implementation class needs to implement this method to execute a ping call. */ diff --git a/opengemini-client/pom.xml b/opengemini-client/pom.xml index 25f7420e..e6c0418c 100644 --- a/opengemini-client/pom.xml +++ b/opengemini-client/pom.xml @@ -54,6 +54,41 @@ ${okhttp.version} provided + + io.grpc + grpc-netty + + + io.grpc + grpc-stub + + + io.grpc + grpc-protobuf + + + io.vertx + vertx-grpc + ${vertx-grpc.version} + + + io.grpc + grpc-testing + test + + + io.vertx + vertx-junit5 + ${vertx-grpc.version} + test + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java index a3af405e..e69d7ca9 100644 --- a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java @@ -21,20 +21,16 @@ import io.github.openfacade.http.HttpClientConfig; import io.github.openfacade.http.HttpClientFactory; import io.github.openfacade.http.HttpResponse; -import io.opengemini.client.api.AuthConfig; -import io.opengemini.client.api.AuthType; -import io.opengemini.client.api.Configuration; -import io.opengemini.client.api.OpenGeminiException; -import io.opengemini.client.api.Pong; -import io.opengemini.client.api.Query; -import io.opengemini.client.api.QueryResult; +import io.opengemini.client.api.*; import io.opengemini.client.common.BaseAsyncClient; import io.opengemini.client.common.HeaderConst; import io.opengemini.client.common.JacksonService; +import io.opengemini.client.impl.grpc.GrpcClient; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -43,6 +39,8 @@ public class OpenGeminiClient extends BaseAsyncClient { private final HttpClient client; + private final GrpcClient grpcClient; + public OpenGeminiClient(@NotNull Configuration conf) { super(conf); this.conf = conf; @@ -53,6 +51,12 @@ public OpenGeminiClient(@NotNull Configuration conf) { new BasicAuthRequestFilter(authConfig.getUsername(), String.valueOf(authConfig.getPassword()))); } this.client = HttpClientFactory.createHttpClient(httpConfig); + + if (conf.getRpcConfig() != null) { + grpcClient = GrpcClient.create(conf.getRpcConfig()); + } else { + grpcClient = null; + } } /** @@ -90,6 +94,20 @@ protected CompletableFuture executeWrite(String database, String retention return post(writeUrl, lineProtocol).thenCompose(response -> convertResponse(response, Void.class)); } + /** + * Execute a write call with RPC client + * + * @param database the name of the database. + * @param points the points to write. + */ + @Override + protected CompletableFuture executeWriteByGrpc(String database, List points) { + if (grpcClient == null) { + throw new IllegalStateException("RPC client not initialized"); + } + return grpcClient.getWriteClient().writeRows(database, points); + } + /** * Execute a ping call with java HttpClient. */ @@ -127,12 +145,15 @@ public CompletableFuture get(String url) { public CompletableFuture post(String url, String body) { return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8), - headers); + headers); } @Override public void close() throws IOException { this.client.close(); + if (this.grpcClient != null) { + this.grpcClient.close(); + } } @Override diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/GrpcClient.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/GrpcClient.java new file mode 100644 index 00000000..4e61afef --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/GrpcClient.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc; + +import io.opengemini.client.api.GrpcConfig; +import io.opengemini.client.impl.grpc.service.WriteService; +import io.opengemini.client.impl.grpc.support.RpcClientSupplier; + +public class GrpcClient { + private final GrpcConfig config; + private final RpcClientConnectionManager connectionManager; + + private final RpcClientSupplier writeClient; + + public static GrpcClient create(final GrpcConfig config) { + return new GrpcClient(config); + } + + private GrpcClient(GrpcConfig config) { + this.config = config; + this.connectionManager = new RpcClientConnectionManager(config); + this.writeClient = new RpcClientSupplier<>(() -> { + try { + return new WriteService(this.connectionManager); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + + public WriteService getWriteClient() { + return writeClient.get(); + } + + public synchronized void close() { + writeClient.close(); + connectionManager.close(); + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/RpcClientConnectionManager.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/RpcClientConnectionManager.java new file mode 100644 index 00000000..3531c97a --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/RpcClientConnectionManager.java @@ -0,0 +1,131 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NegotiationType; +import io.grpc.stub.AbstractStub; +import io.opengemini.client.api.GrpcConfig; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.grpc.VertxChannelBuilder; +import lombok.Getter; + +import java.io.File; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.function.Function; + +public class RpcClientConnectionManager { + private final Object locker; + @Getter + private final GrpcConfig config; + private volatile ManagedChannel managedChannel; + private volatile Vertx vertx; + + @Getter + private final ExecutorService executorService; + + RpcClientConnectionManager(GrpcConfig config) { + this(config, null); + } + + RpcClientConnectionManager(GrpcConfig config, ManagedChannel managedChannel) { + this.locker = new Object(); + this.config = config; + this.managedChannel = managedChannel; + // TODO: Parameterized configuration + this.executorService = Executors.newFixedThreadPool((Runtime.getRuntime().availableProcessors() * 2) + 1, r -> { + final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + Thread thread = defaultFactory.newThread(r); + thread.setName("OpenGemini-RpcClient-" + thread.getName()); + thread.setDaemon(false); + return thread; + }); + } + + ManagedChannel getChannel() throws Exception { + if (managedChannel == null) { + synchronized (locker) { + if (managedChannel == null) { + managedChannel = defaultChannelBuilder().build(); + } + } + } + return managedChannel; + } + + void close() { + synchronized (locker) { + if (managedChannel != null) { + managedChannel.shutdown(); + } + if (vertx != null) { + vertx.close(); + } + if (executorService != null) { + executorService.shutdown(); + } + } + } + + Vertx vertx() { + if (this.vertx == null) { + synchronized (locker) { + if (this.vertx == null) { + this.vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(false)); + } + } + } + + return this.vertx; + } + + ManagedChannelBuilder defaultChannelBuilder() throws Exception { + final VertxChannelBuilder channelBuilder = VertxChannelBuilder + .forAddress(vertx(), Objects.requireNonNull(config.getHost()), Objects.requireNonNull(config.getPort())); + + if (config.isUseSSL()) { + channelBuilder.nettyBuilder().negotiationType(NegotiationType.TLS); + channelBuilder.nettyBuilder().sslContext(GrpcSslContexts.forClient() + .trustManager(new File(config.getCaCertPath())) + .keyManager(new File(config.getClientCertPath()), new File(config.getClientKeyPath())) + .build()); + } else { + channelBuilder.usePlaintext(); + } + // TODO: more build config properties + return channelBuilder; + } + + public > T newStub(Function supplier) throws Exception { + return newStub(supplier, getChannel()); + } + + private > T newStub(Function stubCustomizer, ManagedChannel channel) { + T stub = stubCustomizer.apply(channel); + if (config.isWaitForReady()) { + stub = stub.withWaitForReady(); + } + return stub; + } + +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/ColVal.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/ColVal.java new file mode 100644 index 00000000..6fc24915 --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/ColVal.java @@ -0,0 +1,65 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc.record; + +import io.opengemini.client.impl.grpc.support.Encoder; +import lombok.Data; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +@Data +public class ColVal { + private byte[] val; + private int[] offset; + private byte[] bitmap; + private int bitMapOffset; + private int len; + private int nilCount; + + public byte[] marshal() throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + + dos.writeLong(Encoder.encodeZigZag64(len)); + dos.writeLong(Encoder.encodeZigZag64(nilCount)); + dos.writeLong(Encoder.encodeZigZag64(bitMapOffset)); + + dos.writeInt(val != null ? val.length : 0); + if (val != null) { + dos.write(val); + } + + dos.writeInt(bitmap != null ? bitmap.length : 0); + if (bitmap != null) { + dos.write(bitmap); + } + + dos.writeInt(offset != null ? offset.length : 0); + if (offset != null) { + for (int off : offset) { + dos.writeInt(off); + } + } + + dos.flush(); + return baos.toByteArray(); + } + } + +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/Field.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/Field.java new file mode 100644 index 00000000..080f7b38 --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/Field.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc.record; + +import io.opengemini.client.impl.grpc.support.Encoder; +import lombok.Data; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +@Data +public class Field { + private int type; + private String name; + + public byte[] marshal() throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + + byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8); + dos.writeShort(name.length()); + dos.write(nameBytes); + + dos.writeLong(Encoder.encodeZigZag64(type)); + dos.flush(); + return baos.toByteArray(); + } + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/Record.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/Record.java new file mode 100644 index 00000000..435b5cf9 --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/record/Record.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc.record; + +import lombok.Data; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Record Binary Structure: + * +----------------------------------------------------------------------------------------+ + * | Record Header | + * +----------------------------------------------------------------------------------------+ + * | Schema Length | Schema Array | ColVals Length | ColVals Array | + * | (4 bytes) | (variable size) | (4 bytes) | (variable size) | + * +----------------------------------------------------------------------------------------+ + *

+ * Schema Array Structure (repeated for each field): + * +----------------------------------------------------------------------------------------+ + * | Field Size | Type | Name Length | Name | + * | (4 bytes) | (4 bytes) | (4 bytes) | (variable bytes, UTF-8) | + * +----------------------------------------------------------------------------------------+ + *

+ * ColVals Array Structure (repeated for each column): + * +----------------------------------------------------------------------------------------+ + * | ColVal Size | ColVal Content | + * | (4 bytes) | | + * | | +----------------------------------------------------------+ | + * | | | Val Length | Val Content | | | + * | | | (4 bytes) | (variable) | | | + * | | +----------------------------------------------------------+ | + * | | | Offset Length | Offset Array | | + * | | | (4 bytes) | (4 bytes * length) | | + * | | +----------------------------------------------------------+ | + * | | | Bitmap Length | Bitmap | | + * | | | (4 bytes) | (variable bytes) | | + * | | +----------------------------------------------------------+ | + * | | | BitMapOffset | Length | NilCount | | + * | | | (4 bytes) | (4 bytes) | (4 bytes) | | + * | | +----------------------------------------------------------+ | + * +----------------------------------------------------------------------------------------+ + */ +@Data +public class Record { + private ColVal[] colVals; + private Field[] schema; + + public byte[] marshal() throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + + // Write Schema + dos.writeInt(schema.length); + for (Field field : schema) { + byte[] fieldBytes = field.marshal(); + dos.writeInt(fieldBytes.length); + dos.write(fieldBytes); + } + + // Write ColVals + dos.writeInt(colVals.length); + for (ColVal colVal : colVals) { + byte[] colValBytes = colVal.marshal(); + dos.writeInt(colValBytes.length); + dos.write(colValBytes); + } + + dos.flush(); + return baos.toByteArray(); + } + } + +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/service/ServiceImpl.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/service/ServiceImpl.java new file mode 100644 index 00000000..a76190ea --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/service/ServiceImpl.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc.service; + +import io.opengemini.client.impl.grpc.RpcClientConnectionManager; +import lombok.Getter; + +@Getter +public abstract class ServiceImpl { + private final RpcClientConnectionManager connectionManager; + + protected ServiceImpl(final RpcClientConnectionManager connectionManager) { + this.connectionManager = connectionManager; + } + +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/service/WriteService.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/service/WriteService.java new file mode 100644 index 00000000..a34ca727 --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/service/WriteService.java @@ -0,0 +1,115 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc.service; + +import com.google.protobuf.ByteString; +import io.opengemini.client.api.Point; +import io.opengemini.client.impl.grpc.RpcClientConnectionManager; +import io.opengemini.client.grpc.VertxWriteServiceGrpc; +import io.opengemini.client.grpc.WriteRequest; +import io.opengemini.client.grpc.Record; +import io.opengemini.client.impl.grpc.record.ColVal; +import io.opengemini.client.impl.grpc.record.Field; +import io.opengemini.client.impl.grpc.support.PointConverter; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class WriteService extends ServiceImpl { + private final VertxWriteServiceGrpc.WriteServiceVertxStub stub; + + public WriteService(RpcClientConnectionManager connectionManager) throws Exception { + super(connectionManager); + this.stub = connectionManager.newStub(VertxWriteServiceGrpc::newVertxStub); + } + + public CompletableFuture writeRows(String database, List points) { + Map> measurementPoints = points.stream() + .collect(Collectors.groupingBy( + Point::getMeasurement, + Collectors.collectingAndThen( + Collectors.toList(), + list -> { + list.sort(Comparator.comparingLong(Point::getTime)); + return list; + } + ) + )); + + List records = buildRecords(measurementPoints); + + String username = getConnectionManager().getConfig().getUsername(); + String password = getConnectionManager().getConfig().getPassword(); + + + WriteRequest writeRequest = WriteRequest + .newBuilder() + .setDatabase(Objects.requireNonNull(database)) + .setUsername(username == null ? "" : username) + .setPassword(password == null ? "" : password) + .addAllRecords(records) + .build(); + + return writeRows(writeRequest); + } + + private List buildRecords(Map> measurementPoints) { + List records = new ArrayList<>(measurementPoints.size()); + measurementPoints.forEach((measurement, points) -> { + if (measurement != null && !measurement.isEmpty()) { + try { + records.add(buildRecord(measurement, points)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + return records; + } + + private Record buildRecord(String measurement, List points) throws IOException { + List schemas = PointConverter.extractSchema(points); + List colVals = PointConverter.extractColVals(points, schemas); + io.opengemini.client.impl.grpc.record.Record record = new io.opengemini.client.impl.grpc.record.Record(); + record.setSchema(schemas.toArray(new Field[0])); + record.setColVals(colVals.toArray(new ColVal[0])); + + return Record + .newBuilder() + .setMinTime(points.get(0).getTime()) + .setMaxTime(points.get(points.size() - 1).getTime()) + .setMeasurement(measurement) + .setBlock(ByteString.copyFrom(record.marshal())) + .build(); + } + + + public CompletableFuture writeRows(WriteRequest writeRequest) { + CompletableFuture resultFuture = new CompletableFuture<>(); + stub.write(writeRequest) + .onComplete(ar -> { + if (ar.succeeded()) { + resultFuture.complete(null); + } else { + resultFuture.completeExceptionally(ar.cause()); + } + }); + return resultFuture; + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/Encoder.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/Encoder.java new file mode 100644 index 00000000..557b74d4 --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/Encoder.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc.support; + +public class Encoder { + public static long encodeZigZag64(long value) { + return (value << 1) ^ (value >> 63); + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/PointConverter.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/PointConverter.java new file mode 100644 index 00000000..3e3976db --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/PointConverter.java @@ -0,0 +1,242 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc.support; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.opengemini.client.api.Point; +import io.opengemini.client.impl.grpc.record.ColVal; +import io.opengemini.client.impl.grpc.record.Field; +import lombok.Getter; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Arrays; + +public final class PointConverter { + private static final int BITS_PER_BYTE = 8; + private static final String TIME_FIELD = "time"; + + public static List extractColVals(List points, List schema) { + if (points == null || points.isEmpty()) { + throw new IllegalArgumentException("Points cannot be null or empty"); + } + + int rowCount = points.size(); + List colVals = new ArrayList<>(schema.size()); + + // Init ColVals + for (int i = 0; i < schema.size(); i++) { + ColVal colVal = new ColVal(); + colVal.setLen(rowCount); + colVal.setBitMapOffset(0); + initializeBitmap(colVal, rowCount); + colVals.add(colVal); + } + + // Process each column + Map fieldIndexMap = createFieldIndexMap(schema); + for (Field field : schema) { + int colIndex = fieldIndexMap.get(field.getName()); + processColumn(points, colVals.get(colIndex), field); + } + return colVals; + } + + /** + * 从Points中提取Schema信息 + */ + public static List extractSchema(List points) { + // Use a LinkedHashMap to maintain field order + Map fieldTypes = new LinkedHashMap<>(); + + // Traverse all points to ensure that all possible fields and types are captured. + for (Point point : points) { + Map fields = point.getFields(); + if (fields == null) continue; + + for (Map.Entry entry : fields.entrySet()) { + String fieldName = entry.getKey(); + Object value = entry.getValue(); + if (value != null && !fieldTypes.containsKey(fieldName)) { + fieldTypes.put(fieldName, determineFieldType(value)); + } + } + + point.getTags().forEach((tagName, tagValue) -> { + fieldTypes.put(tagName, FieldType.TAG.getValue()); + }); + } + + // Convert to field list + List schema = new ArrayList<>(); + for (Map.Entry entry : fieldTypes.entrySet()) { + Field field = new Field(); + field.setName(entry.getKey()); + field.setType(entry.getValue()); + schema.add(field); + } + + Field timeField = new Field(); + timeField.setName(TIME_FIELD); + timeField.setType(FieldType.INT64.getValue()); + schema.add(timeField); + + + return schema; + } + + private static Map createFieldIndexMap(List schema) { + Map fieldIndexMap = new HashMap<>(); + for (int i = 0; i < schema.size(); i++) { + fieldIndexMap.put(schema.get(i).getName(), i); + } + return fieldIndexMap; + } + + private static void processColumn(List points, ColVal colVal, Field field) { + ByteBuf buffer = Unpooled.buffer(); + try { + List offsets = new ArrayList<>(); + int currentOffset = 0; + + for (int rowIndex = 0; rowIndex < points.size(); rowIndex++) { + Point point = points.get(rowIndex); + Map fields = new HashMap<>(point.getFields()); + fields.putAll(point.getTags()); + fields.put(TIME_FIELD, point.getTime()); + + Object value = fields.get(field.getName()); + + if (value == null) { + markAsNull(colVal, rowIndex); + if (field.getType() == FieldType.STRING.getValue()) { + offsets.add(currentOffset); + } + continue; + } + + try { + currentOffset = writeValue(buffer, value, field.getType(), currentOffset, offsets); + markAsNonNull(colVal, rowIndex); + } catch (Exception e) { + markAsNull(colVal, rowIndex); + if (field.getType() == FieldType.STRING.getValue()) { + offsets.add(currentOffset); + } + } + } + + byte[] valArray = new byte[buffer.readableBytes()]; + buffer.readBytes(valArray); + colVal.setVal(valArray); + colVal.setOffset(convertToLittleEndian(offsets)); + } finally { + buffer.release(); + } + } + + public static int[] convertToLittleEndian(List offsets) { + int[] intArray = offsets.stream().mapToInt(Integer::intValue).toArray(); + + // 转换每个整数的字节顺序 + for (int i = 0; i < intArray.length; i++) { + intArray[i] = Integer.reverseBytes(intArray[i]); + } + return intArray; + } + + private static int writeValue(ByteBuf buffer, Object value, int type, int currentOffset, List offsets) { + if (type == FieldType.DOUBLE.getValue()) { + buffer.writeDoubleLE(((Number) value).doubleValue()); + } else if (type == FieldType.FLOAT.getValue()) { + buffer.writeFloatLE(((Number) value).floatValue()); + } else if (type == FieldType.INT64.getValue()) { + buffer.writeLongLE(((Number) value).longValue()); + } else if (type == FieldType.INT32.getValue()) { + buffer.writeIntLE(((Number) value).intValue()); + } else if (type == FieldType.BOOLEAN.getValue()) { + buffer.writeBoolean((Boolean) value); + } else if (type == FieldType.STRING.getValue() || (type == FieldType.TAG.getValue())) { + byte[] bytes = ((String) value).getBytes(); + buffer.writeBytes(bytes); + offsets.add(currentOffset); + return currentOffset + bytes.length; + } + return currentOffset; + } + + private static void markAsNull(ColVal colVal, int rowIndex) { + int byteIndex = rowIndex / BITS_PER_BYTE; + int bitOffset = rowIndex % BITS_PER_BYTE; + colVal.getBitmap()[byteIndex] &= (byte) ~(1 << bitOffset); + colVal.setNilCount(colVal.getNilCount() + 1); + } + + private static void markAsNonNull(ColVal colVal, int rowIndex) { + int byteIndex = rowIndex / BITS_PER_BYTE; + int bitOffset = rowIndex % BITS_PER_BYTE; + colVal.getBitmap()[byteIndex] |= (byte) (1 << bitOffset); + } + + private static void initializeBitmap(ColVal colVal, int rowCount) { + int bitmapSize = (rowCount + BITS_PER_BYTE - 1) / BITS_PER_BYTE; + colVal.setBitmap(new byte[bitmapSize]); + Arrays.fill(colVal.getBitmap(), (byte) 0); + colVal.setNilCount(0); + } + + private static int determineFieldType(Object value) { + if (value instanceof Double) { + return FieldType.DOUBLE.getValue(); + } else if (value instanceof Float) { + return FieldType.FLOAT.getValue(); + } else if (value instanceof Long) { + return FieldType.INT64.getValue(); + } else if (value instanceof Integer) { + return FieldType.INT32.getValue(); + } else if (value instanceof Boolean) { + return FieldType.BOOLEAN.getValue(); + } else if (value instanceof String) { + return FieldType.STRING.getValue(); + } else if (value instanceof Byte) { + return FieldType.INT32.getValue(); + } + throw new IllegalArgumentException("Unsupported type: " + value.getClass()); + } + + @Getter + enum FieldType { + INT64(1), + INT32(1), + DOUBLE(3), + FLOAT(3), + STRING(4), + BOOLEAN(5), + TAG(6); + + private final int value; + + FieldType(int value) { + this.value = value; + } + + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/RpcClientSupplier.java b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/RpcClientSupplier.java new file mode 100644 index 00000000..3a87b13d --- /dev/null +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/grpc/support/RpcClientSupplier.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.impl.grpc.support; + +import java.util.Objects; +import java.util.function.Supplier; + +public class RpcClientSupplier implements Supplier { + private final Supplier delegate; + private volatile boolean initialized = false; + private T holder; + + public RpcClientSupplier(Supplier delegate) { + this.delegate = Objects.requireNonNull(delegate); + } + + @Override + public T get() { + if (!initialized) { + synchronized (this) { + if (!initialized) { + T t = delegate.get(); + holder = t; + initialized = true; + return t; + } + } + } + return holder; + } + + public void close() { + if (initialized) { + synchronized (this) { + if (initialized) { + if (holder != null) { + // TODO: Holder close + holder = null; + } + } + initialized = false; + } + } + } +} diff --git a/opengemini-client/src/main/proto/write.proto b/opengemini-client/src/main/proto/write.proto new file mode 100644 index 00000000..d3e8aa2f --- /dev/null +++ b/opengemini-client/src/main/proto/write.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; +package proto; +option java_multiple_files = true; +option java_package = "io.opengemini.client.grpc"; +option java_outer_classname = "WriteProto"; +// WriteService represents a openGemini RPC write service. +service WriteService { + // Write writes the given records to the specified database and retention policy. + rpc Write (WriteRequest) returns (WriteResponse) {} + // Ping is used to check if the server is alive + rpc Ping(PingRequest) returns (PingResponse) {} +} + + +message WriteRequest { + uint32 version = 1; + string database = 2; + string retention_policy = 3; + string username = 4; + string password = 5; + repeated Record records = 6; +} + +message WriteResponse { + ResponseCode code = 1; +} + +message Record { + string measurement = 1; + int64 min_time = 2; + int64 max_time = 3; + CompressMethod compress_method = 4; + bytes block = 5; +} + +enum CompressMethod { + UNCOMPRESSED = 0; + LZ4_FAST = 1; + ZSTD_FAST = 2; + SNAPPY = 3; +} + +enum ResponseCode { + Success = 0; + Partial = 1; + Failed = 2; +} + +message PingRequest { + string client_id = 1; +} + +enum ServerStatus { + Up = 0; + Down = 1; + Unknown = 99; +} + +message PingResponse { + ServerStatus status = 1; +} \ No newline at end of file diff --git a/opengemini-client/src/test/java/io/opengemini/client/impl/grpc/GrpcClientTest.java b/opengemini-client/src/test/java/io/opengemini/client/impl/grpc/GrpcClientTest.java new file mode 100644 index 00000000..c95b050b --- /dev/null +++ b/opengemini-client/src/test/java/io/opengemini/client/impl/grpc/GrpcClientTest.java @@ -0,0 +1,136 @@ +package io.opengemini.client.impl.grpc; + +import io.grpc.Server; +import io.grpc.stub.StreamObserver; +import io.opengemini.client.api.Point; +import io.opengemini.client.api.GrpcConfig; +import io.opengemini.client.grpc.Record; +import io.opengemini.client.grpc.ResponseCode; +import io.opengemini.client.grpc.WriteRequest; +import io.opengemini.client.grpc.WriteResponse; +import io.opengemini.client.grpc.WriteServiceGrpc; +import io.vertx.core.Vertx; +import io.vertx.grpc.VertxServerBuilder; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; + +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Date; +import java.util.concurrent.ExecutionException; + +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Mockito.mock; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@ExtendWith(VertxExtension.class) +public class GrpcClientTest { + private Vertx vertx; + private int port; + private Server server; + + + private final WriteServiceGrpc.WriteServiceImplBase serviceImpl = Mockito.mock(WriteServiceGrpc.WriteServiceImplBase.class, delegatesTo( + new WriteServiceGrpc.WriteServiceImplBase() { + @Override + public void write(WriteRequest request, StreamObserver responseObserver) { + responseObserver.onNext(WriteResponse.newBuilder().setCode(ResponseCode.Success).build()); + responseObserver.onCompleted(); + } + } + )); + + private GrpcClient grpcClient; + + @BeforeEach + public void setUp(VertxTestContext testContext) throws Exception { + vertx = Vertx.vertx(); + ServerSocket socket = new ServerSocket(0); + port = socket.getLocalPort(); + socket.close(); + server = VertxServerBuilder + .forAddress(vertx, "localhost", port) + .addService(serviceImpl) + .build() + .start(); + testContext.completeNow(); + + GrpcConfig config = GrpcConfig.builder().host("127.0.0.1").port(port).build(); + + grpcClient = GrpcClient.create(config); + } + + @Test + void testWrite() throws ExecutionException, InterruptedException { + WriteRequest request = WriteRequest + .newBuilder() + .setDatabase("test") + .setUsername("test") + .setPassword("test") + .addAllRecords(Collections.singletonList(Record.newBuilder().build())) + .build(); + grpcClient.getWriteClient().writeRows(request).get(); + } + + @Test + void testWriteRows() throws ExecutionException, InterruptedException { + GrpcConfig config = GrpcConfig + .builder() + .host("127.0.0.1") + .port(8305) + .build(); + GrpcClient grpcClient = GrpcClient.create(config); + List points = new ArrayList<>(); + Point point1 = new Point(); + Map fields1 = new HashMap<>(); + fields1.put("a", 1.0); + fields1.put("b", -1.0); + fields1.put("c", 0.0); + point1.setTime(new Date().getTime() * 1_000_000); + + Map tags1 = new HashMap<>(); + tags1.put("tag1", "111"); + + point1.setFields(fields1); + point1.setTags(tags1); + point1.setMeasurement("test1"); + + Point point2 = new Point(); + Map fields2 = new HashMap<>(); + fields2.put("a", 2.0); + fields2.put("b", -2.0); + Map tags2 = new HashMap<>(); + tags2.put("tag1", "222"); + point2.setFields(fields2); + point2.setTags(tags2); + point2.setTime((new Date().getTime() + 100) * 1_000_000); + point2.setMeasurement("test1"); + points.add(point1); + points.add(point2); + + + grpcClient.getWriteClient().writeRows("test", points).get(); + } + + @AfterEach + void tearDown(VertxTestContext testContext) { + if (grpcClient != null) { + grpcClient.close(); + } + if (server != null) { + server.shutdown(); + } + vertx.close() + .onComplete(testContext.succeedingThenComplete()); + } +} diff --git a/pom.xml b/pom.xml index 3e66e568..e6b660a8 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,11 @@ 2.0.7 6.0.19 3.1.11 + 1.61.1 + 3.25.1 + 3.25.1 + 1.3.2 + 4.5.10 0.6.0 1.18.20.0 @@ -70,6 +75,8 @@ 3.1.1 3.3.1 3.5.0 + 1.7.1 + 0.6.1 1.6.13 4.9.2 4.8.6.4 @@ -95,6 +102,13 @@ jackson-databind ${jackson.version} + + io.grpc + grpc-bom + ${grpc.version} + pom + import + @@ -138,14 +152,31 @@ log4j-slf4j2-impl test + + io.grpc + grpc-testing + test + io.github.openfacade http-facade ${http-facade.version} + + javax.annotation + javax.annotation-api + ${javax.annotation-api.version} + + + + kr.motd.maven + os-maven-plugin + ${maven-os-maven-plugin.version} + + ${src.dir} @@ -168,6 +199,40 @@ + + org.xolstice.maven.plugins + protobuf-maven-plugin + ${maven-protobuf-maven-plugin} + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + + + + + grpc-java + + compile + compile-custom + + + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + vertx-grpc + + compile + compile-custom + + + grpc-vertx + io.vertx:vertx-grpc-protoc-plugin:${vertx-grpc.version}:exe:${os.detected.classifier} + + + + org.apache.maven.plugins maven-compiler-plugin