Skip to content

Commit 98a8743

Browse files
authored
Merge branch 'main' into adjusting-kafka-feedback
2 parents b3d68f4 + 21af845 commit 98a8743

File tree

17 files changed

+384
-215
lines changed

17 files changed

+384
-215
lines changed

client-v2/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
<version>1.18.32</version>
143143
</path>
144144
</annotationProcessorPaths>
145+
<release>8</release>
145146
</configuration>
146147
</plugin>
147148
<plugin>

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.clickhouse.client.api.internal.TableSchemaParser;
2424
import com.clickhouse.client.api.internal.ValidationUtils;
2525
import com.clickhouse.client.api.metadata.TableSchema;
26+
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
27+
import com.clickhouse.client.api.metrics.ClientMetrics;
2628
import com.clickhouse.client.api.query.GenericRecord;
2729
import com.clickhouse.client.api.query.QueryResponse;
2830
import com.clickhouse.client.api.query.QuerySettings;
@@ -106,7 +108,7 @@ public class Client {
106108
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
107109
private ExecutorService queryExecutor;
108110

109-
private Map<String, OperationStatistics.ClientStatistics> globalClientStats = new ConcurrentHashMap<>();
111+
private Map<String, ClientStatisticsHolder> globalClientStats = new ConcurrentHashMap<>();
110112

111113
private Client(Set<String> endpoints, Map<String,String> configuration) {
112114
this.endpoints = endpoints;
@@ -540,7 +542,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
540542

541543
String operationId = startOperation();
542544
settings.setOperationId(operationId);
543-
globalClientStats.get(operationId).start("serialization");
545+
globalClientStats.get(operationId).start(ClientMetrics.OP_SERIALIZATION);
544546

545547
if (data == null || data.isEmpty()) {
546548
throw new IllegalArgumentException("Data cannot be empty");
@@ -574,7 +576,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
574576
}
575577
}
576578

577-
globalClientStats.get(operationId).stop("serialization");
579+
globalClientStats.get(operationId).stop(ClientMetrics.OP_SERIALIZATION);
578580
LOG.debug("Total serialization time: {}", globalClientStats.get(operationId).getElapsedTime("serialization"));
579581
return insert(tableName, new ByteArrayInputStream(stream.toByteArray()), format, settings);
580582
}
@@ -609,8 +611,8 @@ public CompletableFuture<InsertResponse> insert(String tableName,
609611
operationId = startOperation();
610612
settings.setOperationId(operationId);
611613
}
612-
OperationStatistics.ClientStatistics clientStats = globalClientStats.remove(operationId);
613-
clientStats.start("insert");
614+
ClientStatisticsHolder clientStats = globalClientStats.remove(operationId);
615+
clientStats.start(ClientMetrics.OP_DURATION);
614616

615617
CompletableFuture<InsertResponse> responseFuture = new CompletableFuture<>();
616618

@@ -638,7 +640,6 @@ public CompletableFuture<InsertResponse> insert(String tableName,
638640
responseFuture.completeExceptionally(new ClientException("Operation has likely timed out.", e));
639641
}
640642
}
641-
clientStats.stop("insert");
642643
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));
643644
}
644645

@@ -706,8 +707,8 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
706707
if (settings.getFormat() == null) {
707708
settings.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
708709
}
709-
OperationStatistics.ClientStatistics clientStats = new OperationStatistics.ClientStatistics();
710-
clientStats.start("query");
710+
ClientStatisticsHolder clientStats = new ClientStatisticsHolder();
711+
clientStats.start(ClientMetrics.OP_DURATION);
711712
ClickHouseClient client = createClient();
712713
ClickHouseRequest<?> request = client.read(getServerNode());
713714

@@ -762,7 +763,7 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
762763
settings = new QuerySettings();
763764
}
764765
settings.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
765-
OperationStatistics.ClientStatistics clientStats = new OperationStatistics.ClientStatistics();
766+
ClientStatisticsHolder clientStats = new ClientStatisticsHolder();
766767
clientStats.start("query");
767768
ClickHouseClient client = createClient();
768769
ClickHouseRequest<?> request = client.read(getServerNode());
@@ -876,7 +877,7 @@ private ClickHouseRequest.Mutation createMutationRequest(ClickHouseRequest.Mutat
876877

877878
private String startOperation() {
878879
String operationId = UUID.randomUUID().toString();
879-
globalClientStats.put(operationId, new OperationStatistics.ClientStatistics());
880+
globalClientStats.put(operationId, new ClientStatisticsHolder());
880881
return operationId;
881882
}
882883

client-v2/src/main/java/com/clickhouse/client/api/OperationStatistics.java

Lines changed: 0 additions & 156 deletions
This file was deleted.

client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,22 @@
22

33
import com.clickhouse.client.ClickHouseClient;
44
import com.clickhouse.client.ClickHouseResponse;
5-
import com.clickhouse.client.ClickHouseResponseSummary;
6-
import com.clickhouse.client.api.OperationStatistics;
7-
8-
import java.util.concurrent.ExecutionException;
9-
import java.util.concurrent.TimeUnit;
10-
import java.util.concurrent.TimeoutException;
5+
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
6+
import com.clickhouse.client.api.metrics.OperationMetrics;
7+
import com.clickhouse.client.api.metrics.ServerMetrics;
118

129
public class InsertResponse implements AutoCloseable {
1310
private final ClickHouseResponse responseRef;
1411
private final ClickHouseClient client;
1512

16-
private OperationStatistics operationStatistics;
13+
private OperationMetrics operationMetrics;
1714

1815
public InsertResponse(ClickHouseClient client, ClickHouseResponse responseRef,
19-
OperationStatistics.ClientStatistics clientStatistics) {
16+
ClientStatisticsHolder clientStatisticsHolder) {
2017
this.responseRef = responseRef;
2118
this.client = client;
22-
this.operationStatistics = new OperationStatistics(clientStatistics);
23-
this.operationStatistics.updateServerStats(responseRef.getSummary());
19+
this.operationMetrics = new OperationMetrics(clientStatisticsHolder);
20+
this.operationMetrics.operationComplete(responseRef.getSummary());
2421
}
2522

2623
@Override
@@ -32,7 +29,60 @@ public void close() {
3229
}
3330
}
3431

35-
public OperationStatistics getOperationStatistics() {
36-
return operationStatistics;
32+
/**
33+
* Returns the metrics of this operation.
34+
*
35+
* @return metrics of this operation
36+
*/
37+
public OperationMetrics getMetrics() {
38+
return operationMetrics;
39+
}
40+
41+
/**
42+
* Alias for {@link ServerMetrics#NUM_ROWS_READ}
43+
* @return number of rows read by server from the storage
44+
*/
45+
public long getReadRows() {
46+
return operationMetrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong();
47+
}
48+
49+
/**
50+
* Alias for {@link ServerMetrics#NUM_BYTES_READ}
51+
* @return number of bytes read by server from the storage
52+
*/
53+
public long getReadBytes() {
54+
return operationMetrics.getMetric(ServerMetrics.NUM_BYTES_READ).getLong();
55+
}
56+
57+
/**
58+
* Alias for {@link ServerMetrics#NUM_ROWS_WRITTEN}
59+
* @return number of rows written by server to the storage
60+
*/
61+
public long getWrittenRows() {
62+
return operationMetrics.getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong();
63+
}
64+
65+
/**
66+
* Alias for {@link ServerMetrics#NUM_BYTES_WRITTEN}
67+
* @return number of bytes written by server to the storage
68+
*/
69+
public long getWrittenBytes() {
70+
return operationMetrics.getMetric(ServerMetrics.NUM_BYTES_WRITTEN).getLong();
71+
}
72+
73+
/**
74+
* Alias for {@link ServerMetrics#ELAPSED_TIME}
75+
* @return elapsed time in nanoseconds
76+
*/
77+
public long getServerTime() {
78+
return operationMetrics.getMetric(ServerMetrics.ELAPSED_TIME).getLong();
79+
}
80+
81+
/**
82+
* Alias for {@link ServerMetrics#RESULT_ROWS}
83+
* @return number of returned rows
84+
*/
85+
public long getResultRows() {
86+
return operationMetrics.getMetric(ServerMetrics.RESULT_ROWS).getLong();
3787
}
3888
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.clickhouse.client.api.internal;
2+
3+
import com.clickhouse.client.api.metrics.ClientMetrics;
4+
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
8+
public class ClientStatisticsHolder {
9+
10+
private final Map<String, StopWatch> stopWatches = new HashMap<>();
11+
12+
public void start(ClientMetrics metric) {
13+
start(metric.getKey());
14+
}
15+
16+
public void start(String spanName) {
17+
stopWatches.computeIfAbsent(spanName, k -> new StopWatch()).start();
18+
}
19+
20+
public StopWatch stop(ClientMetrics metric) {
21+
return stop(metric.getKey());
22+
}
23+
24+
public StopWatch stop(String spanName) {
25+
StopWatch timer = stopWatches.computeIfAbsent(spanName, k -> new StopWatch());
26+
timer.stop();
27+
return timer;
28+
}
29+
30+
public long getElapsedTime(String spanName) {
31+
StopWatch sw = stopWatches.get(spanName);
32+
return sw == null ? -1 : sw.getElapsedTime();
33+
}
34+
35+
public Map<String, StopWatch> getStopWatches() {
36+
return stopWatches;
37+
}
38+
39+
@Override
40+
public String toString() {
41+
return "ClientStatistics{" +
42+
"\"spans\"=" + stopWatches +
43+
'}';
44+
}
45+
}

0 commit comments

Comments
 (0)