Skip to content

Commit d452cc9

Browse files
committed
Merge branch 'main' into clientv2_pojo_deserializers
2 parents 3eaf4c0 + dfb161b commit d452cc9

File tree

18 files changed

+500
-231
lines changed

18 files changed

+500
-231
lines changed

.github/workflows/analysis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,6 @@ jobs:
9595
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
9696
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
9797
run: |
98-
mvn --batch-mode -DclickhouseVersion=$PREFERRED_LTS_VERSION \
98+
mvn --batch-mode -DclickhouseVersion=$PREFERRED_LTS_VERSION -Dclient.tests.useNewImplementation=true \
9999
-Panalysis verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar
100100
continue-on-error: true

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
1212
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
1313
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
14+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
1415
import com.clickhouse.client.api.data_formats.internal.MapBackedRecord;
1516
import com.clickhouse.client.api.data_formats.internal.ProcessParser;
1617
import com.clickhouse.client.api.enums.Protocol;
@@ -19,6 +20,8 @@
1920
import com.clickhouse.client.api.insert.InsertResponse;
2021
import com.clickhouse.client.api.insert.InsertSettings;
2122
import com.clickhouse.client.api.insert.POJOSerializer;
23+
import com.clickhouse.client.api.insert.SerializerNotFoundException;
24+
import com.clickhouse.client.api.internal.BasicObjectsPool;
2225
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
2326
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
2427
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
@@ -70,6 +73,7 @@
7073
import java.util.HashMap;
7174
import java.util.HashSet;
7275
import java.util.LinkedHashMap;
76+
import java.util.LinkedList;
7377
import java.util.List;
7478
import java.util.Map;
7579
import java.util.Set;
@@ -761,6 +765,22 @@ public Builder setMaxRetries(int maxRetries) {
761765
return this;
762766
}
763767

768+
/**
769+
* Configures client to reuse allocated byte buffers for numbers. It affects how binary format reader is working.
770+
* If set to 'true' then {@link Client#newBinaryFormatReader(QueryResponse)} will construct reader that will
771+
* reuse buffers for numbers. It improves performance for large datasets by reducing number of allocations
772+
* (therefore GC pressure).
773+
* Enabling this feature is safe because each reader suppose to be used by a single thread and readers are not reused.
774+
*
775+
* Default is false.
776+
* @param reuse - if to reuse buffers
777+
* @return
778+
*/
779+
public Builder allowBinaryReaderToReuseBuffers(boolean reuse) {
780+
this.configuration.put("client_allow_binary_reader_to_reuse_buffers", String.valueOf(reuse));
781+
return this;
782+
}
783+
764784
public Client build() {
765785
setDefaults();
766786

@@ -874,6 +894,10 @@ private void setDefaults() {
874894
if (!configuration.containsKey(ClickHouseClientOption.RETRY.getKey())) {
875895
setMaxRetries(3);
876896
}
897+
898+
if (!configuration.containsKey("client_allow_binary_reader_to_reuse_buffers")) {
899+
allowBinaryReaderToReuseBuffers(false);
900+
}
877901
}
878902
}
879903

@@ -1468,10 +1492,10 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
14681492
settings.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
14691493
settings.waitEndOfQuery(true); // we rely on the summery
14701494

1471-
final QuerySettings finalSettings = settings;
14721495
return query(sqlQuery, settings).thenApply(response -> {
14731496
try {
1474-
return new Records(response, finalSettings);
1497+
1498+
return new Records(response, newBinaryFormatReader(response));
14751499
} catch (Exception e) {
14761500
throw new ClientException("Failed to get query response", e);
14771501
}
@@ -1488,13 +1512,14 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
14881512
public List<GenericRecord> queryAll(String sqlQuery) {
14891513
try {
14901514
int operationTimeout = getOperationTimeout();
1491-
QuerySettings settings = new QuerySettings().waitEndOfQuery(true);
1515+
QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes)
1516+
.waitEndOfQuery(true);
14921517
try (QueryResponse response = operationTimeout == 0 ? query(sqlQuery, settings).get() :
14931518
query(sqlQuery, settings).get(operationTimeout, TimeUnit.MILLISECONDS)) {
14941519
List<GenericRecord> records = new ArrayList<>();
14951520
if (response.getResultRows() > 0) {
14961521
RowBinaryWithNamesAndTypesFormatReader reader =
1497-
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
1522+
(RowBinaryWithNamesAndTypesFormatReader) newBinaryFormatReader(response);
14981523

14991524
Map<String, Object> record;
15001525
while (reader.readRecord((record = new LinkedHashMap<>()))) {
@@ -1662,28 +1687,38 @@ public CompletableFuture<CommandResponse> execute(String sql) {
16621687
* @param schema
16631688
* @return
16641689
*/
1665-
public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
1690+
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
16661691
ClickHouseBinaryFormatReader reader = null;
1692+
// Using caching buffer allocator is risky so this parameter is not exposed to the user
1693+
boolean useCachingBufferAllocator = MapUtils.getFlag(configuration, "client_allow_binary_reader_to_reuse_buffers");
1694+
BinaryStreamReader.ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
1695+
new BinaryStreamReader.CachingByteBufferAllocator() :
1696+
new BinaryStreamReader.DefaultByteBufferAllocator();
1697+
16671698
switch (response.getFormat()) {
16681699
case Native:
1669-
reader = new NativeFormatReader(response.getInputStream(), response.getSettings());
1700+
reader = new NativeFormatReader(response.getInputStream(), response.getSettings(),
1701+
byteBufferPool);
16701702
break;
16711703
case RowBinaryWithNamesAndTypes:
1672-
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
1704+
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(),
1705+
byteBufferPool);
16731706
break;
16741707
case RowBinaryWithNames:
1675-
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema);
1708+
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema,
1709+
byteBufferPool);
16761710
break;
16771711
case RowBinary:
1678-
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema);
1712+
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema,
1713+
byteBufferPool);
16791714
break;
16801715
default:
16811716
throw new IllegalArgumentException("Unsupported format: " + response.getFormat());
16821717
}
16831718
return reader;
16841719
}
16851720

1686-
public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
1721+
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
16871722
return newBinaryFormatReader(response, null);
16881723
}
16891724

client-v2/src/main/java/com/clickhouse/client/api/data_formats/ClickHouseBinaryFormatReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.Map;
2020
import java.util.UUID;
2121

22-
public interface ClickHouseBinaryFormatReader {
22+
public interface ClickHouseBinaryFormatReader extends AutoCloseable {
2323

2424
/**
2525
* Reads a single value from the stream.

client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@ public class NativeFormatReader extends AbstractBinaryFormatReader {
2323

2424
private int blockRowIndex;
2525

26-
public NativeFormatReader(InputStream inputStream) {
27-
this(inputStream, null);
28-
}
29-
30-
public NativeFormatReader(InputStream inputStream, QuerySettings settings) {
31-
super(inputStream, settings, null);
26+
public NativeFormatReader(InputStream inputStream, QuerySettings settings,
27+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
28+
super(inputStream, settings, null, byteBufferAllocator);
3229
readNextRecord();
3330
}
3431

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatReader.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.clickhouse.client.api.data_formats;
22

33
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
4+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
45
import com.clickhouse.client.api.metadata.TableSchema;
56
import com.clickhouse.client.api.query.QuerySettings;
67
import com.clickhouse.data.ClickHouseColumn;
@@ -12,12 +13,9 @@
1213

1314
public class RowBinaryFormatReader extends AbstractBinaryFormatReader {
1415

15-
public RowBinaryFormatReader(InputStream inputStream, TableSchema schema) {
16-
this(inputStream, null, schema);
17-
}
18-
19-
public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
20-
super(inputStream, querySettings, schema);
16+
public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
17+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
18+
super(inputStream, querySettings, schema, byteBufferAllocator);
2119
readNextRecord();
2220
}
2321

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
66
import com.clickhouse.client.api.metadata.TableSchema;
77
import com.clickhouse.client.api.query.QuerySettings;
8-
import com.clickhouse.data.ClickHouseColumn;
98

109
import java.io.EOFException;
1110
import java.io.IOException;
@@ -17,8 +16,9 @@
1716

1817
public class RowBinaryWithNamesAndTypesFormatReader extends AbstractBinaryFormatReader implements Iterator<Map<String, Object>> {
1918

20-
public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings) {
21-
super(inputStream, querySettings, null);
19+
public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings,
20+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
21+
super(inputStream, querySettings, null, byteBufferAllocator);
2222
readSchema();
2323
}
2424

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,21 @@
44
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
55
import com.clickhouse.client.api.metadata.TableSchema;
66
import com.clickhouse.client.api.query.QuerySettings;
7-
import com.clickhouse.data.ClickHouseColumn;
87

98
import java.io.EOFException;
109
import java.io.IOException;
1110
import java.io.InputStream;
1211
import java.util.ArrayList;
1312
import java.util.Collections;
1413
import java.util.List;
15-
import java.util.Map;
1614

1715
public class RowBinaryWithNamesFormatReader extends AbstractBinaryFormatReader {
1816

1917
private List<String> columns = null;
2018

21-
public RowBinaryWithNamesFormatReader(InputStream inputStream, TableSchema schema) {
22-
this(inputStream, null, schema);
23-
readNextRecord();
24-
}
25-
26-
public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
27-
super(inputStream, querySettings, schema);
19+
public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
20+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
21+
super(inputStream, querySettings, schema, byteBufferAllocator);
2822
int nCol = 0;
2923
try {
3024
nCol = BinaryStreamReader.readVarInt(input);

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.clickhouse.client.api.ClientException;
44
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
5+
import com.clickhouse.client.api.internal.BasicObjectsPool;
6+
import com.clickhouse.client.api.internal.MapUtils;
57
import com.clickhouse.client.api.metadata.TableSchema;
68
import com.clickhouse.client.api.query.NullValueException;
79
import com.clickhouse.client.api.query.POJOSetter;
@@ -37,6 +39,8 @@
3739
import java.util.UUID;
3840
import java.util.concurrent.ConcurrentHashMap;
3941
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.function.Function;
43+
import java.util.function.Supplier;
4044

4145
public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {
4246

@@ -54,7 +58,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
5458

5559
private volatile boolean hasNext = true;
5660

57-
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
61+
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
62+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
5863
this.input = inputStream;
5964
this.settings = querySettings == null ? Collections.emptyMap() : new HashMap<>(querySettings.getAllSettings());
6065
boolean useServerTimeZone = (boolean) this.settings.get(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey());
@@ -63,7 +68,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
6368
if (timeZone == null) {
6469
throw new ClientException("Time zone is not set. (useServerTimezone:" + useServerTimeZone + ")");
6570
}
66-
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG);
71+
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator);
6772
setSchema(schema);
6873
}
6974

@@ -155,12 +160,12 @@ protected void readNextRecord() {
155160
try {
156161
nextRecordEmpty.set(true);
157162
if (!readRecord(nextRecord)) {
158-
hasNext = false;
163+
endReached();
159164
} else {
160165
nextRecordEmpty.compareAndSet(true, false);
161166
}
162167
} catch (IOException e) {
163-
hasNext = false;
168+
endReached();
164169
throw new ClientException("Failed to read next row", e);
165170
}
166171
}
@@ -187,7 +192,7 @@ public Map<String, Object> next() {
187192
return null;
188193
}
189194
} catch (IOException e) {
190-
hasNext = false;
195+
endReached();
191196
throw new ClientException("Failed to read row", e);
192197
}
193198
}
@@ -643,4 +648,9 @@ public LocalDateTime getLocalDateTime(int index) {
643648
}
644649
return (LocalDateTime) value;
645650
}
651+
652+
@Override
653+
public void close() throws Exception {
654+
input.close();
655+
}
646656
}

0 commit comments

Comments
 (0)