Skip to content

Commit 7be8d90

Browse files
authored
Merge pull request #2000 from ClickHouse/v2_connection_problems_investigation
[client-v2] Fix ClickHouseLZ4OutputStream sending extra bytes when income buffer is empty
2 parents 95c580e + d8009e4 commit 7be8d90

File tree

6 files changed

+100
-20
lines changed

6 files changed

+100
-20
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1501,7 +1501,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15011501

15021502
byte[] buffer = new byte[writeBufferSize];
15031503
int bytesRead;
1504-
while ((bytesRead = data.read(buffer)) != -1) {
1504+
while ((bytesRead = data.read(buffer)) > 0) {
15051505
out.write(buffer, 0, bytesRead);
15061506
}
15071507
out.close();

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4OutputStream.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,23 @@ public void write( byte[] b, int off, int len) throws IOException {
6565

6666
@Override
6767
public void flush() throws IOException {
68-
compressedBuffer.clear();
69-
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
70-
int uncompressedLen = buffer.position();
71-
buffer.flip();
72-
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
73-
compressedBuffer.remaining() - 25);
74-
int compressedSizeWithHeader = compressed + 9;
75-
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
76-
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 21, uncompressedLen); // uncompressed size
77-
long[] hash = ClickHouseCityHash.cityHash128(compressedBuffer.array(), 16, compressedSizeWithHeader);
78-
setInt64(compressedBuffer.array(), 0, hash[0]);
79-
setInt64(compressedBuffer.array(), 8, hash[1]);
80-
compressedBuffer.flip();
81-
out.write(compressedBuffer.array(), 0, compressed + 25);
82-
buffer.clear();
68+
if (buffer.position() > 0) {
69+
compressedBuffer.clear();
70+
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
71+
int uncompressedLen = buffer.position();
72+
buffer.flip();
73+
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
74+
compressedBuffer.remaining() - 25);
75+
int compressedSizeWithHeader = compressed + 9;
76+
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
77+
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 21, uncompressedLen); // uncompressed size
78+
long[] hash = ClickHouseCityHash.cityHash128(compressedBuffer.array(), 16, compressedSizeWithHeader);
79+
setInt64(compressedBuffer.array(), 0, hash[0]);
80+
setInt64(compressedBuffer.array(), 8, hash[1]);
81+
compressedBuffer.flip();
82+
out.write(compressedBuffer.array(), 0, compressed + 25);
83+
buffer.clear();
84+
}
8385
}
8486

8587

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import java.util.concurrent.TimeUnit;
7474
import java.util.function.Function;
7575

76+
import static com.clickhouse.client.api.ClientConfigProperties.SOCKET_TCP_NO_DELAY_OPT;
77+
7678
public class HttpAPIClientHelper {
7779
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
7880

@@ -235,6 +237,9 @@ public CloseableHttpClient createHttpClient() {
235237
soCfgBuilder::setSndBufSize);
236238
MapUtils.applyInt(chConfiguration, ClientConfigProperties.SOCKET_LINGER_OPT.getKey(),
237239
(v) -> soCfgBuilder.setSoLinger(v, TimeUnit.SECONDS));
240+
if (MapUtils.getFlag(chConfiguration, ClientConfigProperties.SOCKET_TCP_NO_DELAY_OPT.getKey(), false)) {
241+
soCfgBuilder.setTcpNoDelay(true);
242+
}
238243

239244
// Proxy
240245
String proxyHost = chConfiguration.get(ClientConfigProperties.PROXY_HOST.getKey());

client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ public void testSecureConnection() {
223223
.setUsername("default")
224224
.setPassword("")
225225
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
226-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
226+
.compressClientRequest(true)
227227
.build()) {
228228

229229
List<GenericRecord> records = client.queryAll("SELECT timezone()");
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,77 @@
11
package com.clickhouse.client.insert;
22

3+
import com.clickhouse.client.ClickHouseNode;
4+
import com.clickhouse.client.ClickHouseProtocol;
5+
import com.clickhouse.client.api.Client;
6+
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
7+
import com.clickhouse.client.api.insert.InsertResponse;
8+
import com.clickhouse.client.api.insert.InsertSettings;
9+
import com.clickhouse.client.api.query.GenericRecord;
10+
import com.clickhouse.client.api.query.QueryResponse;
11+
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
12+
import org.testng.Assert;
13+
import org.testng.annotations.Test;
14+
15+
import java.util.Collections;
16+
import java.util.List;
17+
import java.util.UUID;
18+
import java.util.concurrent.TimeUnit;
19+
320
public class InsertClientContentCompressionTests extends InsertTests {
421
public InsertClientContentCompressionTests() {
522
super(true, false);
623
}
24+
25+
26+
@Test(groups = { "integration" })
27+
public void testInsertAndReadBackWithSecureConnection() {
28+
ClickHouseNode secureServer = getSecureServer(ClickHouseProtocol.HTTP);
29+
30+
try (Client client = new Client.Builder()
31+
.addEndpoint("https://localhost:" + secureServer.getPort())
32+
.setUsername("default")
33+
.setPassword("")
34+
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
35+
.compressClientRequest(true)
36+
.build()) {
37+
final String tableName = "single_pojo_table";
38+
final String createSQL = SamplePOJO.generateTableCreateSQL(tableName);
39+
final SamplePOJO pojo = new SamplePOJO();
40+
41+
initTable(tableName, createSQL);
42+
43+
client.register(SamplePOJO.class, client.getTableSchema(tableName, "default"));
44+
InsertSettings settings = new InsertSettings()
45+
.setDeduplicationToken(RandomStringUtils.randomAlphabetic(36))
46+
.setQueryId(String.valueOf(UUID.randomUUID()));
47+
System.out.println("Inserting POJO: " + pojo);
48+
try (InsertResponse response = client.insert(tableName, Collections.singletonList(pojo), settings).get(10, TimeUnit.SECONDS)) {
49+
Assert.assertEquals(response.getWrittenRows(), 1);
50+
}
51+
52+
try (QueryResponse queryResponse =
53+
client.query("SELECT * FROM " + tableName + " LIMIT 1").get(10, TimeUnit.SECONDS)) {
54+
55+
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(queryResponse);
56+
Assert.assertNotNull(reader.next());
57+
58+
Assert.assertEquals(reader.getByte("byteValue"), pojo.getByteValue());
59+
Assert.assertEquals(reader.getByte("int8"), pojo.getInt8());
60+
Assert.assertEquals(reader.getShort("uint8"), pojo.getUint8());
61+
Assert.assertEquals(reader.getShort("int16"), pojo.getInt16());
62+
Assert.assertEquals(reader.getInteger("int32"), pojo.getInt32());
63+
Assert.assertEquals(reader.getLong("int64"), pojo.getInt64());
64+
Assert.assertEquals(reader.getFloat("float32"), pojo.getFloat32());
65+
Assert.assertEquals(reader.getDouble("float64"), pojo.getFloat64());
66+
Assert.assertEquals(reader.getString("string"), pojo.getString());
67+
Assert.assertEquals(reader.getString("fixedString"), pojo.getFixedString());
68+
}
69+
List<GenericRecord> records = client.queryAll("SELECT timezone()");
70+
Assert.assertTrue(records.size() > 0);
71+
Assert.assertEquals(records.get(0).getString(1), "UTC");
72+
} catch (Exception e) {
73+
e.printStackTrace();
74+
Assert.fail(e.getMessage());
75+
}
76+
}
777
}

client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,18 @@ public InsertTests(boolean useClientCompression, boolean useHttpCompression) {
6666
@BeforeMethod(groups = { "integration" })
6767
public void setUp() throws IOException {
6868
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
69+
int bufferSize = (7 * 65500);
6970
client = new Client.Builder()
7071
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false)
7172
.setUsername("default")
7273
.setPassword("")
73-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
7474
.compressClientRequest(useClientCompression)
7575
.useHttpCompression(useHttpCompression)
76+
.setSocketSndbuf(bufferSize)
77+
.setSocketRcvbuf(bufferSize)
78+
.setClientNetworkBufferSize(bufferSize)
7679
.build();
80+
7781
settings = new InsertSettings()
7882
.setDeduplicationToken(RandomStringUtils.randomAlphabetic(36))
7983
.setQueryId(String.valueOf(UUID.randomUUID()));
@@ -226,7 +230,6 @@ public void insertRawData() throws Exception {
226230
assertEquals(records.size(), 1000);
227231
}
228232

229-
230233
@Test(groups = { "integration" }, enabled = true)
231234
public void insertRawDataSimple() throws Exception {
232235
insertRawDataSimple(1000);
@@ -379,7 +382,7 @@ public static Object[] logCommentDataProvider() {
379382
};
380383
}
381384

382-
private void initTable(String tableName, String createTableSQL) throws Exception {
385+
protected void initTable(String tableName, String createTableSQL) throws Exception {
383386
client.execute("DROP TABLE IF EXISTS " + tableName).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS);
384387
client.execute(createTableSQL).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS);
385388
}

0 commit comments

Comments
 (0)