Skip to content

Commit 42670f8

Browse files
committed
byte buffering reimplemented
1 parent 4e56e82 commit 42670f8

File tree

9 files changed

+95
-84
lines changed

9 files changed

+95
-84
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,6 @@ public class Client implements AutoCloseable {
136136

137137
private ClickHouseClient oldClient = null;
138138

139-
private BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool;
140-
141139
private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
142140
ExecutorService sharedOperationExecutor) {
143141
this.endpoints = endpoints;
@@ -163,15 +161,6 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
163161
this.oldClient = ClientV1AdaptorHelper.createClient(configuration);
164162
LOG.info("Using old http client implementation");
165163
}
166-
167-
this.byteBufferPool = new BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator>(
168-
new LinkedList<>(), 100
169-
) {
170-
@Override
171-
protected BinaryStreamReader.ByteBufferAllocator create() {
172-
return new BinaryStreamReader.CachingByteBufferAllocator();
173-
}
174-
};
175164
}
176165

177166
/**
@@ -1456,10 +1445,10 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
14561445
settings.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
14571446
settings.waitEndOfQuery(true); // we rely on the summery
14581447

1459-
final QuerySettings finalSettings = settings;
14601448
return query(sqlQuery, settings).thenApply(response -> {
14611449
try {
1462-
return new Records(response, finalSettings, byteBufferPool);
1450+
1451+
return new Records(response, newBinaryFormatReader(response));
14631452
} catch (Exception e) {
14641453
throw new ClientException("Failed to get query response", e);
14651454
}
@@ -1476,14 +1465,14 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
14761465
public List<GenericRecord> queryAll(String sqlQuery) {
14771466
try {
14781467
int operationTimeout = getOperationTimeout();
1479-
QuerySettings settings = new QuerySettings().waitEndOfQuery(true);
1468+
QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes)
1469+
.waitEndOfQuery(true);
14801470
try (QueryResponse response = operationTimeout == 0 ? query(sqlQuery, settings).get() :
14811471
query(sqlQuery, settings).get(operationTimeout, TimeUnit.MILLISECONDS)) {
14821472
List<GenericRecord> records = new ArrayList<>();
14831473
if (response.getResultRows() > 0) {
14841474
RowBinaryWithNamesAndTypesFormatReader reader =
1485-
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(),
1486-
byteBufferPool);
1475+
(RowBinaryWithNamesAndTypesFormatReader) newBinaryFormatReader(response);
14871476

14881477
Map<String, Object> record;
14891478
while (reader.readRecord((record = new LinkedHashMap<>()))) {
@@ -1586,6 +1575,13 @@ public CompletableFuture<CommandResponse> execute(String sql) {
15861575
*/
15871576
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
15881577
ClickHouseBinaryFormatReader reader = null;
1578+
// Using caching buffer allocator is risky so this parameter is not exposed to the user
1579+
boolean useCachingBufferAllocator = Boolean.parseBoolean(
1580+
configuration.getOrDefault("client_use_caching_buffer_allocator", "false"));
1581+
BinaryStreamReader.ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
1582+
new BinaryStreamReader.CachingByteBufferAllocator() :
1583+
new BinaryStreamReader.DefaultByteBufferAllocator();
1584+
15891585
switch (response.getFormat()) {
15901586
case Native:
15911587
reader = new NativeFormatReader(response.getInputStream(), response.getSettings(),

client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
44
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
5-
import com.clickhouse.client.api.internal.BasicObjectsPool;
65
import com.clickhouse.client.api.query.QuerySettings;
76
import com.clickhouse.data.ClickHouseColumn;
87

@@ -25,8 +24,8 @@ public class NativeFormatReader extends AbstractBinaryFormatReader {
2524
private int blockRowIndex;
2625

2726
public NativeFormatReader(InputStream inputStream, QuerySettings settings,
28-
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
29-
super(inputStream, settings, null, byteBufferPool);
27+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
28+
super(inputStream, settings, null, byteBufferAllocator);
3029
readNextRecord();
3130
}
3231

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
44
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
5-
import com.clickhouse.client.api.internal.BasicObjectsPool;
65
import com.clickhouse.client.api.metadata.TableSchema;
76
import com.clickhouse.client.api.query.QuerySettings;
87
import com.clickhouse.data.ClickHouseColumn;
@@ -15,8 +14,8 @@
1514
public class RowBinaryFormatReader extends AbstractBinaryFormatReader {
1615

1716
public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
18-
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
19-
super(inputStream, querySettings, schema, byteBufferPool);
17+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
18+
super(inputStream, querySettings, schema, byteBufferAllocator);
2019
readNextRecord();
2120
}
2221

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@
33
import com.clickhouse.client.api.ClientException;
44
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
55
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
6-
import com.clickhouse.client.api.internal.BasicObjectsPool;
76
import com.clickhouse.client.api.metadata.TableSchema;
87
import com.clickhouse.client.api.query.QuerySettings;
9-
import com.clickhouse.data.ClickHouseColumn;
108

119
import java.io.EOFException;
1210
import java.io.IOException;
@@ -19,8 +17,8 @@
1917
public class RowBinaryWithNamesAndTypesFormatReader extends AbstractBinaryFormatReader implements Iterator<Map<String, Object>> {
2018

2119
public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings,
22-
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
23-
super(inputStream, querySettings, null, byteBufferPool);
20+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
21+
super(inputStream, querySettings, null, byteBufferAllocator);
2422
readSchema();
2523
}
2624

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
44
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
5-
import com.clickhouse.client.api.internal.BasicObjectsPool;
65
import com.clickhouse.client.api.metadata.TableSchema;
76
import com.clickhouse.client.api.query.QuerySettings;
87

@@ -18,8 +17,8 @@ public class RowBinaryWithNamesFormatReader extends AbstractBinaryFormatReader {
1817
private List<String> columns = null;
1918

2019
public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
21-
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
22-
super(inputStream, querySettings, schema, byteBufferPool);
20+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
21+
super(inputStream, querySettings, schema, byteBufferAllocator);
2322
int nCol = 0;
2423
try {
2524
nCol = BinaryStreamReader.readVarInt(input);

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
6060

6161
private volatile boolean hasNext = true;
6262

63-
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool;
64-
65-
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator;
66-
6763
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
68-
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
64+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
6965
this.input = inputStream;
7066
this.settings = querySettings == null ? Collections.emptyMap() : new HashMap<>(querySettings.getAllSettings());
7167
boolean useServerTimeZone = (boolean) this.settings.get(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey());
@@ -74,8 +70,6 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
7470
if (timeZone == null) {
7571
throw new ClientException("Time zone is not set. (useServerTimezone:" + useServerTimeZone + ")");
7672
}
77-
this. byteBufferPool = byteBufferPool;
78-
this.byteBufferAllocator = byteBufferPool.lease();
7973
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator);
8074
setSchema(schema);
8175
}
@@ -182,9 +176,6 @@ public Map<String, Object> next() {
182176

183177
protected void endReached() {
184178
hasNext = false;
185-
BinaryStreamReader.ByteBufferAllocator allocator = this.byteBufferAllocator;
186-
this.byteBufferAllocator = null;
187-
byteBufferPool.release(allocator);
188179
}
189180

190181
protected void setSchema(TableSchema schema) {
@@ -636,10 +627,6 @@ public LocalDateTime getLocalDateTime(int index) {
636627

637628
@Override
638629
public void close() throws Exception {
639-
if (byteBufferAllocator != null) {
640-
byteBufferPool.release(byteBufferAllocator);
641-
byteBufferAllocator = null;
642-
}
643630
input.close();
644631
}
645632
}

0 commit comments

Comments
 (0)