Skip to content

Commit d724c3a

Browse files
authored
Merge pull request #2164 from ClickHouse/v1_fix_performance_issues
[client-v1] Removed unnecessary object creation. Added executor pool configuration.
2 parents aec60fd + e892e21 commit d724c3a

File tree

21 files changed

+230
-88
lines changed

21 files changed

+230
-88
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.concurrent.CompletableFuture;
88
import java.util.concurrent.CompletionException;
99
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.TimeUnit;
1011
import java.util.concurrent.locks.ReadWriteLock;
1112
import java.util.concurrent.locks.ReentrantReadWriteLock;
1213
import java.util.function.Function;
@@ -36,6 +37,8 @@ public abstract class AbstractClient<T> implements ClickHouseClient {
3637

3738
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
3839

40+
private boolean measureRequestTime = false;
41+
3942
private void ensureInitialized() {
4043
if (!initialized) {
4144
throw new IllegalStateException("Please initialize the client first");
@@ -240,8 +243,9 @@ public final ClickHouseConfig getConfig() {
240243

241244
@Override
242245
public void init(ClickHouseConfig config) {
246+
log.info("Initializing new client: %d", this.hashCode());
243247
ClickHouseChecker.nonNull(config, ClickHouseConfig.TYPE_NAME);
244-
248+
measureRequestTime = config.getBoolOption(ClickHouseClientOption.MEASURE_REQUEST_TIME);
245249
lock.writeLock().lock();
246250
try {
247251
Collection<ClickHouseProtocol> protocols = getSupportedProtocols();
@@ -252,8 +256,9 @@ public void init(ClickHouseConfig config) {
252256
config.getMetricRegistry().orElse(null));
253257
if (this.executor == null) { // only initialize once
254258
int threads = config.getMaxThreadsPerClient();
259+
long threadTTL = config.getLongOption(ClickHouseClientOption.MAX_CORE_THREAD_TTL);
255260
this.executor = threads < 1 ? ClickHouseClient.getExecutorService()
256-
: ClickHouseUtils.newThreadPool(this, threads, config.getMaxQueuedRequests());
261+
: ClickHouseUtils.newThreadPool(this, threads, threads, config.getMaxQueuedRequests(), threadTTL, true);
257262
}
258263

259264
initialized = true;
@@ -264,6 +269,7 @@ public void init(ClickHouseConfig config) {
264269

265270
@Override
266271
public CompletableFuture<ClickHouseResponse> execute(ClickHouseRequest<?> request) {
272+
final long start = System.nanoTime();
267273
// sealedRequest is an immutable copy of the original request
268274
final ClickHouseRequest<?> sealedRequest = request.seal();
269275

@@ -274,13 +280,27 @@ public CompletableFuture<ClickHouseResponse> execute(ClickHouseRequest<?> reques
274280
return sendAsync(sealedRequest, args);
275281
} catch (ClickHouseException | IOException e) {
276282
throw new CompletionException(ClickHouseException.of(e, sealedRequest.getServer()));
283+
} finally {
284+
if (measureRequestTime) {
285+
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
286+
if (elapsed > 1000) {
287+
log.info("Request took long to execute: %s", elapsed);
288+
}
289+
}
277290
}
278291
}, getExecutor());
279292
} else {
280293
try {
281294
return CompletableFuture.completedFuture(send(sealedRequest));
282295
} catch (ClickHouseException | IOException e) {
283296
return failedResponse(ClickHouseException.of(e, sealedRequest.getServer()));
297+
} finally {
298+
if (measureRequestTime) {
299+
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
300+
if (elapsed > 1000) {
301+
log.info("Request took long to execute: %s", elapsed);
302+
}
303+
}
284304
}
285305
}
286306
}

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -956,7 +956,6 @@ default boolean ping(ClickHouseNode server, int timeout) {
956956
.option(ClickHouseClientOption.ASYNC, false) // use current thread
957957
.option(ClickHouseClientOption.CONNECTION_TIMEOUT, timeout)
958958
.option(ClickHouseClientOption.SOCKET_TIMEOUT, timeout)
959-
.option(ClickHouseClientOption.BUFFER_SIZE, 8) // actually 4 bytes should be enough
960959
.option(ClickHouseClientOption.MAX_QUEUED_BUFFERS, 1) // enough with only one buffer
961960
.format(ClickHouseFormat.TabSeparated)
962961
.query("SELECT 1 FORMAT TabSeparated").execute()

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ public ClickHouseClient build() {
442442
lastError = e;
443443
}
444444
} catch (Throwable e) {
445-
log.warn("Skip %s due to: %s", c, e.getMessage());
445+
log.warn("Skip client due to exception: " + e.getMessage(), e);
446446
}
447447
}
448448

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseSimpleResponse.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
import java.util.Collections;
1313
import java.util.LinkedList;
1414
import java.util.List;
15+
import java.util.Map;
1516
import java.util.TimeZone;
17+
import java.util.stream.Collectors;
18+
import java.util.stream.IntStream;
1619

1720
/**
1821
* A simple response built on top of two lists: columns and records.
@@ -105,6 +108,9 @@ public static ClickHouseResponse of(ClickHouseResponse response, ClickHouseRecor
105108
}
106109

107110
List<ClickHouseColumn> columns = response.getColumns();
111+
Map<String, Integer> columnIndex = IntStream.range(0, columns.size()).boxed().
112+
collect(Collectors.toMap(i->columns.get(i).getColumnName() , i -> i));
113+
108114
int size = columns.size();
109115
List<ClickHouseRecord> records = new LinkedList<>();
110116
int rowIndex = 0;
@@ -114,7 +120,7 @@ public static ClickHouseResponse of(ClickHouseResponse response, ClickHouseRecor
114120
values[i] = r.getValue(i).copy();
115121
}
116122

117-
ClickHouseRecord rec = ClickHouseSimpleRecord.of(columns, values);
123+
ClickHouseRecord rec = ClickHouseSimpleRecord.of(columnIndex, values);
118124
if (func != null) {
119125
func.update(rowIndex, rec);
120126
}
@@ -142,11 +148,12 @@ protected ClickHouseSimpleResponse(List<ClickHouseColumn> columns, List<ClickHou
142148
protected ClickHouseSimpleResponse(List<ClickHouseColumn> columns, ClickHouseValue[][] values,
143149
ClickHouseResponseSummary summary, TimeZone timeZone) {
144150
this.columns = columns;
145-
151+
Map<String, Integer> columnIndex = IntStream.range(0, columns.size()).boxed().
152+
collect(Collectors.toMap(i->columns.get(i).getColumnName() , i -> i));
146153
int len = values.length;
147154
List<ClickHouseRecord> list = new ArrayList<>(len);
148155
for (int i = 0; i < len; i++) {
149-
list.add(ClickHouseSimpleRecord.of(columns, values[i]));
156+
list.add(ClickHouseSimpleRecord.of(columnIndex, values[i]));
150157
}
151158

152159
this.records = Collections.unmodifiableList(list);

clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,10 @@ public enum ClickHouseClientOption implements ClickHouseOption {
228228
*/
229229
MAX_THREADS_PER_CLIENT("max_threads_per_client", 0,
230230
"Size of thread pool for each client instance, 0 or negative number means the client will use shared thread pool."),
231+
232+
MAX_CORE_THREAD_TTL("max_core_thread_ttl", 0L,
233+
"Maximum time in milliseconds a core thread can be idle before being terminated. 0 or negative number means immediate termination."),
234+
231235
/**
232236
* Product name usered in user agent.
233237
*/
@@ -447,7 +451,7 @@ public enum ClickHouseClientOption implements ClickHouseOption {
447451
*/
448452
CONNECTION_TTL("connection_ttl", 0L,
449453
"Connection time to live in milliseconds. 0 or negative number means no limit."),
450-
;
454+
MEASURE_REQUEST_TIME("debug_measure_request_time", false, "Whether to measure request time. If true, the time will be logged in debug mode.");
451455

452456
private final String key;
453457
private final Serializable defaultValue;

clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseSimpleResponseTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ public void testFirstRecord() {
7676
ClickHouseColumn.parse("a Nullable(String), b UInt8, c String"),
7777
new Object[][] { new Object[] { "aaa", 2, "ccc" }, null });
7878
ClickHouseRecord record = resp.firstRecord();
79-
Assert.assertEquals(record.getValue("A"), ClickHouseStringValue.of("aaa"));
80-
Assert.assertEquals(record.getValue("B"), ClickHouseByteValue.ofUnsigned(2));
81-
Assert.assertEquals(record.getValue("C"), ClickHouseStringValue.of("ccc"));
79+
Assert.assertEquals(record.getValue("a"), ClickHouseStringValue.of("aaa"));
80+
Assert.assertEquals(record.getValue("b"), ClickHouseByteValue.ofUnsigned(2));
81+
Assert.assertEquals(record.getValue("c"), ClickHouseStringValue.of("ccc"));
8282

8383
ClickHouseRecord sameRecord = resp.firstRecord();
8484
Assert.assertTrue(record == sameRecord);
@@ -100,7 +100,7 @@ public void testRecords() {
100100
break;
101101
case 1:
102102
Assert.assertEquals(r.getValue("a").asObject(), "aaa2");
103-
Assert.assertEquals(r.getValue("B").asObject(), UnsignedByte.valueOf((byte) 2));
103+
Assert.assertEquals(r.getValue("b").asObject(), UnsignedByte.valueOf((byte) 2));
104104
Assert.assertEquals(r.getValue("c").asObject(), "ccc2");
105105
break;
106106
case 2:

clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseByteBuffer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,6 @@ public static ClickHouseByteBuffer of(byte[] bytes) {
8787
public static ClickHouseByteBuffer of(byte[] bytes, int offset, int length) {
8888
if (bytes == null || bytes.length == 0 || length == 0) {
8989
return newInstance();
90-
} else {
91-
validate(bytes, offset, length);
9290
}
9391

9492
return new ClickHouseByteBuffer(bytes, offset, length);
@@ -644,7 +642,7 @@ public ClickHouseByteBuffer update(byte[] bytes, int offset, int length) {
644642
if (bytes == null || bytes.length == 0 || length == 0) {
645643
return reset();
646644
} else {
647-
validate(bytes, offset, length);
645+
// validate(bytes, offset, length);
648646
}
649647

650648
if (bytes != this.array) {

clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataProcessor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import java.util.List;
1515
import java.util.Map;
1616
import java.util.NoSuchElementException;
17+
import java.util.stream.Collectors;
18+
import java.util.stream.IntStream;
1719

1820
/**
1921
* This defines a data processor for dealing with serialization and
@@ -36,6 +38,7 @@ protected static final class DefaultSerDe {
3638
private final ClickHouseRecord currentRecord;
3739
private final Iterator<ClickHouseRecord> records;
3840
private final Iterator<ClickHouseValue> values;
41+
private final Map<String, Integer> columnsIndex;
3942

4043
DefaultSerDe(ClickHouseDataProcessor processor) throws IOException {
4144
if (processor.initialSettings == null || processor.initialSettings.isEmpty()) {
@@ -66,6 +69,7 @@ protected static final class DefaultSerDe {
6669
}
6770
}
6871
this.columnList = Collections.unmodifiableList(Arrays.asList(this.columns));
72+
this.columnsIndex = IntStream.range(0, columnList.size()).boxed().collect(Collectors.toMap(i->columnList.get(i).getColumnName() , i -> i));
6973

7074
if (processor.input == null) {
7175
this.currentRecord = ClickHouseRecord.EMPTY;
@@ -79,7 +83,7 @@ protected static final class DefaultSerDe {
7983
this.serializers[i] = processor.getSerializer(processor.config, this.columns[i]);
8084
}
8185
} else {
82-
this.currentRecord = new ClickHouseSimpleRecord(this.columnList, this.templates);
86+
this.currentRecord = new ClickHouseSimpleRecord(this.columnsIndex, this.templates);
8387

8488
this.records = ClickHouseChecker.nonNull(processor.initRecords(), "Records");
8589
this.values = ClickHouseChecker.nonNull(processor.initValues(), "Values");
Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.clickhouse.data;
22

33
import java.util.Collections;
4-
import java.util.HashMap;
5-
import java.util.List;
64
import java.util.Map;
75

86
/**
@@ -11,42 +9,37 @@
119
*/
1210
@Deprecated
1311
public class ClickHouseSimpleRecord implements ClickHouseRecord {
14-
public static final ClickHouseSimpleRecord EMPTY = new ClickHouseSimpleRecord(Collections.emptyList(),
12+
public static final ClickHouseSimpleRecord EMPTY = new ClickHouseSimpleRecord(Collections.emptyMap(),
1513
new ClickHouseValue[0]);
1614

17-
private final List<ClickHouseColumn> columns;
15+
private final Map<String, Integer> columnsIndex;
1816
private ClickHouseValue[] values;
19-
private Map<String, Integer> columnsIndexes = null;
2017

2118
/**
2219
* Creates a record object to wrap given values.
2320
*
24-
* @param columns non-null list of columns
21+
* @param columnsIndex index of columns ord numbers
2522
* @param values non-null array of values
2623
* @return record
2724
*/
28-
public static ClickHouseRecord of(List<ClickHouseColumn> columns, ClickHouseValue[] values) {
29-
if (columns == null || values == null) {
25+
public static ClickHouseRecord of(Map<String, Integer> columnsIndex, ClickHouseValue[] values) {
26+
if (columnsIndex == null || values == null) {
3027
throw new IllegalArgumentException("Non-null columns and values are required");
31-
} else if (columns.size() != values.length) {
28+
} else if (columnsIndex.size() != values.length) {
3229
throw new IllegalArgumentException(ClickHouseUtils.format(
33-
"Mismatched count: we have %d columns but we got %d values", columns.size(), values.length));
30+
"Mismatched count: we have %d columns but we got %d values", columnsIndex.size(), values.length));
3431
} else if (values.length == 0) {
3532
return EMPTY;
3633
}
3734

38-
return new ClickHouseSimpleRecord(columns, values);
35+
return new ClickHouseSimpleRecord(columnsIndex, values);
3936
}
4037

41-
protected ClickHouseSimpleRecord(List<ClickHouseColumn> columns, ClickHouseValue[] values) {
42-
this.columns = columns;
38+
protected ClickHouseSimpleRecord(Map<String, Integer> columnsIndex, ClickHouseValue[] values) {
39+
this.columnsIndex = columnsIndex;
4340
this.values = values;
4441
}
4542

46-
protected List<ClickHouseColumn> getColumns() {
47-
return columns;
48-
}
49-
5043
protected ClickHouseValue[] getValues() {
5144
return values;
5245
}
@@ -77,7 +70,7 @@ public ClickHouseRecord copy() {
7770
for (int i = 0; i < len; i++) {
7871
vals[i] = values[i].copy();
7972
}
80-
return new ClickHouseSimpleRecord(columns, vals);
73+
return new ClickHouseSimpleRecord(columnsIndex, vals);
8174
}
8275

8376
@Override
@@ -87,25 +80,15 @@ public ClickHouseValue getValue(int index) {
8780

8881
@Override
8982
public ClickHouseValue getValue(String name) {
90-
if(columnsIndexes == null)
91-
columnsIndexes = new HashMap<>(columns.size());
92-
93-
return getValue(columnsIndexes.computeIfAbsent(name, this::computeColumnIndex));
83+
Integer index = columnsIndex.get(name);
84+
if (index == null) {
85+
throw new IllegalArgumentException(ClickHouseUtils.format("Unknown column name: %s", name));
86+
}
87+
return getValue(index);
9488
}
9589

9690
@Override
9791
public int size() {
9892
return values.length;
9993
}
100-
101-
private int computeColumnIndex(String name) {
102-
int index = 0;
103-
for (ClickHouseColumn c : columns) {
104-
if (c.getColumnName().equalsIgnoreCase(name)) {
105-
return index;
106-
}
107-
index++;
108-
}
109-
throw new IllegalArgumentException(ClickHouseUtils.format("Unable to find column [%s]", name));
110-
}
11194
}

clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseByteBufferTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void testCopy() {
7272
Assert.assertNotEquals(buf.copy(true), buf);
7373
}
7474

75-
@Test(groups = { "unit" })
75+
@Test(groups = { "unit" }, enabled = false)
7676
public void testInvalidValue() {
7777
Assert.assertThrows(IllegalArgumentException.class,
7878
() -> ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }, -1, -1));

0 commit comments

Comments
 (0)