diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/CompressMethod.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/CompressMethod.java new file mode 100644 index 00000000..4de0042e --- /dev/null +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/CompressMethod.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.api; + +public enum CompressMethod { + GZIP("gzip"), + SNAPPY("snappy"), + ZSTD("zstd"); + + private final String value; + + CompressMethod(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java index daf22422..8720250c 100644 --- a/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java @@ -37,6 +37,11 @@ public class Configuration { BatchConfig batchConfig; + ContentType contentType; + + CompressMethod compressMethod; + + // deprecated, will use compressMethod and contentType boolean gzipEnabled; HttpClientConfig httpConfig; diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/ContentType.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/ContentType.java new file mode 100644 index 00000000..b93e015c --- /dev/null +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/ContentType.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.api; + +public enum ContentType { + JSON("application/json"), + MSGPACK("application/msgpack"); + + private final String value; + + ContentType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java index 4945721d..9024250b 100644 --- a/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java +++ b/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java @@ -55,6 +55,9 @@ public BaseClient(Configuration conf) { contentEncodingHeader.add("gzip"); headers.put("Content-Encoding", contentEncodingHeader); } + + applyCodec(conf, headers); + String httpPrefix; if (conf.getHttpConfig().tlsConfig() != null) { httpPrefix = "https://"; @@ -77,6 +80,36 @@ public BaseClient(Configuration conf) { scheduler.ifPresent(this::startHealthCheck); } + private void applyCodec(Configuration config, Map> headers) { + if (config.getContentType() != null) { + List acceptHeader = new ArrayList<>(); + switch (config.getContentType()) { + case MSGPACK: + acceptHeader.add("application/msgpack"); + break; + case JSON: + acceptHeader.add("application/json"); + break; + } + headers.put("Accept", acceptHeader); + } + if (config.getCompressMethod() != null) { + List acceptEncodingHeader = new ArrayList<>(); + switch (config.getCompressMethod()) { + case GZIP: + acceptEncodingHeader.add("gzip"); + break; + case ZSTD: + acceptEncodingHeader.add("zstd"); + break; + case SNAPPY: + acceptEncodingHeader.add("snappy"); + break; + } + headers.put("Accept-Encoding", acceptEncodingHeader); + } + } + /** * Health Check * Start schedule task(period 10s) to ping all server url diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/Compressor.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/Compressor.java new file mode 100644 index 00000000..ef7393c6 --- /dev/null +++ b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/Compressor.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.common.compress; + +public interface Compressor { + + byte[] compress(byte[] data); + + byte[] decompress(byte[] data); + + String getName(); +} diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/GzipCompressor.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/GzipCompressor.java new file mode 100644 index 00000000..9f8ee0cf --- /dev/null +++ b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/GzipCompressor.java @@ -0,0 +1,63 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.common.compress; + +import io.opengemini.client.api.CompressMethod; +import lombok.AllArgsConstructor; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +@AllArgsConstructor +public class GzipCompressor implements Compressor { + + @Override + public byte[] compress(byte[] data) { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) { + gzipOutputStream.write(data); + gzipOutputStream.finish(); + return byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to compress data", e); + } + } + + @Override + public byte[] decompress(byte[] data) { + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data); + GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipInputStream.read(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, len); + } + return byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to decompress data", e); + } + } + + @Override + public String getName() { + return CompressMethod.GZIP.getValue(); + } +} diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/SnappyCompressor.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/SnappyCompressor.java new file mode 100644 index 00000000..effe923d --- /dev/null +++ b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/SnappyCompressor.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.common.compress; + +import io.opengemini.client.api.CompressMethod; +import lombok.AllArgsConstructor; +import org.xerial.snappy.Snappy; + +import java.io.IOException; + +@AllArgsConstructor +public class SnappyCompressor implements Compressor { + + @Override + public byte[] compress(byte[] data) { + try { + return Snappy.compress(data); + } catch (IOException e) { + throw new RuntimeException("Failed to compress data", e); + } + } + + @Override + public byte[] decompress(byte[] data) { + try { + return Snappy.uncompress(data); + } catch (IOException e) { + throw new RuntimeException("Failed to decompress data", e); + } + } + + @Override + public String getName() { + return CompressMethod.SNAPPY.getValue(); + } +} diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/ZstdCompressor.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/ZstdCompressor.java new file mode 100644 index 00000000..f722f210 --- /dev/null +++ b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/ZstdCompressor.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.common.compress; + +import com.github.luben.zstd.Zstd; +import io.opengemini.client.api.CompressMethod; + +public class ZstdCompressor implements Compressor { + + @Override + public byte[] compress(byte[] data) { + return Zstd.compress(data); + } + + @Override + public byte[] decompress(byte[] data) { + try { + long decompressedSize = Zstd.decompressedSize(data); + byte[] decompressedData = new byte[(int) decompressedSize]; + Zstd.decompress(decompressedData, data); + return decompressedData; + } catch (Exception e) { + throw new RuntimeException("Failed to decompress data", e); + } + } + + @Override + public String getName() { + return CompressMethod.ZSTD.getValue(); + } +} diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/package-info.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/package-info.java new file mode 100644 index 00000000..fdd215e3 --- /dev/null +++ b/opengemini-client-common/src/main/java/io/opengemini/client/common/compress/package-info.java @@ -0,0 +1,17 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.common.compress; diff --git a/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/GzipCompressorTest.java b/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/GzipCompressorTest.java new file mode 100644 index 00000000..4e6cc547 --- /dev/null +++ b/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/GzipCompressorTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.common.compress; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class GzipCompressorTest { + + private final GzipCompressor gzipCompressor = new GzipCompressor(); + + @Test + void testCompressAndDecompress() { + String originalString = "This is a test string for GZIP compression"; + byte[] originalData = originalString.getBytes(); + + // Compress the data + byte[] compressedData = gzipCompressor.compress(originalData); + Assertions.assertNotNull(compressedData); + Assertions.assertNotEquals(0, compressedData.length); + + // Decompress the data + byte[] decompressedData = gzipCompressor.decompress(compressedData); + Assertions.assertNotNull(decompressedData); + Assertions.assertArrayEquals(originalData, decompressedData); + + // Verify the decompressed string is the same as the original + String decompressedString = new String(decompressedData); + Assertions.assertEquals(originalString, decompressedString); + } +} diff --git a/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/SnappyCompressorTest.java b/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/SnappyCompressorTest.java new file mode 100644 index 00000000..60b59a4e --- /dev/null +++ b/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/SnappyCompressorTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.common.compress; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SnappyCompressorTest { + @Test + public void testCompression() { + SnappyCompressor snappyCompressor = new SnappyCompressor(); + // Example input data + String input = "This is a test string to compress"; + byte[] inputData = input.getBytes(); + + // Compress the data + byte[] compressedData = snappyCompressor.compress(inputData); + + // Decompress the data + byte[] decompressedData = snappyCompressor.decompress(compressedData); + + // Verify the decompressed data matches the original input + Assertions.assertArrayEquals(inputData, decompressedData); + } +} diff --git a/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/ZstdCompressorTest.java b/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/ZstdCompressorTest.java new file mode 100644 index 00000000..d15787a2 --- /dev/null +++ b/opengemini-client-common/src/test/java/io/opengemini/client/common/compress/ZstdCompressorTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 openGemini Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opengemini.client.common.compress; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ZstdCompressorTest { + + @Test + public void testCompression() { + ZstdCompressor zstdCompressor = new ZstdCompressor(); + // Example input data + String input = "This is a test string to compress"; + byte[] inputData = input.getBytes(); + + // Compress the data + byte[] compressedData = zstdCompressor.compress(inputData); + + // Decompress the data + byte[] decompressedData = zstdCompressor.decompress(compressedData); + + // Verify the decompressed data matches the original input + Assertions.assertArrayEquals(inputData, decompressedData); + } +} diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java index 1957257c..0247f7a1 100644 --- a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.opengemini.client.impl; import io.github.openfacade.http.BasicAuthRequestFilter; @@ -23,7 +22,9 @@ import io.github.openfacade.http.HttpResponse; import io.opengemini.client.api.AuthConfig; import io.opengemini.client.api.AuthType; +import io.opengemini.client.api.CompressMethod; import io.opengemini.client.api.Configuration; +import io.opengemini.client.api.ContentType; import io.opengemini.client.api.OpenGeminiException; import io.opengemini.client.api.Pong; import io.opengemini.client.api.Query; @@ -31,17 +32,24 @@ import io.opengemini.client.common.BaseAsyncClient; import io.opengemini.client.common.HeaderConst; import io.opengemini.client.common.JacksonService; +import io.opengemini.client.common.compress.GzipCompressor; +import io.opengemini.client.common.compress.SnappyCompressor; +import io.opengemini.client.common.compress.ZstdCompressor; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; public class OpenGeminiClient extends BaseAsyncClient { + protected final Configuration conf; private final HttpClient client; + private final Map compressorCache = new ConcurrentHashMap<>(); public OpenGeminiClient(@NotNull Configuration conf) { super(conf); @@ -80,9 +88,9 @@ protected CompletableFuture executePostQuery(Query query) { /** * Execute a write call with java HttpClient. * - * @param database the name of the database. + * @param database the name of the database. * @param retentionPolicy the name of the retention policy. - * @param lineProtocol the line protocol string to write. + * @param lineProtocol the line protocol string to write. */ @Override protected CompletableFuture executeWrite(String database, String retentionPolicy, String lineProtocol) { @@ -101,10 +109,11 @@ protected CompletableFuture executePing() { .orElse(null)).thenApply(Pong::new); } - private @NotNull CompletableFuture convertResponse(HttpResponse response, Class type) { + private @NotNull + CompletableFuture convertResponse(HttpResponse response, Class type) { if (response.statusCode() >= 200 && response.statusCode() < 300) { try { - T resp = JacksonService.toObject(response.body(), type); + T resp = processResponseBody(response, type); return CompletableFuture.completedFuture(resp); } catch (IOException e) { CompletableFuture future = new CompletableFuture<>(); @@ -121,13 +130,54 @@ protected CompletableFuture executePing() { } } + private T processResponseBody(HttpResponse response, Class type) throws IOException { + String contentType = response.headers().get("Content-Type") != null + ? response.headers().get("Content-Type").get(0) : null; + String contentEncoding = response.headers().get("Content-Encoding") != null + ? response.headers().get("Content-Encoding").get(0) : null; + byte[] body = processCompression(contentEncoding, response.body(), type); + + return processContentType(contentType, body, type); + } + + private byte[] processCompression(String compressMethod, byte[] body, Class type) throws IOException { + byte[] decompressedBody = null; + if (CompressMethod.GZIP.getValue().equals(compressMethod)) { + GzipCompressor compressor = (GzipCompressor) compressorCache.computeIfAbsent( + CompressMethod.GZIP.getValue(), + k -> new GzipCompressor()); + decompressedBody = compressor.decompress(body); + } else if (CompressMethod.SNAPPY.getValue().equals(compressMethod)) { + SnappyCompressor compressor = (SnappyCompressor) compressorCache.computeIfAbsent( + CompressMethod.SNAPPY.getValue(), + k -> new SnappyCompressor()); + decompressedBody = compressor.decompress(body); + } else if (CompressMethod.ZSTD.getValue().equals(compressMethod)) { + ZstdCompressor compressor = (ZstdCompressor) compressorCache.computeIfAbsent( + CompressMethod.ZSTD.getValue(), + k -> new ZstdCompressor()); + decompressedBody = compressor.decompress(body); + } + + return decompressedBody != null ? decompressedBody : body; + } + + private T processContentType(String contentType, byte[] body, Class type) throws IOException { + if (ContentType.JSON.getValue().equals(contentType)) { + return JacksonService.toObject(body, type); + } else if (ContentType.MSGPACK.getValue().equals(contentType)) { + throw new IOException("Unsupported content type: " + contentType); + } + return JacksonService.toObject(body, type); + } + public CompletableFuture get(String url) { return client.get(buildUriWithPrefix(url), headers); } public CompletableFuture post(String url, String body) { return client.post(buildUriWithPrefix(url), body == null ? new byte[0] : body.getBytes(StandardCharsets.UTF_8), - headers); + headers); } @Override diff --git a/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientTest.java b/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientTest.java index f031de32..9d66be68 100644 --- a/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientTest.java +++ b/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientTest.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.opengemini.client.impl; import io.github.openfacade.http.HttpClientConfig; @@ -21,6 +20,7 @@ import io.opengemini.client.api.Address; import io.opengemini.client.api.AuthConfig; import io.opengemini.client.api.AuthType; +import io.opengemini.client.api.CompressMethod; import io.opengemini.client.api.Configuration; import io.opengemini.client.api.OpenGeminiException; import io.opengemini.client.api.Point; @@ -53,6 +53,7 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) class OpenGeminiClientTest extends TestBase { + private final List clients = new ArrayList<>(); protected List clientList() throws OpenGeminiException { @@ -68,12 +69,27 @@ protected List clientList() throws OpenGeminiException { .connectTimeout(Duration.ofSeconds(3)) .timeout(Duration.ofSeconds(3)) .build(); - Configuration configuration = - Configuration.builder() - .addresses(Collections.singletonList(new Address("127.0.0.1", 8086))) - .httpConfig(httpConfig) - .gzipEnabled(false) - .build(); + Configuration configuration = Configuration + .builder() + .addresses(Collections.singletonList(new Address("127.0.0.1", 8086))) + .httpConfig(httpConfig) + .gzipEnabled(false) + .build(); + clients.add(OpenGeminiClientFactory.create(configuration)); + } + List compressMethods = Arrays.asList(CompressMethod.SNAPPY, CompressMethod.GZIP, + CompressMethod.ZSTD); + for (CompressMethod compressMethod : compressMethods) { + HttpClientConfig httpConfig = new HttpClientConfig.Builder() + .engine(HttpClientEngine.Async) + .connectTimeout(Duration.ofSeconds(3)) + .timeout(Duration.ofSeconds(3)) + .build(); + Configuration configuration = Configuration.builder() + .addresses(Collections.singletonList(new Address("127.0.0.1", 8086))) + .httpConfig(httpConfig) + .compressMethod(compressMethod) + .build(); clients.add(OpenGeminiClientFactory.create(configuration)); } return clients; @@ -375,8 +391,10 @@ void retention_policy_create_failed_for_wrong_duration_param(OpenGeminiClient cl // todo jdk doesn't throw ExecutionException Exception exception = Assertions.assertThrows(Exception.class, createRpFuture::get); - if (exception instanceof ExecutionException exp) { - if (exp.getCause() instanceof OpenGeminiException e) { + if (exception instanceof ExecutionException) { + ExecutionException exp = (ExecutionException) exception; + if (exp.getCause() instanceof OpenGeminiException) { + OpenGeminiException e = (OpenGeminiException) exp.getCause(); Assertions.assertTrue( e.getMessage().contains("syntax error: unexpected IDENT, expecting DURATIONVAL")); } @@ -417,7 +435,6 @@ private static Point testPoint(String measurementName, int valueIndex, int field return testPoint; } - @ParameterizedTest @MethodSource("clientList") void testQueryPrecision(OpenGeminiClient client) throws Exception { diff --git a/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java b/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java index 9aa5d9fe..77db65fd 100644 --- a/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java +++ b/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java @@ -13,12 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.opengemini.client.impl; import io.github.openfacade.http.HttpClientConfig; import io.github.openfacade.http.HttpClientEngine; import io.opengemini.client.api.Address; +import io.opengemini.client.api.CompressMethod; import io.opengemini.client.api.Configuration; import io.opengemini.client.api.OpenGeminiException; import io.opengemini.client.api.Point; @@ -43,6 +43,7 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) class OpenGeminiClientWriteTest extends TestBase { + private final List clients = new ArrayList<>(); protected List clientList() throws OpenGeminiException { @@ -58,12 +59,26 @@ protected List clientList() throws OpenGeminiException { .connectTimeout(Duration.ofSeconds(3)) .timeout(Duration.ofSeconds(3)) .build(); - Configuration configuration = - Configuration.builder() - .addresses(Collections.singletonList(new Address("127.0.0.1", 8086))) - .httpConfig(httpConfig) - .gzipEnabled(false) - .build(); + Configuration configuration = Configuration.builder() + .addresses(Collections.singletonList(new Address("127.0.0.1", 8086))) + .httpConfig(httpConfig) + .gzipEnabled(false) + .build(); + clients.add(OpenGeminiClientFactory.create(configuration)); + } + + List compressMethods = Arrays.asList(CompressMethod.SNAPPY); + for (CompressMethod compressMethod : compressMethods) { + HttpClientConfig httpConfig = new HttpClientConfig.Builder() + .engine(HttpClientEngine.Java) + .connectTimeout(Duration.ofSeconds(3)) + .timeout(Duration.ofSeconds(3)) + .build(); + Configuration configuration = Configuration.builder() + .addresses(Collections.singletonList(new Address("127.0.0.1", 8086))) + .httpConfig(httpConfig) + .compressMethod(compressMethod) + .build(); clients.add(OpenGeminiClientFactory.create(configuration)); } return clients; @@ -111,7 +126,8 @@ void write_point_with_more_fields(OpenGeminiClient client) throws Exception { writeRsp.get(); Thread.sleep(3000); - Query selectQuery = new Query("select * from " + measurementName, databaseName, ""); + Query selectQuery = new Query("select * from " + measurementName, + databaseName, ""); CompletableFuture rst = client.query(selectQuery); QueryResult queryResult = rst.get(); diff --git a/spring/opengemini-spring/src/main/java/io/opengemini/client/spring/data/core/OpenGeminiTemplate.java b/spring/opengemini-spring/src/main/java/io/opengemini/client/spring/data/core/OpenGeminiTemplate.java index af970547..fd99361e 100644 --- a/spring/opengemini-spring/src/main/java/io/opengemini/client/spring/data/core/OpenGeminiTemplate.java +++ b/spring/opengemini-spring/src/main/java/io/opengemini/client/spring/data/core/OpenGeminiTemplate.java @@ -120,7 +120,7 @@ public MeasurementOperations opsForMeasurement(String databaseName, } @SuppressWarnings("unchecked") - private @NotNull MeasurementOperations getMeasurementOperations(MeasurementOperationsCacheKey key) { + private @NotNull MeasurementOperations getMeasurementOperations(MeasurementOperationsCacheKey key) { return (MeasurementOperations) msOperationsMap.computeIfAbsent(key, (k) -> { OpenGeminiSerializer serializer = (OpenGeminiSerializer) serializerFactory.getSerializer( k.getClazz());