Skip to content

Commit 7ea67f9

Browse files
authored
Merge pull request #2238 from ClickHouse/perf_code_review_01
[perf] Request pipeline review and improvements
2 parents 7b249c4 + 46496f5 commit 7ea67f9

File tree

8 files changed

+150
-94
lines changed

8 files changed

+150
-94
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public class Client implements AutoCloseable {
143143
// Server context
144144
private String serverVersion;
145145
private Object metricsRegistry;
146+
private int retries;
146147

147148
private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
148149
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy) {
@@ -172,6 +173,9 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
172173
boolean initSslContext = getEndpoints().stream().anyMatch(s -> s.toLowerCase().contains("https://"));
173174
this.httpClientHelper = new HttpAPIClientHelper(configuration, metricsRegistry, initSslContext);
174175
this.columnToMethodMatchingStrategy = columnToMethodMatchingStrategy;
176+
177+
String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
178+
this.retries = retry == null ? 0 : Integer.parseInt(retry);
175179
}
176180

177181
/**
@@ -1471,8 +1475,6 @@ public CompletableFuture<InsertResponse> insert(String tableName,
14711475
Supplier<InsertResponse> responseSupplier;
14721476

14731477

1474-
String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
1475-
final int maxRetries = retry == null ? 0 : Integer.parseInt(retry);
14761478
final int writeBufferSize = settings.getInputStreamCopyBufferSize() <= 0 ?
14771479
Integer.parseInt(configuration.getOrDefault(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "8192")) :
14781480
settings.getInputStreamCopyBufferSize();
@@ -1491,7 +1493,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
14911493
ClickHouseNode selectedNode = getNextAliveNode();
14921494

14931495
RuntimeException lastException = null;
1494-
for (int i = 0; i <= maxRetries; i++) {
1496+
for (int i = 0; i <= retries; i++) {
14951497
// Execute request
14961498
try (ClassicHttpResponse httpResponse =
14971499
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(),
@@ -1517,7 +1519,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15171519
return new InsertResponse(metrics);
15181520
} catch (Exception e) {
15191521
lastException = httpClientHelper.wrapException(String.format("Insert failed (Attempt: %s/%s - Duration: %s)",
1520-
(i + 1), (maxRetries + 1), System.nanoTime() - startTime), e);
1522+
(i + 1), (retries + 1), System.nanoTime() - startTime), e);
15211523
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
15221524
LOG.warn("Retrying.", e);
15231525
selectedNode = getNextAliveNode();
@@ -1526,15 +1528,15 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15261528
}
15271529
}
15281530

1529-
if (i < maxRetries) {
1531+
if (i < retries) {
15301532
try {
15311533
writer.onRetry();
15321534
} catch (IOException ioe) {
15331535
throw new ClientException("Failed to reset stream before next attempt", ioe);
15341536
}
15351537
}
15361538
}
1537-
throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1) + " - Duration: " + (System.nanoTime() - startTime), lastException);
1539+
throw new ClientException("Insert request failed after attempts: " + (retries + 1) + " - Duration: " + (System.nanoTime() - startTime), lastException);
15381540
};
15391541

15401542
return runAsyncOperation(responseSupplier, settings.getAllSettings());
@@ -1605,9 +1607,6 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
16051607

16061608
Supplier<QueryResponse> responseSupplier;
16071609

1608-
String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
1609-
final int maxRetries = retry == null ? 0 : Integer.parseInt(retry);
1610-
16111610
if (queryParams != null) {
16121611
settings.setOption("statement_params", queryParams);
16131612
}
@@ -1617,7 +1616,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
16171616
// Selecting some node
16181617
ClickHouseNode selectedNode = getNextAliveNode();
16191618
RuntimeException lastException = null;
1620-
for (int i = 0; i <= maxRetries; i++) {
1619+
for (int i = 0; i <= retries; i++) {
16211620
try {
16221621
ClassicHttpResponse httpResponse =
16231622
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(), output -> {
@@ -1650,7 +1649,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
16501649

16511650
} catch (Exception e) {
16521651
lastException = httpClientHelper.wrapException(String.format("Query request failed (Attempt: %s/%s - Duration: %s)",
1653-
(i + 1), (maxRetries + 1), System.nanoTime() - startTime), e);
1652+
(i + 1), (retries + 1), System.nanoTime() - startTime), e);
16541653
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
16551654
LOG.warn("Retrying.", e);
16561655
selectedNode = getNextAliveNode();
@@ -1660,7 +1659,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
16601659
}
16611660
}
16621661

1663-
throw new ClientException("Query request failed after attempts: " + (maxRetries + 1) + " - Duration: " + (System.nanoTime() - startTime), lastException);
1662+
throw new ClientException("Query request failed after attempts: " + (retries + 1) + " - Duration: " + (System.nanoTime() - startTime), lastException);
16641663
};
16651664

16661665
return runAsyncOperation(responseSupplier, settings.getAllSettings());

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ private void readSchema() {
3434
return;
3535
}
3636
for (int i = 0; i < nCol; i++) {
37-
columns.add(BinaryStreamReader.readString(input));
37+
columns.add(binaryStreamReader.readString());
3838
}
3939

4040
for (int i = 0; i < nCol; i++) {
41-
headerSchema.addColumn(columns.get(i), BinaryStreamReader.readString(input));
41+
headerSchema.addColumn(columns.get(i), binaryStreamReader.readString());
4242
}
4343

4444
setSchema(headerSchema);

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
5656

5757
protected InputStream input;
5858

59-
protected Map<String, Object> settings;
60-
6159
protected BinaryStreamReader binaryStreamReader;
6260

6361
private TableSchema schema;
@@ -69,14 +67,14 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
6967
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
7068
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
7169
this.input = inputStream;
72-
this.settings = querySettings == null ? Collections.emptyMap() : new HashMap<>(querySettings.getAllSettings());
73-
Boolean useServerTimeZone = (Boolean) this.settings.get(ClientConfigProperties.USE_SERVER_TIMEZONE.getKey());
70+
Map<String, Object> settings = querySettings == null ? Collections.emptyMap() : querySettings.getAllSettings();
71+
Boolean useServerTimeZone = (Boolean) settings.get(ClientConfigProperties.USE_SERVER_TIMEZONE.getKey());
7472
TimeZone timeZone = useServerTimeZone == Boolean.TRUE && querySettings != null ? querySettings.getServerTimeZone() :
75-
(TimeZone) this.settings.get(ClientConfigProperties.USE_TIMEZONE.getKey());
73+
(TimeZone) settings.get(ClientConfigProperties.USE_TIMEZONE.getKey());
7674
if (timeZone == null) {
7775
throw new ClientException("Time zone is not set. (useServerTimezone:" + useServerTimeZone + ")");
7876
}
79-
boolean jsonAsString = MapUtils.getFlag(this.settings,
77+
boolean jsonAsString = MapUtils.getFlag(settings,
8078
ClientConfigProperties.serverSetting(ServerSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING), false);
8179
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator, jsonAsString);
8280
if (schema != null) {

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,13 @@ public <T> T readValue(ClickHouseColumn column, Class<?> typeHint) throws IOExce
112112
switch (dataType) {
113113
// Primitives
114114
case FixedString: {
115-
byte[] bytes = readNBytes(input, estimatedLen);
115+
byte[] bytes = estimatedLen > STRING_BUFF.length ?
116+
new byte[estimatedLen] : STRING_BUFF;
117+
readNBytes(input, bytes, 0, estimatedLen);
116118
return (T) new String(bytes, 0, estimatedLen, StandardCharsets.UTF_8);
117119
}
118120
case String: {
119-
int len = readVarInt(input);
120-
if (len == 0) {
121-
return (T) "";
122-
}
123-
return (T) new String(readNBytes(input, len), StandardCharsets.UTF_8);
121+
return (T) readString();
124122
}
125123
case Int8:
126124
return (T) Byte.valueOf(readByte());
@@ -977,6 +975,24 @@ public static ZonedDateTime readDateTime64(InputStream input, byte[] buff, int s
977975
return Instant.ofEpochSecond(value, nanoSeconds).atZone(tz.toZoneId());
978976
}
979977

978+
private final byte[] STRING_BUFF = new byte[1024];
979+
980+
/**
981+
* Reads a string from the internal input stream.
982+
* Uses pre-allocated buffer to store tmp data.
983+
* @return
984+
* @throws IOException
985+
*/
986+
public String readString() throws IOException {
987+
int len = readVarInt(input);
988+
if (len == 0) {
989+
return "";
990+
}
991+
byte[] dest = len > STRING_BUFF.length ? new byte[len] : STRING_BUFF;
992+
readNBytes(input, dest, 0, len);
993+
return new String(dest, 0, len, StandardCharsets.UTF_8);
994+
}
995+
980996
/**
981997
* Reads a decimal value from input stream.
982998
* @param input - source of bytes
Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,92 @@
11
package com.clickhouse.client.api.data_formats.internal;
22

3-
import com.clickhouse.client.api.ClientException;
43
import com.clickhouse.client.api.metrics.OperationMetrics;
54
import com.clickhouse.client.api.metrics.ServerMetrics;
6-
import com.fasterxml.jackson.core.JsonFactory;
7-
import com.fasterxml.jackson.core.JsonParser;
8-
import com.fasterxml.jackson.core.JsonToken;
95

10-
import java.io.IOException;
6+
import java.util.Collections;
7+
import java.util.HashMap;
8+
import java.util.Map;
119

1210
public class ProcessParser {
1311

14-
private static JsonFactory parserFactory = new JsonFactory();
12+
private static final String[] SUMMARY_FIELDS = {
13+
"read_rows", "read_bytes", "written_rows", "written_bytes",
14+
"total_rows_to_read", "elapsed_ns", "result_rows"
15+
};
16+
17+
private static final ServerMetrics[] SUMMARY_METRICS = {
18+
ServerMetrics.NUM_ROWS_READ, ServerMetrics.NUM_BYTES_READ,
19+
ServerMetrics.NUM_ROWS_WRITTEN, ServerMetrics.NUM_BYTES_WRITTEN,
20+
ServerMetrics.TOTAL_ROWS_TO_READ, ServerMetrics.ELAPSED_TIME,
21+
ServerMetrics.RESULT_ROWS
22+
};
1523

1624
public static void parseSummary(String text, OperationMetrics metrics) {
17-
try (JsonParser parser = parserFactory.createParser(text)) {
18-
parser.nextToken(); // skip START_OBJECT
19-
JsonToken t = parser.nextToken();
25+
Map<String, Integer> map = parse(text == null ? "{}" : text);
26+
27+
for (ServerMetrics m : ServerMetrics.values()) {
28+
metrics.updateMetric(m, -1);
29+
}
30+
31+
for (int i = 0; i < SUMMARY_FIELDS.length; i++) {
32+
String field = SUMMARY_FIELDS[i];
33+
Integer value = map.get(field);
34+
if (value != null) {
35+
metrics.updateMetric(SUMMARY_METRICS[i], value);
36+
}
37+
}
38+
}
39+
40+
41+
public static Map<String, Integer> parse(String json) {
42+
if (json == null) {
43+
throw new IllegalArgumentException("json is null");
44+
}
45+
json = json.trim();
46+
if (json.isEmpty()) {
47+
return Collections.emptyMap();
48+
}
49+
if (json.charAt(0) != '{' || json.charAt(json.length() - 1) != '}') {
50+
throw new IllegalArgumentException("JSON must start with '{' and end with '}'");
51+
}
2052

21-
for (ServerMetrics m : ServerMetrics.values()) {
22-
metrics.updateMetric(m, -1);
53+
Map<String, Integer> result = new HashMap<>();
54+
55+
String content = json.substring(1, json.length() - 1).trim();
56+
if (content.isEmpty()) {
57+
return result; // empty object
58+
}
59+
60+
String[] pairs = content.split(",");
61+
62+
for (String pair : pairs) {
63+
String[] keyValue = pair.split(":", 2);
64+
if (keyValue.length != 2) {
65+
throw new IllegalArgumentException("Invalid key-value format: " + pair);
66+
}
67+
68+
String key = keyValue[0].trim();
69+
String valueStr = keyValue[1].trim();
70+
71+
if (key.startsWith("\"") && key.endsWith("\"") && key.length() >= 2) {
72+
key = key.substring(1, key.length() - 1);
73+
} else {
74+
throw new IllegalArgumentException("Invalid key format: " + key);
75+
}
76+
77+
if (valueStr.startsWith("\"") && valueStr.endsWith("\"") && valueStr.length() >= 2) {
78+
valueStr = valueStr.substring(1, valueStr.length() - 1);
2379
}
24-
while (t != null) {
25-
if (t == JsonToken.FIELD_NAME) {
26-
String fieldName = parser.currentName();
27-
parser.nextValue();
28-
if ("read_rows".equals(fieldName)) {
29-
metrics.updateMetric(ServerMetrics.NUM_ROWS_READ, parser.getValueAsLong());
30-
} else if ("read_bytes".equals(fieldName)) {
31-
metrics.updateMetric(ServerMetrics.NUM_BYTES_READ, parser.getValueAsLong());
32-
} else if ("written_rows".equals(fieldName)) {
33-
metrics.updateMetric(ServerMetrics.NUM_ROWS_WRITTEN, parser.getValueAsLong());
34-
} else if ("written_bytes".equals(fieldName)) {
35-
metrics.updateMetric(ServerMetrics.NUM_BYTES_WRITTEN, parser.getValueAsLong());
36-
} else if ("total_rows_to_read".equals(fieldName)) {
37-
metrics.updateMetric(ServerMetrics.TOTAL_ROWS_TO_READ, parser.getValueAsLong());
38-
} else if ("elapsed_ns".equals(fieldName)) {
39-
metrics.updateMetric(ServerMetrics.ELAPSED_TIME, parser.getValueAsLong());
40-
} else if ("result_rows".equals(fieldName)) {
41-
metrics.updateMetric(ServerMetrics.RESULT_ROWS, parser.getValueAsLong());
42-
} else {
43-
// ignore unknown fields for forward compatibility
44-
}
45-
}
46-
t = parser.nextToken();
80+
81+
try {
82+
int value = Integer.parseInt(valueStr);
83+
result.put(key, value);
84+
} catch (NumberFormatException e) {
85+
// ignore error
4786
}
48-
} catch (IOException e) {
49-
throw new ClientException("Failed to parse summary", e);
5087
}
88+
89+
return result;
5190
}
91+
5292
}

0 commit comments

Comments
 (0)