Skip to content

Commit 50ffab2

Browse files
committed
optimized buffer allocations for lz4input stream
1 parent 57a0159 commit 50ffab2

File tree

4 files changed

+50
-78
lines changed

4 files changed

+50
-78
lines changed

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4InputStream.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.io.EOFException;
1212
import java.io.IOException;
1313
import java.io.InputStream;
14-
import java.nio.ByteBuffer;
1514

1615
public class ClickHouseLZ4InputStream extends InputStream {
1716

@@ -20,7 +19,13 @@ public class ClickHouseLZ4InputStream extends InputStream {
2019

2120
private final InputStream in;
2221

23-
private ByteBuffer buffer;
22+
private byte[] outBuffer;
23+
24+
private int position = 0;
25+
26+
private int limit = 0;
27+
28+
private byte[] inBuffer;
2429

2530
private byte[] tmpBuffer = new byte[1];
2631

@@ -29,8 +34,8 @@ public ClickHouseLZ4InputStream(InputStream in, LZ4FastDecompressor decompressor
2934
super();
3035
this.decompressor = decompressor;
3136
this.in = in;
32-
this.buffer = ByteBuffer.allocate(bufferSize);
33-
this.buffer.limit(0);
37+
this.outBuffer = new byte[bufferSize];
38+
this.inBuffer = new byte[bufferSize];
3439
}
3540

3641
@Override
@@ -55,8 +60,9 @@ public int read(byte[] b, int off, int len) throws IOException {
5560

5661
int readBytes = 0;
5762
do {
58-
int remaining = Math.min(len - readBytes, buffer.remaining());
59-
buffer.get(b, off + readBytes, remaining);
63+
int remaining = Math.min(len - readBytes, limit - position);
64+
System.arraycopy(outBuffer, position, b, off + readBytes, remaining);
65+
position += remaining;
6066
readBytes += remaining;
6167
} while (readBytes < len && refill() != -1);
6268

@@ -109,30 +115,33 @@ private int refill() throws IOException {
109115
int uncompressedSize = getInt32(headerBuff, 21);
110116

111117
int offset = 9;
112-
final byte[] block = new byte[compressedSizeWithHeader];
113-
block[0] = MAGIC;
114-
setInt32(block, 1, compressedSizeWithHeader);
115-
setInt32(block, 5, uncompressedSize);
118+
if (inBuffer.length < compressedSizeWithHeader) {
119+
LOG.debug("compressed buff too small, new size: {}", compressedSizeWithHeader);
120+
}
121+
inBuffer = inBuffer.length >= compressedSizeWithHeader ? inBuffer : new byte[(int) (compressedSizeWithHeader * 1.5)];
122+
inBuffer[0] = MAGIC;
123+
setInt32(inBuffer, 1, compressedSizeWithHeader);
124+
setInt32(inBuffer, 5, uncompressedSize);
116125
// compressed data: compressed_size - 9 bytes
117126
int remaining = compressedSizeWithHeader - offset;
118127

119-
readFully = readFully(block, offset, remaining);
128+
readFully = readFully(inBuffer, offset, remaining);
120129
if (!readFully) {
121130
throw new EOFException("Unexpected end of stream");
122131
}
123132

124-
long[] real = ClickHouseCityHash.cityHash128(block, 0, compressedSizeWithHeader);
133+
long[] real = ClickHouseCityHash.cityHash128(inBuffer, 0, compressedSizeWithHeader);
125134
if (real[0] != getInt64(headerBuff, 0) || real[1] != ClickHouseByteUtils.getInt64(headerBuff, 8)) {
126135
throw new ClientException("Corrupted stream: checksum mismatch");
127136
}
128137

129-
if (buffer.capacity() < uncompressedSize) {
130-
buffer = ByteBuffer.allocate(uncompressedSize);
131-
LOG.warn("Buffer size is too small, reallocate buffer with size: " + uncompressedSize);
138+
if (outBuffer.length < uncompressedSize) {
139+
outBuffer = new byte[uncompressedSize];
140+
LOG.debug("uncompressed buff too small, new size: {}", uncompressedSize);
132141
}
133-
decompressor.decompress(ByteBuffer.wrap(block), offset, buffer, 0, uncompressedSize);
134-
buffer.position(0);
135-
buffer.limit(uncompressedSize);
142+
decompressor.decompress(inBuffer, offset, outBuffer, 0, uncompressedSize);
143+
position = 0;
144+
limit = uncompressedSize;
136145
return uncompressedSize;
137146
}
138147

examples/demo-service/src/main/java/com/clickhouse/demo_service/DatasetController.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,6 @@ public void setup() {
6161
datasetQuerySchema = chDirectClient.getTableSchemaFromQuery(DATASET_QUERY);
6262
chDirectClient.register(VirtualDatasetRecord.class, datasetQuerySchema);
6363
log.info("Dataset schema: " + datasetQuerySchema.getColumns());
64-
65-
pool = new BasicObjectsPool<>(new ConcurrentLinkedDeque<>(), 100) {
66-
@Override
67-
ObjectsPreparedCollection<VirtualDatasetRecord> create() {
68-
return new ObjectsPreparedCollection<>(new LinkedList<>(), MAX_LIMIT) {
69-
@Override
70-
VirtualDatasetRecord create() {
71-
return new VirtualDatasetRecord();
72-
}
73-
};
74-
}
75-
};
7664
}
7765

7866
/**
@@ -124,7 +112,7 @@ public List<VirtualDatasetRecord> directDatasetFetch(@RequestParam(name = "limit
124112
response.getMetrics().getMetric(ClientMetrics.OP_DURATION).getLong(),
125113
TimeUnit.NANOSECONDS.toMillis(response.getServerTime())));
126114

127-
return result.stream().findFirst().stream().collect(Collectors.toCollection(ArrayList::new));
115+
return result;
128116
} catch (Exception e) {
129117
throw new RuntimeException("Failed to fetch dataset", e);
130118
}
@@ -195,6 +183,25 @@ public CalculationResult directDatasetReadToPojo(@RequestParam(name = "limit", r
195183
private CalculationResult readToPOJO(int limit, boolean cache) {
196184
final String query = DATASET_QUERY + " LIMIT " + limit;
197185
List<VirtualDatasetRecord> result = null;
186+
187+
if (cache) {
188+
synchronized (this) {
189+
if (pool == null) {
190+
pool = new BasicObjectsPool<>(new ConcurrentLinkedDeque<>(), 100) {
191+
@Override
192+
ObjectsPreparedCollection<VirtualDatasetRecord> create() {
193+
return new ObjectsPreparedCollection<>(new LinkedList<>(), MAX_LIMIT) {
194+
@Override
195+
VirtualDatasetRecord create() {
196+
return new VirtualDatasetRecord();
197+
}
198+
};
199+
}
200+
};
201+
}
202+
}
203+
}
204+
198205
Supplier<VirtualDatasetRecord> objectsPool = cache ? this.pool.lease()
199206
: VirtualDatasetRecord::new;
200207
try {

examples/demo-service/src/main/java/com/clickhouse/demo_service/DbConfiguration.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
public class DbConfiguration {
1212

1313
@Bean
14-
public Client chDirectClient(LoggingMeterRegistry loggingMeterRegistry, @Value("${db.url}") String dbUrl, @Value("${db.user}") String dbUser,
14+
public Client chDirectClient(@Value("${db.url}") String dbUrl, @Value("${db.user}") String dbUser,
1515
@Value("${db.pass}") String dbPassword) {
1616
return new Client.Builder()
1717
.addEndpoint(dbUrl)
@@ -21,13 +21,10 @@ public Client chDirectClient(LoggingMeterRegistry loggingMeterRegistry, @Value("
2121
// sets the maximum number of connections to the server at a time
2222
// this is important for services handling many concurrent requests to ClickHouse
2323
.setMaxConnections(100)
24-
.setLZ4UncompressedBufferSize(1058576)
25-
.setSocketRcvbuf(500_000)
2624
.setSocketTcpNodelay(true)
27-
.setSocketSndbuf(500_000)
28-
.setClientNetworkBufferSize(500_000)
25+
.setLZ4UncompressedBufferSize(1 * 1024 * 1024) // 1MB buffer size
26+
.setClientNetworkBufferSize(1 * 1024 * 1024) // 1MB buffer size
2927
.allowBinaryReaderToReuseBuffers(true) // using buffer pool for binary reader
30-
.registerClientMetrics(loggingMeterRegistry, "clickhouse-client-metrics")
3128
.build();
3229
}
3330
}

examples/demo-service/src/main/java/com/clickhouse/demo_service/MetricsConfig.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)