Skip to content

Commit d8009e4

Browse files
committed
fixed LZ4OutputStream to avoid compressing extra bytes when income buffer is empty
1 parent 69fe388 commit d8009e4

File tree

4 files changed

+24
-52
lines changed

4 files changed

+24
-52
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1501,7 +1501,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15011501

15021502
byte[] buffer = new byte[writeBufferSize];
15031503
int bytesRead;
1504-
while ((bytesRead = data.read(buffer)) != -1) {
1504+
while ((bytesRead = data.read(buffer)) > 0) {
15051505
out.write(buffer, 0, bytesRead);
15061506
}
15071507
out.close();

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4OutputStream.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,23 @@ public void write( byte[] b, int off, int len) throws IOException {
6565

6666
@Override
6767
public void flush() throws IOException {
68-
compressedBuffer.clear();
69-
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
70-
int uncompressedLen = buffer.position();
71-
buffer.flip();
72-
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
73-
compressedBuffer.remaining() - 25);
74-
int compressedSizeWithHeader = compressed + 9;
75-
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
76-
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 21, uncompressedLen); // uncompressed size
77-
long[] hash = ClickHouseCityHash.cityHash128(compressedBuffer.array(), 16, compressedSizeWithHeader);
78-
setInt64(compressedBuffer.array(), 0, hash[0]);
79-
setInt64(compressedBuffer.array(), 8, hash[1]);
80-
compressedBuffer.flip();
81-
out.write(compressedBuffer.array(), 0, compressed + 25);
82-
buffer.clear();
68+
if (buffer.position() > 0) {
69+
compressedBuffer.clear();
70+
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
71+
int uncompressedLen = buffer.position();
72+
buffer.flip();
73+
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
74+
compressedBuffer.remaining() - 25);
75+
int compressedSizeWithHeader = compressed + 9;
76+
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
77+
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 21, uncompressedLen); // uncompressed size
78+
long[] hash = ClickHouseCityHash.cityHash128(compressedBuffer.array(), 16, compressedSizeWithHeader);
79+
setInt64(compressedBuffer.array(), 0, hash[0]);
80+
setInt64(compressedBuffer.array(), 8, hash[1]);
81+
compressedBuffer.flip();
82+
out.write(compressedBuffer.array(), 0, compressed + 25);
83+
buffer.clear();
84+
}
8385
}
8486

8587

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import java.util.concurrent.TimeUnit;
7474
import java.util.function.Function;
7575

76+
import static com.clickhouse.client.api.ClientConfigProperties.SOCKET_TCP_NO_DELAY_OPT;
77+
7678
public class HttpAPIClientHelper {
7779
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
7880

@@ -235,6 +237,9 @@ public CloseableHttpClient createHttpClient() {
235237
soCfgBuilder::setSndBufSize);
236238
MapUtils.applyInt(chConfiguration, ClientConfigProperties.SOCKET_LINGER_OPT.getKey(),
237239
(v) -> soCfgBuilder.setSoLinger(v, TimeUnit.SECONDS));
240+
if (MapUtils.getFlag(chConfiguration, ClientConfigProperties.SOCKET_TCP_NO_DELAY_OPT.getKey(), false)) {
241+
soCfgBuilder.setTcpNoDelay(true);
242+
}
238243

239244
// Proxy
240245
String proxyHost = chConfiguration.get(ClientConfigProperties.PROXY_HOST.getKey());

client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.clickhouse.client.api.query.QuerySettings;
2121
import com.clickhouse.data.ClickHouseFormat;
2222
import com.clickhouse.data.ClickHouseVersion;
23-
import com.clickhouse.data.stream.ByteArrayQueueInputStream;
2423
import org.apache.commons.lang3.StringEscapeUtils;
2524
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
2625
import org.testng.Assert;
@@ -38,9 +37,7 @@
3837
import java.util.ArrayList;
3938
import java.util.Arrays;
4039
import java.util.Collections;
41-
import java.util.LinkedList;
4240
import java.util.List;
43-
import java.util.Queue;
4441
import java.util.UUID;
4542
import java.util.concurrent.TimeUnit;
4643

@@ -70,7 +67,6 @@ public InsertTests(boolean useClientCompression, boolean useHttpCompression) {
7067
public void setUp() throws IOException {
7168
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
7269
int bufferSize = (7 * 65500);
73-
7470
client = new Client.Builder()
7571
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false)
7672
.setUsername("default")
@@ -81,6 +77,7 @@ public void setUp() throws IOException {
8177
.setSocketRcvbuf(bufferSize)
8278
.setClientNetworkBufferSize(bufferSize)
8379
.build();
80+
8481
settings = new InsertSettings()
8582
.setDeduplicationToken(RandomStringUtils.randomAlphabetic(36))
8683
.setQueryId(String.valueOf(UUID.randomUUID()));
@@ -233,38 +230,6 @@ public void insertRawData() throws Exception {
233230
assertEquals(records.size(), 1000);
234231
}
235232

236-
237-
@Test(groups = { "integration" }, enabled = true)
238-
public void insertRawDataQueued() throws Exception {
239-
final String tableName = "raw_data_table";
240-
final String createSQL = "CREATE TABLE " + tableName +
241-
" (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()";
242-
243-
initTable(tableName, createSQL);
244-
settings.setInputStreamCopyBufferSize(8198 * 2);
245-
settings.compressClientRequest(true);
246-
Queue<byte[]> queue = new LinkedList<>();
247-
ByteArrayQueueInputStream qIn = new ByteArrayQueueInputStream(queue, null);
248-
for (int i = 0; i < 10; i++) {
249-
if (i > 2 && i < 5) {
250-
queue.add(new byte[0]);
251-
} else {
252-
queue.add(String.format("{ \"Id\": %d, \"events_ts\": \"%s\", \"name\": \"%s\", \"p1\": \"%d\", \"p2\": \"%s\"}\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2").getBytes());
253-
}
254-
}
255-
InsertResponse response = client.insert(tableName, qIn,
256-
ClickHouseFormat.JSONEachRow, settings).get(30, TimeUnit.SECONDS);
257-
258-
assertEquals((int)response.getWrittenRows(), 10 );
259-
260-
List<GenericRecord> records = client.queryAll("SELECT * FROM " + tableName);
261-
assertEquals(records.size(), 10);
262-
for (GenericRecord record : records) {
263-
System.out.println(record.getString(1) + " " +record.getString(2) + " " +record.getString(3) + " " +record.getString(4) + " " +record.getString(5) + " ");
264-
}
265-
}
266-
267-
268233
@Test(groups = { "integration" }, enabled = true)
269234
public void insertRawDataSimple() throws Exception {
270235
insertRawDataSimple(1000);

0 commit comments

Comments
 (0)