Skip to content

Commit c49d5a2

Browse files
authored
Merge pull request #1784 from ClickHouse/clientv2_perf_record_map_optimization
[client-v2] Add property for network buffer size
2 parents 36b8823 + 6412408 commit c49d5a2

File tree

4 files changed

+213
-0
lines changed

4 files changed

+213
-0
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,19 @@ public Builder setSharedOperationExecutor(ExecutorService executorService) {
714714
return this;
715715
}
716716

717+
/**
718+
* Set size of a buffers that are used to read/write data from the server. It is mainly used to copy data from
719+
* a socket to application memory and visa-versa. Setting is applied for both read and write operations.
720+
* Default is 8192 bytes.
721+
*
722+
* @param size - size in bytes
723+
* @return
724+
*/
725+
public Builder setClientNetworkBufferSize(int size) {
726+
this.configuration.put("client_network_buffer_size", String.valueOf(size));
727+
return this;
728+
}
729+
717730
public Client build() {
718731
this.configuration = setDefaults(this.configuration);
719732

@@ -816,6 +829,10 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
816829
userConfig.put("connection_ttl", "-1");
817830
}
818831

832+
if (!userConfig.containsKey("client_network_buffer_size")) {
833+
setClientNetworkBufferSize(8192);
834+
}
835+
819836
return userConfig;
820837
}
821838
}

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1919
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
2020
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
21+
import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory;
2122
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
2223
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
2324
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -33,7 +34,10 @@
3334
import org.apache.hc.core5.http.HttpHost;
3435
import org.apache.hc.core5.http.HttpStatus;
3536
import org.apache.hc.core5.http.NoHttpResponseException;
37+
import org.apache.hc.core5.http.config.CharCodingConfig;
38+
import org.apache.hc.core5.http.config.Http1Config;
3639
import org.apache.hc.core5.http.config.RegistryBuilder;
40+
import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory;
3741
import org.apache.hc.core5.http.io.SocketConfig;
3842
import org.apache.hc.core5.http.io.entity.EntityTemplate;
3943
import org.apache.hc.core5.io.IOCallback;
@@ -161,6 +165,7 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext,
161165
PoolingHttpClientConnectionManagerBuilder connMgrBuilder = PoolingHttpClientConnectionManagerBuilder.create()
162166
.setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX);
163167

168+
164169
ConnectionReuseStrategy connectionReuseStrategy =
165170
ConnectionReuseStrategy.valueOf(chConfiguration.get("connection_reuse_strategy"));
166171
switch (connectionReuseStrategy) {
@@ -181,6 +186,15 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext,
181186
connMgrBuilder::setMaxConnPerRoute);
182187

183188

189+
int networkBufferSize = MapUtils.getInt(chConfiguration, "client_network_buffer_size");
190+
ManagedHttpClientConnectionFactory connectionFactory = new ManagedHttpClientConnectionFactory(
191+
Http1Config.custom()
192+
.setBufferSize(networkBufferSize)
193+
.build(),
194+
CharCodingConfig.DEFAULT,
195+
DefaultHttpResponseParserFactory.INSTANCE);
196+
197+
connMgrBuilder.setConnectionFactory(connectionFactory);
184198
connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext));
185199
connMgrBuilder.setDefaultSocketConfig(socketConfig);
186200
return connMgrBuilder.build();
@@ -272,6 +286,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
272286
throw new RuntimeException(e);
273287
}
274288
HttpPost req = new HttpPost(uri);
289+
// req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding
275290
addHeaders(req, chConfiguration, requestConfig);
276291

277292
RequestConfig httpReqConfig = RequestConfig.copy(baseRequestConfig)
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package com.clickhouse.examples.client_v2;
2+
3+
import com.clickhouse.client.api.Client;
4+
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
5+
import com.clickhouse.client.api.metrics.ClientMetrics;
6+
import com.clickhouse.client.api.query.QueryResponse;
7+
import lombok.extern.slf4j.Slf4j;
8+
9+
import java.util.ArrayList;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.SynchronousQueue;
12+
import java.util.concurrent.ThreadPoolExecutor;
13+
import java.util.concurrent.TimeUnit;
14+
15+
@Slf4j
16+
public class BigDatasetExamples {
17+
18+
private final String endpoint;
19+
private final String user;
20+
private final String password;
21+
private final String database;
22+
23+
public BigDatasetExamples(String endpoint, String user, String password, String database) {
24+
this.endpoint = endpoint;
25+
this.user = user;
26+
this.password = password;
27+
this.database = database;
28+
}
29+
30+
/**
31+
* Reads {@code system.numbers} table into a result set of numbers of different types.
32+
*
33+
*/
34+
void readBigSetOfNumbers(int limit, int iterations, int concurrency) {
35+
Client client = new Client.Builder()
36+
.addEndpoint(endpoint)
37+
.setUsername(user)
38+
.setPassword(password)
39+
.setDefaultDatabase(database)
40+
.compressServerResponse(false)
41+
.compressClientRequest(false)
42+
.setLZ4UncompressedBufferSize(1048576)
43+
.useNewImplementation(true)
44+
// when network buffer and socket buffer are the same size - it is less IO calls and more efficient
45+
.setSocketRcvbuf(1_000_000)
46+
.setClientNetworkBufferSize(1_000_000)
47+
.setMaxConnections(20)
48+
.build();
49+
try {
50+
client.ping(10); // warmup connections pool. required once per client.
51+
52+
Runnable task = () -> {
53+
StringBuilder sb = new StringBuilder();
54+
55+
for (int i = 0; i < iterations; i++) {
56+
try {
57+
long[] stats = doReadNumbersSet(client, limit);
58+
for (long stat : stats) {
59+
sb.append(stat).append(", ");
60+
}
61+
sb.append("\n");
62+
} catch (Exception e) {
63+
log.error("Failed to read dataset", e);
64+
}
65+
}
66+
67+
System.out.print(sb.toString());
68+
};
69+
70+
System.out.println("Records, Read Time, Request Time, Server Time");
71+
if (concurrency == 1) {
72+
task.run();
73+
} else {
74+
ExecutorService executor = new ThreadPoolExecutor(concurrency, Integer.MAX_VALUE,
75+
60L, TimeUnit.SECONDS,
76+
new SynchronousQueue<Runnable>());
77+
78+
for (int i = 0; i < concurrency; i++) {
79+
executor.submit(task);
80+
}
81+
82+
executor.shutdown();
83+
executor.awaitTermination(3, TimeUnit.MINUTES);
84+
}
85+
} catch (InterruptedException e) {
86+
throw new RuntimeException(e);
87+
} finally {
88+
client.close();
89+
}
90+
}
91+
92+
/**
93+
* Does actual request and returns time stats in format:
94+
* [number of records, read time in ms, request initiation time in ms, server time in ms]
95+
* @param client
96+
* @param limit
97+
* @return
98+
*/
99+
private long[] doReadNumbersSet(Client client, int limit) {
100+
final String query = DATASET_QUERY + " LIMIT " + limit;
101+
try (QueryResponse response = client.query(query).get(3000, TimeUnit.MILLISECONDS)) {
102+
ArrayList<com.clickhouse.demo_service.data.NumbersRecord> result = new ArrayList<>();
103+
104+
// iterable approach is more efficient for large datasets because it doesn't load all records into memory
105+
ClickHouseBinaryFormatReader reader = Client.newBinaryFormatReader(response);
106+
107+
long start = System.nanoTime();
108+
while (reader.next() != null) {
109+
result.add(new com.clickhouse.demo_service.data.NumbersRecord(
110+
reader.getUUID("id"),
111+
reader.getLong("p1"),
112+
reader.getBigInteger("number"),
113+
reader.getFloat("p2"),
114+
reader.getDouble("p3")
115+
));
116+
}
117+
long duration = System.nanoTime() - start;
118+
119+
return new long[] { result.size(), TimeUnit.NANOSECONDS.toMillis(duration), response.getMetrics().getMetric(ClientMetrics.OP_DURATION).getLong(),
120+
TimeUnit.NANOSECONDS.toMillis(response.getServerTime()) };
121+
} catch (Exception e) {
122+
throw new RuntimeException("Failed to fetch dataset", e);
123+
}
124+
}
125+
126+
private static final String DATASET_QUERY =
127+
"SELECT generateUUIDv4() as id, " +
128+
"toUInt32(number) as p1, " +
129+
"number, " +
130+
"toFloat32(number/100000) as p2, " +
131+
"toFloat64(number/100000) as p3" +
132+
" FROM system.numbers";
133+
134+
public static void main(String[] args) {
135+
final String endpoint = System.getProperty("chEndpoint", "http://localhost:8123");
136+
final String user = System.getProperty("chUser", "default");
137+
final String password = System.getProperty("chPassword", "");
138+
final String database = System.getProperty("chDatabase", "default");
139+
140+
// profilerDelay();
141+
142+
BigDatasetExamples examples = new BigDatasetExamples(endpoint, user, password, database);
143+
144+
examples.readBigSetOfNumbers(100_000, 100, 10);
145+
146+
// profilerDelay();
147+
}
148+
149+
private static void profilerDelay() {
150+
// Delay for a profiler
151+
try {
152+
Thread.sleep(30000);
153+
} catch (InterruptedException e) {
154+
e.printStackTrace();
155+
}
156+
}
157+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.clickhouse.demo_service.data;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Data;
5+
import lombok.NoArgsConstructor;
6+
7+
import java.math.BigInteger;
8+
import java.util.UUID;
9+
10+
@Data
11+
@AllArgsConstructor
12+
@NoArgsConstructor
13+
public class NumbersRecord {
14+
15+
private UUID id;
16+
17+
private long p1;
18+
19+
private BigInteger number;
20+
21+
private float p2;
22+
23+
private double p3;
24+
}

0 commit comments

Comments
 (0)