Skip to content

Commit f63937b

Browse files
committed
expose network buffer property
1 parent 4fefbb9 commit f63937b

File tree

4 files changed

+212
-0
lines changed

4 files changed

+212
-0
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,19 @@ public Builder setSharedOperationExecutor(ExecutorService executorService) {
714714
return this;
715715
}
716716

717+
/**
718+
* Set size of a buffers that are used to read/write data from the server. It is mainly used to copy data from
719+
* a socket to application memory and visa-versa. Setting is applied for both read and write operations.
720+
* Default is 8192 bytes.
721+
*
722+
* @param size - size in bytes
723+
* @return
724+
*/
725+
public Builder setClientNetworkBufferSize(int size) {
726+
this.configuration.put("client_network_buffer_size", String.valueOf(size));
727+
return this;
728+
}
729+
717730
public Client build() {
718731
this.configuration = setDefaults(this.configuration);
719732

@@ -816,6 +829,10 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
816829
userConfig.put("connection_ttl", "-1");
817830
}
818831

832+
if (!userConfig.containsKey("client_network_buffer_size")) {
833+
setClientNetworkBufferSize(8192);
834+
}
835+
819836
return userConfig;
820837
}
821838
}

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1919
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
2020
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
21+
import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory;
2122
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
2223
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
2324
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -33,7 +34,10 @@
3334
import org.apache.hc.core5.http.HttpHost;
3435
import org.apache.hc.core5.http.HttpStatus;
3536
import org.apache.hc.core5.http.NoHttpResponseException;
37+
import org.apache.hc.core5.http.config.CharCodingConfig;
38+
import org.apache.hc.core5.http.config.Http1Config;
3639
import org.apache.hc.core5.http.config.RegistryBuilder;
40+
import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory;
3741
import org.apache.hc.core5.http.io.SocketConfig;
3842
import org.apache.hc.core5.http.io.entity.EntityTemplate;
3943
import org.apache.hc.core5.io.IOCallback;
@@ -161,6 +165,7 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext,
161165
PoolingHttpClientConnectionManagerBuilder connMgrBuilder = PoolingHttpClientConnectionManagerBuilder.create()
162166
.setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX);
163167

168+
164169
ConnectionReuseStrategy connectionReuseStrategy =
165170
ConnectionReuseStrategy.valueOf(chConfiguration.get("connection_reuse_strategy"));
166171
switch (connectionReuseStrategy) {
@@ -181,6 +186,15 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext,
181186
connMgrBuilder::setMaxConnPerRoute);
182187

183188

189+
int networkBufferSize = MapUtils.getInt(chConfiguration, "client_network_buffer_size");
190+
ManagedHttpClientConnectionFactory connectionFactory = new ManagedHttpClientConnectionFactory(
191+
Http1Config.custom()
192+
.setBufferSize(networkBufferSize)
193+
.build(),
194+
CharCodingConfig.DEFAULT,
195+
DefaultHttpResponseParserFactory.INSTANCE);
196+
197+
connMgrBuilder.setConnectionFactory(connectionFactory);
184198
connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext));
185199
connMgrBuilder.setDefaultSocketConfig(socketConfig);
186200
return connMgrBuilder.build();
@@ -272,6 +286,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
272286
throw new RuntimeException(e);
273287
}
274288
HttpPost req = new HttpPost(uri);
289+
// req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding
275290
addHeaders(req, chConfiguration, requestConfig);
276291

277292
RequestConfig httpReqConfig = RequestConfig.copy(baseRequestConfig)
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package com.clickhouse.examples.client_v2;
2+
3+
import com.clickhouse.client.api.Client;
4+
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
5+
import com.clickhouse.client.api.metrics.ClientMetrics;
6+
import com.clickhouse.client.api.query.QueryResponse;
7+
import lombok.extern.slf4j.Slf4j;
8+
9+
import java.util.ArrayList;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.SynchronousQueue;
12+
import java.util.concurrent.ThreadPoolExecutor;
13+
import java.util.concurrent.TimeUnit;
14+
15+
@Slf4j
16+
public class BigDatasetExamples {
17+
18+
private final String endpoint;
19+
private final String user;
20+
private final String password;
21+
private final String database;
22+
23+
public BigDatasetExamples(String endpoint, String user, String password, String database) {
24+
this.endpoint = endpoint;
25+
this.user = user;
26+
this.password = password;
27+
this.database = database;
28+
}
29+
30+
/**
31+
* Reads {@code system.numbers} table into a result set of numbers of different types.
32+
*
33+
*/
34+
void readBigSetOfNumbers(int limit, int iterations, int concurrency) {
35+
Client client = new Client.Builder()
36+
.addEndpoint(endpoint)
37+
.setUsername(user)
38+
.setPassword(password)
39+
.setDefaultDatabase(database)
40+
.compressServerResponse(false)
41+
.compressClientRequest(false)
42+
.setLZ4UncompressedBufferSize(1048576)
43+
.useNewImplementation(true)
44+
// .setSocketRcvbuf(1_000_000)
45+
.setClientNetworkBufferSize(1_000_000)
46+
.setMaxConnections(20)
47+
.build();
48+
try {
49+
// client.ping(10); // warmup connections pool
50+
51+
Runnable task = () -> {
52+
StringBuilder sb = new StringBuilder();
53+
54+
for (int i = 0; i < iterations; i++) {
55+
try {
56+
long[] stats = doReadNumbersSet(client, limit);
57+
for (long stat : stats) {
58+
sb.append(stat).append(", ");
59+
}
60+
sb.append("\n");
61+
} catch (Exception e) {
62+
log.error("Failed to read dataset", e);
63+
}
64+
}
65+
66+
System.out.print(sb.toString());
67+
};
68+
69+
System.out.println("Records, Read Time, Request Time, Server Time");
70+
if (concurrency == 1) {
71+
task.run();
72+
} else {
73+
ExecutorService executor = new ThreadPoolExecutor(concurrency, Integer.MAX_VALUE,
74+
60L, TimeUnit.SECONDS,
75+
new SynchronousQueue<Runnable>());
76+
77+
for (int i = 0; i < concurrency; i++) {
78+
executor.submit(task);
79+
}
80+
81+
executor.shutdown();
82+
executor.awaitTermination(3, TimeUnit.MINUTES);
83+
}
84+
} catch (InterruptedException e) {
85+
throw new RuntimeException(e);
86+
} finally {
87+
client.close();
88+
}
89+
}
90+
91+
/**
92+
* Does actual request and returns time stats in format:
93+
* [number of records, read time in ms, request initiation time in ms, server time in ms]
94+
* @param client
95+
* @param limit
96+
* @return
97+
*/
98+
private long[] doReadNumbersSet(Client client, int limit) {
99+
final String query = DATASET_QUERY + " LIMIT " + limit;
100+
try (QueryResponse response = client.query(query).get(3000, TimeUnit.MILLISECONDS)) {
101+
ArrayList<com.clickhouse.demo_service.data.NumbersRecord> result = new ArrayList<>();
102+
103+
// iterable approach is more efficient for large datasets because it doesn't load all records into memory
104+
ClickHouseBinaryFormatReader reader = Client.newBinaryFormatReader(response);
105+
106+
long start = System.nanoTime();
107+
while (reader.next() != null) {
108+
result.add(new com.clickhouse.demo_service.data.NumbersRecord(
109+
reader.getUUID("id"),
110+
reader.getLong("p1"),
111+
reader.getBigInteger("number"),
112+
reader.getFloat("p2"),
113+
reader.getDouble("p3")
114+
));
115+
}
116+
long duration = System.nanoTime() - start;
117+
118+
return new long[] { result.size(), TimeUnit.NANOSECONDS.toMillis(duration), response.getMetrics().getMetric(ClientMetrics.OP_DURATION).getLong(),
119+
TimeUnit.NANOSECONDS.toMillis(response.getServerTime()) };
120+
} catch (Exception e) {
121+
throw new RuntimeException("Failed to fetch dataset", e);
122+
}
123+
}
124+
125+
private static final String DATASET_QUERY =
126+
"SELECT generateUUIDv4() as id, " +
127+
"toUInt32(number) as p1, " +
128+
"number, " +
129+
"toFloat32(number/100000) as p2, " +
130+
"toFloat64(number/100000) as p3" +
131+
" FROM system.numbers";
132+
133+
public static void main(String[] args) {
134+
final String endpoint = System.getProperty("chEndpoint", "http://localhost:8123");
135+
final String user = System.getProperty("chUser", "default");
136+
final String password = System.getProperty("chPassword", "");
137+
final String database = System.getProperty("chDatabase", "default");
138+
139+
// profilerDelay();
140+
141+
BigDatasetExamples examples = new BigDatasetExamples(endpoint, user, password, database);
142+
143+
examples.readBigSetOfNumbers(100_000, 100, 10);
144+
145+
// profilerDelay();
146+
}
147+
148+
private static void profilerDelay() {
149+
// Delay for a profiler
150+
try {
151+
Thread.sleep(30000);
152+
} catch (InterruptedException e) {
153+
e.printStackTrace();
154+
}
155+
}
156+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.clickhouse.demo_service.data;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Data;
5+
import lombok.NoArgsConstructor;
6+
7+
import java.math.BigInteger;
8+
import java.util.UUID;
9+
10+
@Data
11+
@AllArgsConstructor
12+
@NoArgsConstructor
13+
public class NumbersRecord {
14+
15+
private UUID id;
16+
17+
private long p1;
18+
19+
private BigInteger number;
20+
21+
private float p2;
22+
23+
private double p3;
24+
}

0 commit comments

Comments
 (0)