Skip to content

Commit 4e56e82

Browse files
committed
added two levels of byte buffer caching
1 parent de49bb8 commit 4e56e82

File tree

14 files changed

+218
-101
lines changed

14 files changed

+218
-101
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
1212
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
1313
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
14+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
1415
import com.clickhouse.client.api.data_formats.internal.MapBackedRecord;
1516
import com.clickhouse.client.api.data_formats.internal.ProcessParser;
1617
import com.clickhouse.client.api.enums.Protocol;
@@ -20,6 +21,7 @@
2021
import com.clickhouse.client.api.insert.InsertSettings;
2122
import com.clickhouse.client.api.insert.POJOSerializer;
2223
import com.clickhouse.client.api.insert.SerializerNotFoundException;
24+
import com.clickhouse.client.api.internal.BasicObjectsPool;
2325
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
2426
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
2527
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
@@ -68,6 +70,7 @@
6870
import java.util.HashMap;
6971
import java.util.HashSet;
7072
import java.util.LinkedHashMap;
73+
import java.util.LinkedList;
7174
import java.util.List;
7275
import java.util.Map;
7376
import java.util.Set;
@@ -133,6 +136,8 @@ public class Client implements AutoCloseable {
133136

134137
private ClickHouseClient oldClient = null;
135138

139+
private BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool;
140+
136141
private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
137142
ExecutorService sharedOperationExecutor) {
138143
this.endpoints = endpoints;
@@ -158,6 +163,15 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
158163
this.oldClient = ClientV1AdaptorHelper.createClient(configuration);
159164
LOG.info("Using old http client implementation");
160165
}
166+
167+
this.byteBufferPool = new BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator>(
168+
new LinkedList<>(), 100
169+
) {
170+
@Override
171+
protected BinaryStreamReader.ByteBufferAllocator create() {
172+
return new BinaryStreamReader.CachingByteBufferAllocator();
173+
}
174+
};
161175
}
162176

163177
/**
@@ -1445,7 +1459,7 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
14451459
final QuerySettings finalSettings = settings;
14461460
return query(sqlQuery, settings).thenApply(response -> {
14471461
try {
1448-
return new Records(response, finalSettings);
1462+
return new Records(response, finalSettings, byteBufferPool);
14491463
} catch (Exception e) {
14501464
throw new ClientException("Failed to get query response", e);
14511465
}
@@ -1468,7 +1482,8 @@ public List<GenericRecord> queryAll(String sqlQuery) {
14681482
List<GenericRecord> records = new ArrayList<>();
14691483
if (response.getResultRows() > 0) {
14701484
RowBinaryWithNamesAndTypesFormatReader reader =
1471-
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
1485+
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(),
1486+
byteBufferPool);
14721487

14731488
Map<String, Object> record;
14741489
while (reader.readRecord((record = new LinkedHashMap<>()))) {
@@ -1569,28 +1584,32 @@ public CompletableFuture<CommandResponse> execute(String sql) {
15691584
* @param schema
15701585
* @return
15711586
*/
1572-
public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
1587+
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
15731588
ClickHouseBinaryFormatReader reader = null;
15741589
switch (response.getFormat()) {
15751590
case Native:
1576-
reader = new NativeFormatReader(response.getInputStream(), response.getSettings());
1591+
reader = new NativeFormatReader(response.getInputStream(), response.getSettings(),
1592+
byteBufferPool);
15771593
break;
15781594
case RowBinaryWithNamesAndTypes:
1579-
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
1595+
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(),
1596+
byteBufferPool);
15801597
break;
15811598
case RowBinaryWithNames:
1582-
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema);
1599+
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema,
1600+
byteBufferPool);
15831601
break;
15841602
case RowBinary:
1585-
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema);
1603+
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema,
1604+
byteBufferPool);
15861605
break;
15871606
default:
15881607
throw new IllegalArgumentException("Unsupported format: " + response.getFormat());
15891608
}
15901609
return reader;
15911610
}
15921611

1593-
public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
1612+
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
15941613
return newBinaryFormatReader(response, null);
15951614
}
15961615

client-v2/src/main/java/com/clickhouse/client/api/data_formats/ClickHouseBinaryFormatReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.Map;
2020
import java.util.UUID;
2121

22-
public interface ClickHouseBinaryFormatReader {
22+
public interface ClickHouseBinaryFormatReader extends AutoCloseable {
2323

2424
/**
2525
* Reads a single value from the stream.

client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
44
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
5+
import com.clickhouse.client.api.internal.BasicObjectsPool;
56
import com.clickhouse.client.api.query.QuerySettings;
67
import com.clickhouse.data.ClickHouseColumn;
78

@@ -23,12 +24,9 @@ public class NativeFormatReader extends AbstractBinaryFormatReader {
2324

2425
private int blockRowIndex;
2526

26-
public NativeFormatReader(InputStream inputStream) {
27-
this(inputStream, null);
28-
}
29-
30-
public NativeFormatReader(InputStream inputStream, QuerySettings settings) {
31-
super(inputStream, settings, null);
27+
public NativeFormatReader(InputStream inputStream, QuerySettings settings,
28+
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
29+
super(inputStream, settings, null, byteBufferPool);
3230
readNextRecord();
3331
}
3432

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatReader.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.clickhouse.client.api.data_formats;
22

33
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
4+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
5+
import com.clickhouse.client.api.internal.BasicObjectsPool;
46
import com.clickhouse.client.api.metadata.TableSchema;
57
import com.clickhouse.client.api.query.QuerySettings;
68
import com.clickhouse.data.ClickHouseColumn;
@@ -12,12 +14,9 @@
1214

1315
public class RowBinaryFormatReader extends AbstractBinaryFormatReader {
1416

15-
public RowBinaryFormatReader(InputStream inputStream, TableSchema schema) {
16-
this(inputStream, null, schema);
17-
}
18-
19-
public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
20-
super(inputStream, querySettings, schema);
17+
public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
18+
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
19+
super(inputStream, querySettings, schema, byteBufferPool);
2120
readNextRecord();
2221
}
2322

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.clickhouse.client.api.ClientException;
44
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
55
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
6+
import com.clickhouse.client.api.internal.BasicObjectsPool;
67
import com.clickhouse.client.api.metadata.TableSchema;
78
import com.clickhouse.client.api.query.QuerySettings;
89
import com.clickhouse.data.ClickHouseColumn;
@@ -17,8 +18,9 @@
1718

1819
public class RowBinaryWithNamesAndTypesFormatReader extends AbstractBinaryFormatReader implements Iterator<Map<String, Object>> {
1920

20-
public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings) {
21-
super(inputStream, querySettings, null);
21+
public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings,
22+
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
23+
super(inputStream, querySettings, null, byteBufferPool);
2224
readSchema();
2325
}
2426

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,24 @@
22

33
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
44
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
5+
import com.clickhouse.client.api.internal.BasicObjectsPool;
56
import com.clickhouse.client.api.metadata.TableSchema;
67
import com.clickhouse.client.api.query.QuerySettings;
7-
import com.clickhouse.data.ClickHouseColumn;
88

99
import java.io.EOFException;
1010
import java.io.IOException;
1111
import java.io.InputStream;
1212
import java.util.ArrayList;
1313
import java.util.Collections;
1414
import java.util.List;
15-
import java.util.Map;
1615

1716
public class RowBinaryWithNamesFormatReader extends AbstractBinaryFormatReader {
1817

1918
private List<String> columns = null;
2019

21-
public RowBinaryWithNamesFormatReader(InputStream inputStream, TableSchema schema) {
22-
this(inputStream, null, schema);
23-
readNextRecord();
24-
}
25-
26-
public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
27-
super(inputStream, querySettings, schema);
20+
public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
21+
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
22+
super(inputStream, querySettings, schema, byteBufferPool);
2823
int nCol = 0;
2924
try {
3025
nCol = BinaryStreamReader.readVarInt(input);

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.clickhouse.client.api.ClientException;
44
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
5+
import com.clickhouse.client.api.internal.BasicObjectsPool;
56
import com.clickhouse.client.api.internal.MapUtils;
67
import com.clickhouse.client.api.metadata.TableSchema;
78
import com.clickhouse.client.api.query.NullValueException;
@@ -41,6 +42,7 @@
4142
import java.util.concurrent.ConcurrentHashMap;
4243
import java.util.concurrent.atomic.AtomicBoolean;
4344
import java.util.function.Function;
45+
import java.util.function.Supplier;
4446

4547
public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {
4648

@@ -58,7 +60,12 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
5860

5961
private volatile boolean hasNext = true;
6062

61-
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
63+
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool;
64+
65+
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator;
66+
67+
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
68+
BasicObjectsPool<BinaryStreamReader.ByteBufferAllocator> byteBufferPool) {
6269
this.input = inputStream;
6370
this.settings = querySettings == null ? Collections.emptyMap() : new HashMap<>(querySettings.getAllSettings());
6471
boolean useServerTimeZone = (boolean) this.settings.get(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey());
@@ -67,7 +74,9 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
6774
if (timeZone == null) {
6875
throw new ClientException("Time zone is not set. (useServerTimezone:" + useServerTimeZone + ")");
6976
}
70-
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG);
77+
this. byteBufferPool = byteBufferPool;
78+
this.byteBufferAllocator = byteBufferPool.lease();
79+
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator);
7180
setSchema(schema);
7281
}
7382

@@ -133,12 +142,12 @@ protected void readNextRecord() {
133142
try {
134143
nextRecordEmpty.set(true);
135144
if (!readRecord(nextRecord)) {
136-
hasNext = false;
145+
endReached();
137146
} else {
138147
nextRecordEmpty.compareAndSet(true, false);
139148
}
140149
} catch (IOException e) {
141-
hasNext = false;
150+
endReached();
142151
throw new ClientException("Failed to read next row", e);
143152
}
144153
}
@@ -165,14 +174,17 @@ public Map<String, Object> next() {
165174
return null;
166175
}
167176
} catch (IOException e) {
168-
hasNext = false;
177+
endReached();
169178
throw new ClientException("Failed to read row", e);
170179
}
171180
}
172181
}
173182

174183
protected void endReached() {
175184
hasNext = false;
185+
BinaryStreamReader.ByteBufferAllocator allocator = this.byteBufferAllocator;
186+
this.byteBufferAllocator = null;
187+
byteBufferPool.release(allocator);
176188
}
177189

178190
protected void setSchema(TableSchema schema) {
@@ -621,4 +633,13 @@ public LocalDateTime getLocalDateTime(int index) {
621633
}
622634
return (LocalDateTime) value;
623635
}
636+
637+
@Override
638+
public void close() throws Exception {
639+
if (byteBufferAllocator != null) {
640+
byteBufferPool.release(byteBufferAllocator);
641+
byteBufferAllocator = null;
642+
}
643+
input.close();
644+
}
624645
}

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,17 @@ public class BinaryStreamReader {
3535

3636
private final TimeZone timeZone;
3737

38+
private ByteBufferAllocator bufferAllocator;
39+
3840
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log) {
41+
this(input, timeZone, log, new DefaultByteBufferAllocator());
42+
}
43+
44+
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log, ByteBufferAllocator bufferAllocator) {
3945
this.log = log == null ? NOPLogger.NOP_LOGGER : log;
4046
this.timeZone = timeZone;
4147
this.input = input;
48+
this.bufferAllocator = bufferAllocator;
4249
}
4350

4451
public <T> T readValue(ClickHouseColumn column) throws IOException {
@@ -646,4 +653,36 @@ private static int readByteOrEOF(InputStream input) throws IOException {
646653
}
647654
return b;
648655
}
656+
657+
public interface ByteBufferAllocator {
658+
byte[] allocate(int size);
659+
}
660+
661+
public static class DefaultByteBufferAllocator implements ByteBufferAllocator {
662+
@Override
663+
public byte[] allocate(int size) {
664+
return new byte[size];
665+
}
666+
}
667+
668+
public static class CachingByteBufferAllocator implements ByteBufferAllocator {
669+
670+
private static final int MAX_PREALLOCATED_SIZE = 32;
671+
private final byte[][] preallocated = new byte[MAX_PREALLOCATED_SIZE + 1][];
672+
673+
public CachingByteBufferAllocator() {
674+
for (int i = 0; i < preallocated.length; i++) {
675+
preallocated[i] = new byte[i];
676+
}
677+
}
678+
679+
@Override
680+
public byte[] allocate(int size) {
681+
if (size < preallocated.length) {
682+
return preallocated[size];
683+
}
684+
685+
return new byte[size];
686+
}
687+
}
649688
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.clickhouse.client.api.internal;
2+
3+
import java.util.Deque;
4+
5+
public abstract class BasicObjectsPool<T> {
6+
7+
8+
private Deque<T> objects;
9+
10+
BasicObjectsPool(Deque<T> objects) {
11+
this(objects, 0);
12+
}
13+
14+
public BasicObjectsPool(Deque<T> objects, int size) {
15+
this.objects = objects;
16+
for (int i = 0; i < size; i++) {
17+
this.objects.add(create());
18+
}
19+
}
20+
21+
public T lease() {
22+
T obj = objects.poll();
23+
return obj != null ? obj : create();
24+
}
25+
26+
public void release(T obj) {
27+
objects.addFirst(obj);
28+
}
29+
30+
protected abstract T create();
31+
}

0 commit comments

Comments
 (0)