Skip to content

Commit 762d78a

Browse files
authored
Merge pull request #1791 from ClickHouse/fix_compression_issue
[client-v2] Fix compression issue when concurrent requests
2 parents d1c90cd + 6298b1e commit 762d78a

File tree

4 files changed

+50
-17
lines changed

4 files changed

+50
-17
lines changed

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4InputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public int read(byte[] b, int off, int len) throws IOException {
6767
static final byte MAGIC = (byte) 0x82;
6868
static final int HEADER_LENGTH = 25;
6969

70-
static final byte[] headerBuff = new byte[HEADER_LENGTH];
70+
final byte[] headerBuff = new byte[HEADER_LENGTH];
7171

7272
/**
7373
* Method ensures to read all bytes from the input stream.

client-v2/src/test/java/com/clickhouse/client/query/QueryServerContentCompressionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
public class QueryServerContentCompressionTests extends QueryTests {
44
static {
5-
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
5+
// System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
66
}
77
QueryServerContentCompressionTests() {
88
super(true, false);

client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,17 @@
6666
import java.util.Properties;
6767
import java.util.Random;
6868
import java.util.UUID;
69+
import java.util.concurrent.CountDownLatch;
6970
import java.util.concurrent.ExecutionException;
71+
import java.util.concurrent.ExecutorService;
72+
import java.util.concurrent.Executors;
7073
import java.util.concurrent.Future;
7174
import java.util.concurrent.TimeUnit;
7275
import java.util.function.Consumer;
7376
import java.util.function.Function;
7477
import java.util.function.Supplier;
7578
import java.util.stream.BaseStream;
79+
import java.util.stream.IntStream;
7680

7781
public class QueryTests extends BaseIntegrationTest {
7882

@@ -1281,32 +1285,62 @@ public void testClientUseOwnTimeZone() {
12811285

12821286
@Test
12831287
public void testAsyncQuery() {
1284-
try (Client client = newClient().useAsyncRequests(true).build();
1285-
QueryResponse response =
1288+
try (Client client = newClient().useAsyncRequests(true).build()){
1289+
simpleRequest(client);
1290+
} catch (Exception e) {
1291+
Assert.fail("Failed to get server time zone from header", e);
1292+
}
1293+
}
1294+
1295+
protected void simpleRequest(Client client) throws Exception {
1296+
try (QueryResponse response =
12861297
client.query("SELECT number FROM system.numbers LIMIT 1000_000").get(1, TimeUnit.SECONDS)) {
1287-
ClickHouseBinaryFormatReader reader =
1288-
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
1298+
ClickHouseBinaryFormatReader reader =
1299+
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
12891300

1290-
int count = 0;
1291-
while (reader.hasNext()) {
1292-
reader.next();
1293-
count++;
1294-
}
1301+
int count = 0;
1302+
while (reader.hasNext()) {
1303+
reader.next();
1304+
count++;
1305+
}
12951306

1296-
Assert.assertEquals(count, 1000_000);
1297-
} catch (Exception e) {
1298-
Assert.fail("Failed to get server time zone from header", e);
1307+
Assert.assertEquals(count, 1000_000);
12991308
}
13001309
}
13011310

1302-
private Client.Builder newClient() {
1311+
@Test
1312+
public void testConcurrentQueries() throws Exception{
1313+
final Client client = newClient().build();
1314+
final int concurrency = 10;
1315+
CountDownLatch latch = new CountDownLatch(concurrency);
1316+
Runnable task = () -> {
1317+
try {
1318+
simpleRequest(client);
1319+
} catch (Exception e) {
1320+
e.printStackTrace();
1321+
Assert.fail("Failed", e);
1322+
} finally {
1323+
latch.countDown();
1324+
}
1325+
};
1326+
1327+
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
1328+
IntStream.range(0,concurrency).forEach(i -> executor.submit(task));
1329+
executor.shutdown();
1330+
executor.awaitTermination(10, TimeUnit.SECONDS);
1331+
latch.await();
1332+
Assert.assertEquals(latch.getCount(), 0);
1333+
}
1334+
1335+
protected Client.Builder newClient() {
13031336
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
13041337
return new Client.Builder()
13051338
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false)
13061339
.setUsername("default")
13071340
.setPassword("")
13081341
.compressClientRequest(false)
1309-
.compressServerResponse(false)
1342+
.compressServerResponse(true)
1343+
.useHttpCompression(useHttpCompression)
13101344
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"));
13111345
}
13121346
}

examples/demo-service/src/main/java/com/clickhouse/demo_service/DbConfiguration.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ public Client chDirectClient(@Value("${db.url}") String dbUrl, @Value("${db.user
1717
.addEndpoint(dbUrl)
1818
.setUsername(dbUser)
1919
.setPassword(dbPassword)
20-
.compressServerResponse(false)
2120
.useNewImplementation(true) // using new transport layer implementation
2221

2322
// sets the maximum number of connections to the server at a time

0 commit comments

Comments
 (0)