Skip to content

Commit df5b1e1

Browse files
committed
changes after a review
1 parent 7b249c4 commit df5b1e1

File tree

6 files changed

+140
-73
lines changed

6 files changed

+140
-73
lines changed

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ private void readSchema() {
3434
return;
3535
}
3636
for (int i = 0; i < nCol; i++) {
37-
columns.add(BinaryStreamReader.readString(input));
37+
columns.add(binaryStreamReader.readString());
3838
}
3939

4040
for (int i = 0; i < nCol; i++) {
41-
headerSchema.addColumn(columns.get(i), BinaryStreamReader.readString(input));
41+
headerSchema.addColumn(columns.get(i), binaryStreamReader.readString());
4242
}
4343

4444
setSchema(headerSchema);

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
5656

5757
protected InputStream input;
5858

59-
protected Map<String, Object> settings;
60-
6159
protected BinaryStreamReader binaryStreamReader;
6260

6361
private TableSchema schema;
@@ -69,14 +67,14 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
6967
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
7068
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
7169
this.input = inputStream;
72-
this.settings = querySettings == null ? Collections.emptyMap() : new HashMap<>(querySettings.getAllSettings());
73-
Boolean useServerTimeZone = (Boolean) this.settings.get(ClientConfigProperties.USE_SERVER_TIMEZONE.getKey());
70+
Map<String, Object> settings = querySettings == null ? Collections.emptyMap() : querySettings.getAllSettings();
71+
Boolean useServerTimeZone = (Boolean) settings.get(ClientConfigProperties.USE_SERVER_TIMEZONE.getKey());
7472
TimeZone timeZone = useServerTimeZone == Boolean.TRUE && querySettings != null ? querySettings.getServerTimeZone() :
75-
(TimeZone) this.settings.get(ClientConfigProperties.USE_TIMEZONE.getKey());
73+
(TimeZone) settings.get(ClientConfigProperties.USE_TIMEZONE.getKey());
7674
if (timeZone == null) {
7775
throw new ClientException("Time zone is not set. (useServerTimezone:" + useServerTimeZone + ")");
7876
}
79-
boolean jsonAsString = MapUtils.getFlag(this.settings,
77+
boolean jsonAsString = MapUtils.getFlag(settings,
8078
ClientConfigProperties.serverSetting(ServerSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING), false);
8179
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator, jsonAsString);
8280
if (schema != null) {

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,13 @@ public <T> T readValue(ClickHouseColumn column, Class<?> typeHint) throws IOExce
112112
switch (dataType) {
113113
// Primitives
114114
case FixedString: {
115-
byte[] bytes = readNBytes(input, estimatedLen);
115+
byte[] bytes = estimatedLen > STRING_BUFF.length ?
116+
new byte[estimatedLen] : STRING_BUFF;
117+
readNBytes(input, bytes, 0, estimatedLen);
116118
return (T) new String(bytes, 0, estimatedLen, StandardCharsets.UTF_8);
117119
}
118120
case String: {
119-
int len = readVarInt(input);
120-
if (len == 0) {
121-
return (T) "";
122-
}
123-
return (T) new String(readNBytes(input, len), StandardCharsets.UTF_8);
121+
return (T) readString();
124122
}
125123
case Int8:
126124
return (T) Byte.valueOf(readByte());
@@ -977,6 +975,24 @@ public static ZonedDateTime readDateTime64(InputStream input, byte[] buff, int s
977975
return Instant.ofEpochSecond(value, nanoSeconds).atZone(tz.toZoneId());
978976
}
979977

978+
private final byte[] STRING_BUFF = new byte[1024];
979+
980+
/**
981+
* Reads a string from the internal input stream.
982+
* Uses pre-allocated buffer to store tmp data.
983+
* @return
984+
* @throws IOException
985+
*/
986+
public String readString() throws IOException {
987+
int len = readVarInt(input);
988+
if (len == 0) {
989+
return "";
990+
}
991+
byte[] dest = len > STRING_BUFF.length ? new byte[len] : STRING_BUFF;
992+
readNBytes(input, dest, 0, len);
993+
return new String(dest, 0, len, StandardCharsets.UTF_8);
994+
}
995+
980996
/**
981997
* Reads a decimal value from input stream.
982998
* @param input - source of bytes
Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,92 @@
11
package com.clickhouse.client.api.data_formats.internal;
22

3-
import com.clickhouse.client.api.ClientException;
43
import com.clickhouse.client.api.metrics.OperationMetrics;
54
import com.clickhouse.client.api.metrics.ServerMetrics;
6-
import com.fasterxml.jackson.core.JsonFactory;
7-
import com.fasterxml.jackson.core.JsonParser;
8-
import com.fasterxml.jackson.core.JsonToken;
95

10-
import java.io.IOException;
6+
import java.util.Collections;
7+
import java.util.HashMap;
8+
import java.util.Map;
119

1210
public class ProcessParser {
1311

14-
private static JsonFactory parserFactory = new JsonFactory();
12+
private static final String[] SUMMARY_FIELDS = {
13+
"read_rows", "read_bytes", "written_rows", "written_bytes",
14+
"total_rows_to_read", "elapsed_ns", "result_rows"
15+
};
16+
17+
private static final ServerMetrics[] SUMMARY_METRICS = {
18+
ServerMetrics.NUM_ROWS_READ, ServerMetrics.NUM_BYTES_READ,
19+
ServerMetrics.NUM_ROWS_WRITTEN, ServerMetrics.NUM_BYTES_WRITTEN,
20+
ServerMetrics.TOTAL_ROWS_TO_READ, ServerMetrics.ELAPSED_TIME,
21+
ServerMetrics.RESULT_ROWS
22+
};
1523

1624
public static void parseSummary(String text, OperationMetrics metrics) {
17-
try (JsonParser parser = parserFactory.createParser(text)) {
18-
parser.nextToken(); // skip START_OBJECT
19-
JsonToken t = parser.nextToken();
25+
Map<String, Integer> map = parse(text == null ? "{}" : text);
26+
27+
for (ServerMetrics m : ServerMetrics.values()) {
28+
metrics.updateMetric(m, -1);
29+
}
30+
31+
for (int i = 0; i < SUMMARY_FIELDS.length; i++) {
32+
String field = SUMMARY_FIELDS[i];
33+
Integer value = map.get(field);
34+
if (value != null) {
35+
metrics.updateMetric(SUMMARY_METRICS[i], value);
36+
}
37+
}
38+
}
39+
40+
41+
public static Map<String, Integer> parse(String json) {
42+
if (json == null) {
43+
throw new IllegalArgumentException("json is null");
44+
}
45+
json = json.trim();
46+
if (json.isEmpty()) {
47+
return Collections.emptyMap();
48+
}
49+
if (json.charAt(0) != '{' || json.charAt(json.length() - 1) != '}') {
50+
throw new IllegalArgumentException("JSON must start with '{' and end with '}'");
51+
}
2052

21-
for (ServerMetrics m : ServerMetrics.values()) {
22-
metrics.updateMetric(m, -1);
53+
Map<String, Integer> result = new HashMap<>();
54+
55+
String content = json.substring(1, json.length() - 1).trim();
56+
if (content.isEmpty()) {
57+
return result; // empty object
58+
}
59+
60+
String[] pairs = content.split(",");
61+
62+
for (String pair : pairs) {
63+
String[] keyValue = pair.split(":", 2);
64+
if (keyValue.length != 2) {
65+
throw new IllegalArgumentException("Invalid key-value format: " + pair);
66+
}
67+
68+
String key = keyValue[0].trim();
69+
String valueStr = keyValue[1].trim();
70+
71+
if (key.startsWith("\"") && key.endsWith("\"") && key.length() >= 2) {
72+
key = key.substring(1, key.length() - 1);
73+
} else {
74+
throw new IllegalArgumentException("Invalid key format: " + key);
75+
}
76+
77+
if (valueStr.startsWith("\"") && valueStr.endsWith("\"") && valueStr.length() >= 2) {
78+
valueStr = valueStr.substring(1, valueStr.length() - 1);
2379
}
24-
while (t != null) {
25-
if (t == JsonToken.FIELD_NAME) {
26-
String fieldName = parser.currentName();
27-
parser.nextValue();
28-
if ("read_rows".equals(fieldName)) {
29-
metrics.updateMetric(ServerMetrics.NUM_ROWS_READ, parser.getValueAsLong());
30-
} else if ("read_bytes".equals(fieldName)) {
31-
metrics.updateMetric(ServerMetrics.NUM_BYTES_READ, parser.getValueAsLong());
32-
} else if ("written_rows".equals(fieldName)) {
33-
metrics.updateMetric(ServerMetrics.NUM_ROWS_WRITTEN, parser.getValueAsLong());
34-
} else if ("written_bytes".equals(fieldName)) {
35-
metrics.updateMetric(ServerMetrics.NUM_BYTES_WRITTEN, parser.getValueAsLong());
36-
} else if ("total_rows_to_read".equals(fieldName)) {
37-
metrics.updateMetric(ServerMetrics.TOTAL_ROWS_TO_READ, parser.getValueAsLong());
38-
} else if ("elapsed_ns".equals(fieldName)) {
39-
metrics.updateMetric(ServerMetrics.ELAPSED_TIME, parser.getValueAsLong());
40-
} else if ("result_rows".equals(fieldName)) {
41-
metrics.updateMetric(ServerMetrics.RESULT_ROWS, parser.getValueAsLong());
42-
} else {
43-
// ignore unknown fields for forward compatibility
44-
}
45-
}
46-
t = parser.nextToken();
80+
81+
try {
82+
int value = Integer.parseInt(valueStr);
83+
result.put(key, value);
84+
} catch (NumberFormatException e) {
85+
// ignore error
4786
}
48-
} catch (IOException e) {
49-
throw new ClientException("Failed to parse summary", e);
5087
}
88+
89+
return result;
5190
}
91+
5292
}

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -388,9 +388,15 @@ public Exception readError(ClassicHttpResponse httpResponse) {
388388
}
389389
}
390390

391+
private static final long POOL_VENT_TIMEOUT = 10000L;
392+
private AtomicLong timeToPoolVent = new AtomicLong(0);
393+
391394
public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Object> requestConfig,
392395
IOCallback<OutputStream> writeCallback) throws IOException {
393-
poolControl.closeExpired();
396+
if (timeToPoolVent.get() < System.currentTimeMillis()) {
397+
timeToPoolVent.set(System.currentTimeMillis() + POOL_VENT_TIMEOUT);
398+
poolControl.closeExpired();
399+
}
394400

395401
if (requestConfig == null) {
396402
requestConfig = Collections.emptyMap();
@@ -411,8 +417,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
411417
boolean useHttpCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.USE_HTTP_COMPRESSION.getKey());
412418
boolean appCompressedData = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.APP_COMPRESSED_DATA.getKey());
413419

414-
RequestConfig httpReqConfig = RequestConfig.copy(baseRequestConfig).build();
415-
req.setConfig(httpReqConfig);
420+
req.setConfig(baseRequestConfig);
416421
// setting entity. wrapping if compression is enabled
417422
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback),
418423
clientCompression, useHttpCompression, appCompressedData));
@@ -498,17 +503,22 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
498503
}
499504
}
500505

501-
for (Map.Entry<String, String> entry : chConfig.entrySet()) {
502-
if (entry.getKey().startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) {
503-
req.setHeader(entry.getKey().substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()), entry.getValue());
506+
for (String key : chConfig.keySet()) {
507+
if (key.startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) {
508+
req.setHeader(key.substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()), chConfig.get(key));
504509
}
505510
}
506-
for (Map.Entry<String, Object> entry : requestConfig.entrySet()) {
507-
if (entry.getKey().startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) {
508-
req.setHeader(entry.getKey().substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()), entry.getValue().toString());
511+
512+
for (String key : requestConfig.keySet()) {
513+
if (key.startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) {
514+
Object val = requestConfig.get(key);
515+
if (val != null) {
516+
req.setHeader(key.substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()), String.valueOf(val));
517+
}
509518
}
510519
}
511520

521+
512522
// Special cases
513523
if (req.containsHeader(HttpHeaders.AUTHORIZATION) && (req.containsHeader(ClickHouseHttpProto.HEADER_DB_USER) ||
514524
req.containsHeader(ClickHouseHttpProto.HEADER_DB_PASSWORD))) {
@@ -522,9 +532,9 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
522532
}
523533

524534
private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<String, Object> requestConfig) {
525-
for (Map.Entry<String, String> entry : chConfig.entrySet()) {
526-
if (entry.getKey().startsWith(ClientConfigProperties.SERVER_SETTING_PREFIX)) {
527-
req.addParameter(entry.getKey().substring(ClientConfigProperties.SERVER_SETTING_PREFIX.length()), entry.getValue());
535+
for (String key : chConfig.keySet()) {
536+
if (key.startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) {
537+
req.addParameter(key.substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()), chConfig.get(key));
528538
}
529539
}
530540

@@ -563,9 +573,12 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
563573
sessionRoles.forEach(r -> req.addParameter(ClickHouseHttpProto.QPARAM_ROLE, r));
564574
}
565575

566-
for (Map.Entry<String, Object> entry : requestConfig.entrySet()) {
567-
if (entry.getKey().startsWith(ClientConfigProperties.SERVER_SETTING_PREFIX)) {
568-
req.addParameter(entry.getKey().substring(ClientConfigProperties.SERVER_SETTING_PREFIX.length()), entry.getValue().toString());
576+
for (String key : requestConfig.keySet()) {
577+
if (key.startsWith(ClientConfigProperties.SERVER_SETTING_PREFIX)) {
578+
Object val = requestConfig.get(key);
579+
if (val != null) {
580+
req.addParameter(key.substring(ClientConfigProperties.SERVER_SETTING_PREFIX.length()), String.valueOf(requestConfig.get(key)));
581+
}
569582
}
570583
}
571584
}

performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,21 @@ public static void main(String[] args) throws Exception {
3636

3737
Options opt = new OptionsBuilder()
3838
.include(QueryClient.class.getSimpleName())
39-
.include(InsertClient.class.getSimpleName())
40-
.include(Compression.class.getSimpleName())
41-
.include(Serializers.class.getSimpleName())
42-
.include(Deserializers.class.getSimpleName())
43-
.include(MixedWorkload.class.getSimpleName())
39+
// .include(InsertClient.class.getSimpleName())
40+
// .include(Compression.class.getSimpleName())
41+
// .include(Serializers.class.getSimpleName())
42+
// .include(Deserializers.class.getSimpleName())
43+
// .include(MixedWorkload.class.getSimpleName())
4444
.forks(1) // must be a fork. No fork only for debugging
4545
.mode(Mode.SampleTime)
4646
.timeUnit(TimeUnit.MILLISECONDS)
4747
.addProfiler(GCProfiler.class)
4848
.addProfiler(MemPoolProfiler.class)
49-
.warmupIterations(3)
49+
.warmupIterations(1)
5050
.warmupTime(TimeValue.seconds(10))
51-
.measurementIterations(10)
51+
.measurementIterations(5)
5252
.jvmArgs("-Xms8g", "-Xmx8g")
53-
.measurementTime(TimeValue.seconds(isCloud() ? 30 : 120))
53+
.measurementTime(TimeValue.seconds(isCloud() ? 30 : 10))
5454
.resultFormat(ResultFormatType.JSON)
5555
// .output(String.format("jmh-results-%s-%s.out", isCloud() ? "cloud" : "local", System.currentTimeMillis()))
5656
.result(String.format("jmh-results-%s-%s.json", isCloud() ? "cloud" : "local", System.currentTimeMillis()))

0 commit comments

Comments
 (0)