Skip to content

Commit 0ea28d4

Browse files
authored
Merge pull request #2226 from ClickHouse/perf_deserializer_tests
[perf] Deserializer Tests
2 parents e363e40 + 78793f0 commit 0ea28d4

File tree

7 files changed

+183
-56
lines changed

7 files changed

+183
-56
lines changed

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import com.clickhouse.client.api.query.QuerySettings;
1212
import com.clickhouse.data.ClickHouseColumn;
1313
import com.clickhouse.data.ClickHouseDataType;
14-
import com.clickhouse.data.ClickHouseEnum;
1514
import com.clickhouse.data.value.ClickHouseBitmap;
1615
import com.clickhouse.data.value.ClickHouseGeoMultiPolygonValue;
1716
import com.clickhouse.data.value.ClickHouseGeoPointValue;
@@ -27,13 +26,20 @@
2726
import java.math.BigInteger;
2827
import java.net.Inet4Address;
2928
import java.net.Inet6Address;
30-
import java.time.*;
31-
import java.time.format.DateTimeFormatter;
32-
import java.time.temporal.ChronoUnit;
29+
import java.time.Duration;
30+
import java.time.Instant;
31+
import java.time.LocalDate;
32+
import java.time.LocalDateTime;
33+
import java.time.OffsetDateTime;
34+
import java.time.ZoneOffset;
35+
import java.time.ZonedDateTime;
3336
import java.time.temporal.TemporalAmount;
34-
import java.util.*;
35-
import java.util.concurrent.ConcurrentHashMap;
36-
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.Collections;
38+
import java.util.HashMap;
39+
import java.util.List;
40+
import java.util.Map;
41+
import java.util.TimeZone;
42+
import java.util.UUID;
3743
import java.util.function.Function;
3844

3945
public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {
@@ -49,8 +55,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
4955
private TableSchema schema;
5056
private ClickHouseColumn[] columns;
5157
private Map[] convertions;
52-
private volatile boolean hasNext = true;
53-
private volatile boolean initialState = true; // reader is in initial state, no records have been read yet
58+
private boolean hasNext = true;
59+
private boolean initialState = true; // reader is in initial state, no records have been read yet
5460

5561
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
5662
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
@@ -70,10 +76,10 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
7076
}
7177
}
7278

73-
protected Map<String, Object> currentRecord = new ConcurrentHashMap<>();
74-
protected Map<String, Object> nextRecord = new ConcurrentHashMap<>();
79+
protected Map<String, Object> currentRecord = new HashMap<>();
80+
protected Map<String, Object> nextRecord = new HashMap<>();
7581

76-
protected AtomicBoolean nextRecordEmpty = new AtomicBoolean(true);
82+
protected boolean nextRecordEmpty = true;
7783

7884
/**
7985
* Reads next record into POJO object using set of serializers.
@@ -170,11 +176,11 @@ public boolean hasNext() {
170176
protected void readNextRecord() {
171177
initialState = false;
172178
try {
173-
nextRecordEmpty.set(true);
179+
nextRecordEmpty = true;
174180
if (!readRecord(nextRecord)) {
175181
endReached();
176182
} else {
177-
nextRecordEmpty.compareAndSet(true, false);
183+
nextRecordEmpty = false;
178184
}
179185
} catch (IOException e) {
180186
endReached();
@@ -188,7 +194,7 @@ public Map<String, Object> next() {
188194
return null;
189195
}
190196

191-
if (!nextRecordEmpty.get()) {
197+
if (!nextRecordEmpty) {
192198
Map<String, Object> tmp = currentRecord;
193199
currentRecord = nextRecord;
194200
nextRecord = tmp;

performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.clickhouse.benchmark;
22

33

4-
import com.clickhouse.benchmark.clients.Components;
4+
import com.clickhouse.benchmark.clients.Compression;
5+
import com.clickhouse.benchmark.clients.Deserializers;
56
import com.clickhouse.benchmark.clients.InsertClient;
67
import com.clickhouse.benchmark.clients.QueryClient;
8+
import com.clickhouse.benchmark.clients.Serializers;
79
import org.openjdk.jmh.annotations.Mode;
810
import org.openjdk.jmh.profile.GCProfiler;
911
import org.openjdk.jmh.profile.MemPoolProfiler;
@@ -32,7 +34,9 @@ public static void main(String[] args) throws Exception {
3234
Options opt = new OptionsBuilder()
3335
.include(QueryClient.class.getSimpleName())
3436
.include(InsertClient.class.getSimpleName())
35-
// .include(Components.class.getSimpleName())
37+
.include(Compression.class.getSimpleName())
38+
.include(Serializers.class.getSimpleName())
39+
.include(Deserializers.class.getSimpleName())
3640
.forks(1) // must be a fork. No fork only for debugging
3741
.mode(Mode.SampleTime)
3842
.timeUnit(TimeUnit.MILLISECONDS)

performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
import com.clickhouse.client.api.Client;
1313
import com.clickhouse.client.api.enums.Protocol;
1414
import com.clickhouse.client.api.insert.InsertResponse;
15-
import com.clickhouse.client.config.ClickHouseClientOption;
16-
import com.clickhouse.data.ClickHouseDataProcessor;
1715
import com.clickhouse.client.api.query.GenericRecord;
16+
import com.clickhouse.data.ClickHouseDataProcessor;
1817
import com.clickhouse.data.ClickHouseFormat;
1918
import com.clickhouse.data.ClickHouseOutputStream;
2019
import com.clickhouse.data.ClickHouseRecord;
@@ -30,9 +29,10 @@
3029

3130
import java.io.ByteArrayOutputStream;
3231
import java.io.InputStream;
32+
import java.math.BigInteger;
33+
import java.nio.ByteBuffer;
3334
import java.util.ArrayList;
3435
import java.util.Collections;
35-
import java.math.BigInteger;
3636
import java.util.List;
3737

3838
import static com.clickhouse.benchmark.BenchmarkRunner.getSelectCountQuery;
@@ -72,6 +72,7 @@ public void tearDownIteration() {
7272

7373
@State(Scope.Benchmark)
7474
public static class DataState {
75+
7576
@Param({"file://dataset_500k.csv"})
7677
String datasetSourceName;
7778
@Param({"300000", "220000", "100000", "10000"})
@@ -84,6 +85,8 @@ public static class DataState {
8485

8586
DataSet dataSet;
8687

88+
ByteBuffer datasetAsRowBinaryWithNamesAndTypes;
89+
8790
public void setDataSet(DataSet dataSet) {
8891
this.dataSet = dataSet;
8992
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.clickhouse.benchmark.clients;
2+
3+
import com.clickhouse.benchmark.data.DataSet;
4+
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
5+
import com.clickhouse.data.ClickHouseOutputStream;
6+
import com.clickhouse.data.stream.Lz4OutputStream;
7+
import net.jpountz.lz4.LZ4Factory;
8+
import org.openjdk.jmh.annotations.Benchmark;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.io.ByteArrayOutputStream;
13+
14+
public class Compression extends BenchmarkBase {
15+
private static final Logger LOGGER = LoggerFactory.getLogger(Compression.class);
16+
17+
static final int COMPRESS_BUFFER_SIZE = 64 * 1024; // 64K
18+
19+
@Benchmark
20+
public void CompressingOutputStreamV1(DataState dataState) {
21+
DataSet dataSet = dataState.dataSet;
22+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ClickHouseOutputStream out =
23+
new Lz4OutputStream(baos, COMPRESS_BUFFER_SIZE, null)) {
24+
for (byte[] bytes : dataSet.getBytesList(dataSet.getFormat())) {
25+
out.write(bytes);
26+
}
27+
} catch (Exception e) {
28+
LOGGER.error("Error: ", e);
29+
}
30+
}
31+
32+
private static final LZ4Factory factory = LZ4Factory.fastestInstance();
33+
34+
@Benchmark
35+
public void CompressingOutputStreamV2(DataState dataState) {
36+
DataSet dataSet = dataState.dataSet;
37+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
38+
ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(baos,
39+
factory.fastCompressor(), COMPRESS_BUFFER_SIZE)) {
40+
for (byte[] bytes : dataSet.getBytesList(dataSet.getFormat())) {
41+
out.write(bytes);
42+
}
43+
} catch (Exception e) {
44+
LOGGER.error("Error: ", e);
45+
}
46+
}
47+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.clickhouse.benchmark.clients;
2+
3+
import com.clickhouse.client.ClickHouseConfig;
4+
import com.clickhouse.client.api.Client;
5+
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
6+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
7+
import com.clickhouse.client.api.query.QueryResponse;
8+
import com.clickhouse.client.api.query.QuerySettings;
9+
import com.clickhouse.client.config.ClickHouseClientOption;
10+
import com.clickhouse.data.ClickHouseColumn;
11+
import com.clickhouse.data.ClickHouseDataProcessor;
12+
import com.clickhouse.data.ClickHouseFormat;
13+
import com.clickhouse.data.ClickHouseInputStream;
14+
import com.clickhouse.data.ClickHouseRecord;
15+
import com.clickhouse.data.format.ClickHouseRowBinaryProcessor;
16+
import org.openjdk.jmh.annotations.Benchmark;
17+
import org.openjdk.jmh.annotations.Level;
18+
import org.openjdk.jmh.annotations.Setup;
19+
import org.openjdk.jmh.infra.Blackhole;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.io.ByteArrayInputStream;
24+
import java.io.InputStream;
25+
import java.nio.ByteBuffer;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
public class Deserializers extends BenchmarkBase {
31+
private static final Logger LOGGER = LoggerFactory.getLogger(Deserializers.class);
32+
33+
@Setup(Level.Iteration)
34+
public void setUpIteration(DataState dataState) {
35+
super.setUpIteration();
36+
37+
try (Client c = getClientV2(); QueryResponse r = c.query("SELECT * FROM " + dataState.tableNameFilled, new QuerySettings()
38+
.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes)).get()){
39+
dataState.datasetAsRowBinaryWithNamesAndTypes = ByteBuffer.wrap(r.getInputStream().readAllBytes());
40+
LOGGER.info("Loaded {} from dataset", dataState.datasetAsRowBinaryWithNamesAndTypes.capacity());
41+
} catch (Exception e ) {
42+
LOGGER.error("Failed to init data for components benchmark", e);
43+
}
44+
}
45+
46+
@Benchmark
47+
public void DeserializerOutputStreamV1(DataState dataState, Blackhole blackhole) {
48+
InputStream input = new ByteArrayInputStream(dataState.datasetAsRowBinaryWithNamesAndTypes.array());
49+
try {
50+
ClickHouseConfig config = new ClickHouseConfig(Collections.singletonMap(ClickHouseClientOption.FORMAT, ClickHouseFormat.RowBinaryWithNamesAndTypes));
51+
ClickHouseDataProcessor p = new ClickHouseRowBinaryProcessor(config,
52+
ClickHouseInputStream.of(input), null, null, Collections.emptyMap());
53+
List<ClickHouseColumn> columns = p.getColumns();
54+
for (ClickHouseRecord record : p.records()) {
55+
for (int i = 0; i < columns.size(); i++) {
56+
blackhole.consume(record.getValue(i).asObject());
57+
}
58+
}
59+
} catch (Exception e) {
60+
LOGGER.error("Error: ", e);
61+
}
62+
}
63+
64+
@Benchmark
65+
public void DeserializerOutputStreamV2(DataState dataState, Blackhole blackhole) {
66+
InputStream input = new ByteArrayInputStream(dataState.datasetAsRowBinaryWithNamesAndTypes.array());
67+
try {
68+
RowBinaryWithNamesAndTypesFormatReader r = new RowBinaryWithNamesAndTypesFormatReader(input,
69+
new QuerySettings()
70+
.setUseTimeZone("UTC")
71+
.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes), new BinaryStreamReader.DefaultByteBufferAllocator());
72+
73+
Map<String, Object> row;
74+
while ((row = r.next()) != null) {
75+
for (String column : row.keySet()) {
76+
blackhole.consume(row.get(column));
77+
}
78+
}
79+
80+
} catch (Exception e) {
81+
LOGGER.error("Error: ", e);
82+
}
83+
}
84+
}

performance/src/test/com/clickhouse/benchmark/clients/InsertClient.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.clickhouse.client.api.insert.InsertResponse;
77
import com.clickhouse.client.api.insert.InsertSettings;
88
import com.clickhouse.client.config.ClickHouseClientOption;
9-
import com.clickhouse.data.ClickHouseColumn;
109
import com.clickhouse.data.ClickHouseDataProcessor;
1110
import com.clickhouse.data.ClickHouseFormat;
1211
import com.clickhouse.data.ClickHouseRecord;
@@ -20,7 +19,6 @@
2019
import org.slf4j.LoggerFactory;
2120

2221
import java.util.List;
23-
import java.util.Map;
2422

2523
import static com.clickhouse.benchmark.TestEnvironment.getServer;
2624

performance/src/test/com/clickhouse/benchmark/clients/Components.java renamed to performance/src/test/com/clickhouse/benchmark/clients/Serializers.java

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,43 @@
11
package com.clickhouse.benchmark.clients;
22

33
import com.clickhouse.benchmark.data.DataSet;
4+
import com.clickhouse.client.ClickHouseConfig;
5+
import com.clickhouse.client.api.Client;
46
import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter;
5-
import com.clickhouse.client.api.insert.InsertResponse;
7+
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
8+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
69
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
10+
import com.clickhouse.client.api.query.QueryResponse;
11+
import com.clickhouse.client.api.query.QuerySettings;
12+
import com.clickhouse.client.config.ClickHouseClientOption;
713
import com.clickhouse.data.ClickHouseColumn;
814
import com.clickhouse.data.ClickHouseDataProcessor;
915
import com.clickhouse.data.ClickHouseFormat;
16+
import com.clickhouse.data.ClickHouseInputStream;
1017
import com.clickhouse.data.ClickHouseOutputStream;
11-
import com.clickhouse.data.ClickHousePassThruStream;
1218
import com.clickhouse.data.ClickHouseRecord;
1319
import com.clickhouse.data.ClickHouseSerializer;
20+
import com.clickhouse.data.format.ClickHouseRowBinaryProcessor;
1421
import com.clickhouse.data.stream.Lz4OutputStream;
1522
import net.jpountz.lz4.LZ4Factory;
1623
import org.openjdk.jmh.annotations.Benchmark;
24+
import org.openjdk.jmh.annotations.Level;
25+
import org.openjdk.jmh.annotations.Setup;
26+
import org.openjdk.jmh.infra.Blackhole;
1727
import org.slf4j.Logger;
1828
import org.slf4j.LoggerFactory;
1929

30+
import java.io.ByteArrayInputStream;
2031
import java.io.ByteArrayOutputStream;
32+
import java.io.InputStream;
2133
import java.io.OutputStream;
34+
import java.nio.ByteBuffer;
35+
import java.util.Collections;
2236
import java.util.List;
2337
import java.util.Map;
2438

25-
public class Components extends BenchmarkBase {
26-
private static final Logger LOGGER = LoggerFactory.getLogger(Components.class);
27-
28-
static final int COMPRESS_BUFFER_SIZE = 64 * 1024; // 64K
29-
@Benchmark
30-
public void CompressingOutputStreamV1(DataState dataState) {
31-
DataSet dataSet = dataState.dataSet;
32-
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ClickHouseOutputStream out =
33-
new Lz4OutputStream(baos, COMPRESS_BUFFER_SIZE, null)) {
34-
for (byte[] bytes : dataSet.getBytesList(dataSet.getFormat())) {
35-
out.write(bytes);
36-
}
37-
} catch (Exception e) {
38-
LOGGER.error("Error: ", e);
39-
}
40-
}
41-
42-
private static final LZ4Factory factory = LZ4Factory.fastestInstance();
43-
44-
@Benchmark
45-
public void CompressingOutputStreamV2(DataState dataState) {
46-
DataSet dataSet = dataState.dataSet;
47-
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
48-
ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(baos,
49-
factory.fastCompressor(), COMPRESS_BUFFER_SIZE)) {
50-
for (byte[] bytes : dataSet.getBytesList(dataSet.getFormat())) {
51-
out.write(bytes);
52-
}
53-
} catch (Exception e) {
54-
LOGGER.error("Error: ", e);
55-
}
56-
}
39+
public class Serializers extends BenchmarkBase {
40+
private static final Logger LOGGER = LoggerFactory.getLogger(Serializers.class);
5741

5842
private OutputStream createEmptyOutputStream() {
5943
return new OutputStream() {
@@ -84,6 +68,7 @@ public void close() {
8468
}
8569
};
8670
}
71+
8772
@Benchmark
8873
public void SerializerOutputStreamV1(DataState dataState) {
8974
OutputStream empty = createEmptyOutputStream();

0 commit comments

Comments
 (0)