Skip to content

Commit 6298b1e

Browse files
committed
fix compression by removing static buffer from the flow
1 parent bc1f50d commit 6298b1e

File tree

4 files changed

+50
-17
lines changed

4 files changed

+50
-17
lines changed

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4InputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public int read(byte[] b, int off, int len) throws IOException {
6767
static final byte MAGIC = (byte) 0x82;
6868
static final int HEADER_LENGTH = 25;
6969

70-
static final byte[] headerBuff = new byte[HEADER_LENGTH];
70+
final byte[] headerBuff = new byte[HEADER_LENGTH];
7171

7272
/**
7373
* Method ensures to read all bytes from the input stream.

client-v2/src/test/java/com/clickhouse/client/query/QueryServerContentCompressionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
public class QueryServerContentCompressionTests extends QueryTests {
44
static {
5-
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
5+
// System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
66
}
77
QueryServerContentCompressionTests() {
88
super(true, false);

client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,17 @@
6666
import java.util.Properties;
6767
import java.util.Random;
6868
import java.util.UUID;
69+
import java.util.concurrent.CountDownLatch;
6970
import java.util.concurrent.ExecutionException;
71+
import java.util.concurrent.ExecutorService;
72+
import java.util.concurrent.Executors;
7073
import java.util.concurrent.Future;
7174
import java.util.concurrent.TimeUnit;
7275
import java.util.function.Consumer;
7376
import java.util.function.Function;
7477
import java.util.function.Supplier;
7578
import java.util.stream.BaseStream;
79+
import java.util.stream.IntStream;
7680

7781
public class QueryTests extends BaseIntegrationTest {
7882

@@ -1264,32 +1268,62 @@ public void testClientUseOwnTimeZone() {
12641268

12651269
@Test
12661270
public void testAsyncQuery() {
1267-
try (Client client = newClient().useAsyncRequests(true).build();
1268-
QueryResponse response =
1271+
try (Client client = newClient().useAsyncRequests(true).build()){
1272+
simpleRequest(client);
1273+
} catch (Exception e) {
1274+
Assert.fail("Failed to get server time zone from header", e);
1275+
}
1276+
}
1277+
1278+
protected void simpleRequest(Client client) throws Exception {
1279+
try (QueryResponse response =
12691280
client.query("SELECT number FROM system.numbers LIMIT 1000_000").get(1, TimeUnit.SECONDS)) {
1270-
ClickHouseBinaryFormatReader reader =
1271-
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
1281+
ClickHouseBinaryFormatReader reader =
1282+
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
12721283

1273-
int count = 0;
1274-
while (reader.hasNext()) {
1275-
reader.next();
1276-
count++;
1277-
}
1284+
int count = 0;
1285+
while (reader.hasNext()) {
1286+
reader.next();
1287+
count++;
1288+
}
12781289

1279-
Assert.assertEquals(count, 1000_000);
1280-
} catch (Exception e) {
1281-
Assert.fail("Failed to get server time zone from header", e);
1290+
Assert.assertEquals(count, 1000_000);
12821291
}
12831292
}
12841293

1285-
private Client.Builder newClient() {
1294+
@Test
1295+
public void testConcurrentQueries() throws Exception{
1296+
final Client client = newClient().build();
1297+
final int concurrency = 10;
1298+
CountDownLatch latch = new CountDownLatch(concurrency);
1299+
Runnable task = () -> {
1300+
try {
1301+
simpleRequest(client);
1302+
} catch (Exception e) {
1303+
e.printStackTrace();
1304+
Assert.fail("Failed", e);
1305+
} finally {
1306+
latch.countDown();
1307+
}
1308+
};
1309+
1310+
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
1311+
IntStream.range(0,concurrency).forEach(i -> executor.submit(task));
1312+
executor.shutdown();
1313+
executor.awaitTermination(10, TimeUnit.SECONDS);
1314+
latch.await();
1315+
Assert.assertEquals(latch.getCount(), 0);
1316+
}
1317+
1318+
protected Client.Builder newClient() {
12861319
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
12871320
return new Client.Builder()
12881321
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false)
12891322
.setUsername("default")
12901323
.setPassword("")
12911324
.compressClientRequest(false)
1292-
.compressServerResponse(false)
1325+
.compressServerResponse(true)
1326+
.useHttpCompression(useHttpCompression)
12931327
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"));
12941328
}
12951329
}

examples/demo-service/src/main/java/com/clickhouse/demo_service/DbConfiguration.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ public Client chDirectClient(@Value("${db.url}") String dbUrl, @Value("${db.user
1717
.addEndpoint(dbUrl)
1818
.setUsername(dbUser)
1919
.setPassword(dbPassword)
20-
.compressServerResponse(false)
2120
.useNewImplementation(true) // using new transport layer implementation
2221

2322
// sets the maximum number of connections to the server at a time

0 commit comments

Comments
 (0)