Skip to content

Commit 8f1b22d

Browse files
committed
Merge branch 'main' into perf_typed_tests
2 parents c677b50 + 98a4c11 commit 8f1b22d

File tree

5 files changed

+335
-5
lines changed

5 files changed

+335
-5
lines changed

performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package com.clickhouse.benchmark;
22

3-
43
import com.clickhouse.benchmark.clients.Compression;
54
import com.clickhouse.benchmark.clients.DataTypes;
5+
import com.clickhouse.benchmark.clients.ConcurrentInsertClient;
6+
import com.clickhouse.benchmark.clients.ConcurrentQueryClient;
67
import com.clickhouse.benchmark.clients.Deserializers;
78
import com.clickhouse.benchmark.clients.InsertClient;
89
import com.clickhouse.benchmark.clients.MixedWorkload;
@@ -38,7 +39,9 @@ public static void main(String[] args) throws Exception {
3839
Options opt = new OptionsBuilder()
3940
.include(QueryClient.class.getName())
4041
.include(InsertClient.class.getName())
41-
// .include(Compression.class.getName())
42+
// .include(ConcurrentInsertClient.class.getSimpleName())
43+
.include(ConcurrentQueryClient.class.getSimpleName())
44+
.include(Compression.class.getName())
4245
.include(Serializers.class.getName())
4346
.include(Deserializers.class.getName())
4447
.include(MixedWorkload.class.getName())
@@ -49,7 +52,7 @@ public static void main(String[] args) throws Exception {
4952
.addProfiler(GCProfiler.class)
5053
.addProfiler(MemPoolProfiler.class)
5154
.warmupIterations(1)
52-
.warmupTime(TimeValue.seconds(10))
55+
.warmupTime(TimeValue.seconds(5))
5356
.measurementIterations(10)
5457
.jvmArgs("-Xms8g", "-Xmx8g")
5558
.measurementTime(TimeValue.seconds(isCloud() ? 30 : 10))
@@ -82,6 +85,10 @@ public static String getSelectQuery(String tableName) {
8285
return "SELECT * FROM `" + DB_NAME + "`.`" + tableName + "`";
8386
}
8487

88+
public static String getSelectQueryWithLimit(String tableName, int limit) {
89+
return "SELECT * FROM `" + DB_NAME + "`.`" + tableName + "` LIMIT " + limit;
90+
}
91+
8592
public static String getSelectCountQuery(String tableName) {
8693
return String.format("SELECT COUNT(*) FROM `%s`.`%s`", DB_NAME, tableName);
8794
}

performance/src/test/com/clickhouse/benchmark/TestEnvironment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,11 @@ public static void setupEnvironment() {
9999
.options(Collections.singletonMap(ClickHouseClientOption.SSL.getKey(), isCloud() ? "true" : "false"))
100100
.database(DB_NAME)
101101
.build();
102-
103102
createDatabase();
104103
}
105104

106105
public static void cleanupEnvironment() {
106+
LOGGER.info("Cleaning up ClickHouse test environment...");
107107
if (isCloud()) {
108108
dropDatabase();
109109
}

performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@ public class BenchmarkBase {
5858
protected Client clientV2;
5959
@Setup(Level.Iteration)
6060
public void setUpIteration() {
61+
LOGGER.info("BenchmarkBase::setUpIteration");
6162
clientV1 = getClientV1();
6263
clientV2 = getClientV2();
6364
}
6465

6566
@TearDown(Level.Iteration)
6667
public void tearDownIteration() {
68+
LOGGER.info("BenchmarkBase::tearDownIteration");
6769
if (clientV1 != null) {
6870
clientV1.close();
6971
clientV1 = null;
@@ -87,7 +89,7 @@ public static class DataState {
8789
@Param({"data_empty"})
8890
String tableNameEmpty;
8991

90-
DataSet dataSet;
92+
static DataSet dataSet;
9193

9294
ByteBuffer datasetAsRowBinaryWithNamesAndTypes;
9395

@@ -113,10 +115,15 @@ public void setTableNameEmpty(String tableNameEmpty) {
113115
this.tableNameEmpty = tableNameEmpty;
114116
}
115117

118+
public static DataSet getDataSet() {
119+
return dataSet;
120+
}
121+
116122
}
117123

118124
@Setup(Level.Trial)
119125
public void setup(DataState dataState) {
126+
LOGGER.info("BenchmarkBase::setup");
120127
setupEnvironment();
121128
LOGGER.info("Setup benchmarks using dataset: {}", dataState.datasetSourceName);
122129
if (dataState.dataSet == null && "simple".equals(dataState.datasetSourceName)) {
@@ -185,6 +192,10 @@ public static void truncateTable(String tableName) {
185192
runAndSyncQuery(String.format("TRUNCATE TABLE IF EXISTS `%s`.`%s`", DB_NAME, tableName), tableName);
186193
}
187194

195+
public static void dropTable(String tableName) {
196+
LOGGER.info("Truncating table: {}", tableName);
197+
runAndSyncQuery(String.format("DROP TABLE IF EXISTS `%s`.`%s`", DB_NAME, tableName), tableName);
198+
}
188199

189200
public static void insertData(String tableName, InputStream dataStream, ClickHouseFormat format) {
190201
try (Client client = getClientV2();
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
package com.clickhouse.benchmark.clients;
2+
3+
import com.clickhouse.benchmark.BenchmarkRunner;
4+
import com.clickhouse.benchmark.data.DataSet;
5+
import com.clickhouse.client.ClickHouseClient;
6+
import com.clickhouse.client.ClickHouseResponse;
7+
import com.clickhouse.client.api.Client;
8+
import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter;
9+
import com.clickhouse.client.api.insert.InsertResponse;
10+
import com.clickhouse.client.api.insert.InsertSettings;
11+
import com.clickhouse.client.config.ClickHouseClientOption;
12+
import com.clickhouse.data.ClickHouseDataProcessor;
13+
import com.clickhouse.data.ClickHouseFormat;
14+
import com.clickhouse.data.ClickHouseRecord;
15+
import com.clickhouse.data.ClickHouseSerializer;
16+
import org.openjdk.jmh.annotations.Benchmark;
17+
import org.openjdk.jmh.annotations.Level;
18+
import org.openjdk.jmh.annotations.Scope;
19+
import org.openjdk.jmh.annotations.Setup;
20+
import org.openjdk.jmh.annotations.State;
21+
import org.openjdk.jmh.annotations.TearDown;
22+
import org.openjdk.jmh.annotations.Threads;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.util.List;
27+
28+
import static com.clickhouse.benchmark.TestEnvironment.getServer;
29+
@Threads(3)
30+
@State(Scope.Benchmark)
31+
public class ConcurrentInsertClient extends BenchmarkBase {
32+
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentInsertClient.class);
33+
private static ClickHouseClient clientV1Shared;
34+
private static Client clientV2Shared;
35+
@Setup(Level.Trial)
36+
public void setUpIteration() {
37+
clientV1Shared = getClientV1();
38+
clientV2Shared = getClientV2();
39+
}
40+
@TearDown(Level.Trial)
41+
public void tearDownIteration() {
42+
if (clientV1Shared != null) {
43+
clientV1Shared.close();
44+
clientV1Shared = null;
45+
}
46+
if (clientV2Shared != null) {
47+
clientV2Shared.close();
48+
clientV2Shared = null;
49+
}
50+
}
51+
@State(Scope.Thread)
52+
public static class ThreadLocalState {
53+
public String createTableName() {
54+
String name = Thread.currentThread().getName();
55+
int index = name.lastIndexOf("-");
56+
String id = name.substring(index + 1);
57+
return String.format("%s_%s", "concurrent_data_empty", id);
58+
}
59+
@Setup(Level.Trial)
60+
public void setup() {
61+
DataSet dataSet = DataState.getDataSet();
62+
String tableName = createTableName();
63+
LOGGER.warn("setup create table name: " + tableName);
64+
// create table
65+
String createTableString = dataSet.getCreateTableString(tableName);
66+
runAndSyncQuery(createTableString, tableName);
67+
}
68+
69+
@TearDown(Level.Invocation)
70+
public void verifyRowsInsertedAndCleanup(DataState dataState) {
71+
String tableName = createTableName();
72+
boolean success;
73+
LOGGER.warn("TearDown: " + tableName);
74+
int count = 0;
75+
do {
76+
success = verifyCount(tableName, dataState.dataSet.getSize());
77+
if (!success) {
78+
LOGGER.warn("Retrying to verify rows inserted");
79+
try {
80+
Thread.sleep(2500);
81+
} catch (InterruptedException e) {
82+
LOGGER.error("Error: ", e);
83+
}
84+
}
85+
} while (!success && count++ < 10);
86+
if (!success) {
87+
LOGGER.error("Failed to verify rows inserted");
88+
throw new RuntimeException("Failed to verify rows inserted");
89+
}
90+
truncateTable(tableName);
91+
}
92+
}
93+
94+
95+
@Benchmark
96+
public void insertV1(DataState dataState, ThreadLocalState threadLocalState) {
97+
String tableName = threadLocalState.createTableName();
98+
try {
99+
ClickHouseFormat format = dataState.dataSet.getFormat();
100+
try (ClickHouseResponse response = clientV1Shared.read(getServer())
101+
.write()
102+
.option(ClickHouseClientOption.ASYNC, false)
103+
.format(format)
104+
.query(BenchmarkRunner.getInsertQuery(tableName))
105+
.data(out -> {
106+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
107+
out.write(bytes);
108+
}
109+
}).executeAndWait()) {
110+
response.getSummary();
111+
}
112+
} catch (Exception e) {
113+
LOGGER.error("Error: ", e);
114+
}
115+
}
116+
117+
@Benchmark
118+
public void insertV2(DataState dataState, ThreadLocalState threadLocalState) {
119+
String tableName = threadLocalState.createTableName();
120+
LOGGER.warn("insertV2: " + tableName);
121+
try {
122+
ClickHouseFormat format = dataState.dataSet.getFormat();
123+
try (InsertResponse response = clientV2Shared.insert(tableName, out -> {
124+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
125+
out.write(bytes);
126+
}
127+
out.close();
128+
}, format, new InsertSettings()).get()) {
129+
response.getWrittenRows();
130+
}
131+
} catch (Exception e) {
132+
LOGGER.error("Error: ", e);
133+
}
134+
}
135+
136+
@Benchmark
137+
public void insertV1Compressed(DataState dataState, ThreadLocalState threadLocalState) {
138+
try {
139+
ClickHouseFormat format = dataState.dataSet.getFormat();
140+
try (ClickHouseResponse response = clientV1Shared.read(getServer())
141+
.write()
142+
.option(ClickHouseClientOption.ASYNC, false)
143+
.option(ClickHouseClientOption.DECOMPRESS, true)
144+
.format(format)
145+
.query(BenchmarkRunner.getInsertQuery(threadLocalState.createTableName()))
146+
.data(out -> {
147+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
148+
out.write(bytes);
149+
}
150+
}).executeAndWait()) {
151+
response.getSummary();
152+
}
153+
} catch (Exception e) {
154+
LOGGER.error("Error: ", e);
155+
}
156+
}
157+
@Benchmark
158+
public void insertV2Compressed(DataState dataState, ThreadLocalState threadLocalState) {
159+
try {
160+
ClickHouseFormat format = dataState.dataSet.getFormat();
161+
try (InsertResponse response = clientV2Shared.insert(threadLocalState.createTableName(), out -> {
162+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
163+
out.write(bytes);
164+
}
165+
out.close();
166+
}, format, new InsertSettings()
167+
.compressClientRequest(true)).get()) {
168+
response.getWrittenRows();
169+
}
170+
} catch (Exception e) {
171+
LOGGER.error("Error: ", e);
172+
}
173+
}
174+
175+
@Benchmark
176+
public void insertV1RowBinary(DataState dataState, ThreadLocalState threadLocalState) {
177+
try {
178+
ClickHouseFormat format = ClickHouseFormat.RowBinary;
179+
try (ClickHouseResponse response = clientV1Shared.read(getServer())
180+
.write()
181+
.option(ClickHouseClientOption.ASYNC, false)
182+
.format(format)
183+
.query(BenchmarkRunner.getInsertQuery(threadLocalState.createTableName()))
184+
.data(out -> {
185+
ClickHouseDataProcessor p = dataState.dataSet.getClickHouseDataProcessor();
186+
ClickHouseSerializer[] serializers = p.getSerializers(clientV1Shared.getConfig(), p.getColumns());
187+
for (ClickHouseRecord record : dataState.dataSet.getClickHouseRecords()) {
188+
for (int i = 0; i < serializers.length; i++) {
189+
serializers[i].serialize(record.getValue(i), out);
190+
}
191+
}
192+
})
193+
.executeAndWait()) {
194+
response.getSummary();
195+
}
196+
} catch ( Exception e) {
197+
LOGGER.error("Error: ", e);
198+
}
199+
}
200+
201+
@Benchmark
202+
public void insertV2RowBinary(DataState dataState, ThreadLocalState threadLocalState) {
203+
try {
204+
try (InsertResponse response = clientV2Shared.insert(threadLocalState.createTableName(), out -> {
205+
RowBinaryFormatWriter w = new RowBinaryFormatWriter(out, dataState.dataSet.getSchema(), ClickHouseFormat.RowBinary);
206+
for (List<Object> row : dataState.dataSet.getRowsOrdered()) {
207+
int index = 1;
208+
for (Object value : row) {
209+
w.setValue(index, value);
210+
index++;
211+
}
212+
w.commitRow();
213+
}
214+
out.flush();
215+
216+
}, ClickHouseFormat.RowBinaryWithDefaults, new InsertSettings()).get()) {
217+
response.getWrittenRows();
218+
}
219+
} catch (Exception e) {
220+
LOGGER.error("Error: ", e);
221+
}
222+
}
223+
}

0 commit comments

Comments
 (0)