Skip to content

Commit 98a4c11

Browse files
authored
Merge pull request #2237 from ClickHouse/bencmark-concurrency
Benchmark concurrency
2 parents 7ea67f9 + c143f3a commit 98a4c11

File tree

5 files changed

+335
-5
lines changed

5 files changed

+335
-5
lines changed

performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package com.clickhouse.benchmark;
22

3-
43
import com.clickhouse.benchmark.clients.Compression;
4+
import com.clickhouse.benchmark.clients.ConcurrentInsertClient;
5+
import com.clickhouse.benchmark.clients.ConcurrentQueryClient;
56
import com.clickhouse.benchmark.clients.Deserializers;
67
import com.clickhouse.benchmark.clients.InsertClient;
78
import com.clickhouse.benchmark.clients.MixedWorkload;
@@ -37,6 +38,8 @@ public static void main(String[] args) throws Exception {
3738
Options opt = new OptionsBuilder()
3839
.include(QueryClient.class.getSimpleName())
3940
.include(InsertClient.class.getSimpleName())
41+
.include(ConcurrentInsertClient.class.getSimpleName())
42+
.include(ConcurrentQueryClient.class.getSimpleName())
4043
.include(Compression.class.getSimpleName())
4144
.include(Serializers.class.getSimpleName())
4245
.include(Deserializers.class.getSimpleName())
@@ -46,8 +49,8 @@ public static void main(String[] args) throws Exception {
4649
.timeUnit(TimeUnit.MILLISECONDS)
4750
.addProfiler(GCProfiler.class)
4851
.addProfiler(MemPoolProfiler.class)
49-
.warmupIterations(3)
50-
.warmupTime(TimeValue.seconds(10))
52+
.warmupIterations(1)
53+
.warmupTime(TimeValue.seconds(5))
5154
.measurementIterations(10)
5255
.jvmArgs("-Xms8g", "-Xmx8g")
5356
.measurementTime(TimeValue.seconds(isCloud() ? 30 : 10))
@@ -80,6 +83,10 @@ public static String getSelectQuery(String tableName) {
8083
return "SELECT * FROM `" + DB_NAME + "`.`" + tableName + "`";
8184
}
8285

86+
public static String getSelectQueryWithLimit(String tableName, int limit) {
87+
return "SELECT * FROM `" + DB_NAME + "`.`" + tableName + "` LIMIT " + limit;
88+
}
89+
8390
public static String getSelectCountQuery(String tableName) {
8491
return String.format("SELECT COUNT(*) FROM `%s`.`%s`", DB_NAME, tableName);
8592
}

performance/src/test/com/clickhouse/benchmark/TestEnvironment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,11 @@ public static void setupEnvironment() {
9999
.options(Collections.singletonMap(ClickHouseClientOption.SSL.getKey(), isCloud() ? "true" : "false"))
100100
.database(DB_NAME)
101101
.build();
102-
103102
createDatabase();
104103
}
105104

106105
public static void cleanupEnvironment() {
106+
LOGGER.info("Cleaning up ClickHouse test environment...");
107107
if (isCloud()) {
108108
dropDatabase();
109109
}

performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,14 @@ public class BenchmarkBase {
5757
protected Client clientV2;
5858
@Setup(Level.Iteration)
5959
public void setUpIteration() {
60+
LOGGER.info("BenchmarkBase::setUpIteration");
6061
clientV1 = getClientV1();
6162
clientV2 = getClientV2();
6263
}
6364

6465
@TearDown(Level.Iteration)
6566
public void tearDownIteration() {
67+
LOGGER.info("BenchmarkBase::tearDownIteration");
6668
if (clientV1 != null) {
6769
clientV1.close();
6870
clientV1 = null;
@@ -86,7 +88,7 @@ public static class DataState {
8688
@Param({"data_empty"})
8789
String tableNameEmpty;
8890

89-
DataSet dataSet;
91+
static DataSet dataSet;
9092

9193
ByteBuffer datasetAsRowBinaryWithNamesAndTypes;
9294

@@ -110,10 +112,15 @@ public void setTableNameEmpty(String tableNameEmpty) {
110112
this.tableNameEmpty = tableNameEmpty;
111113
}
112114

115+
public static DataSet getDataSet() {
116+
return dataSet;
117+
}
118+
113119
}
114120

115121
@Setup(Level.Trial)
116122
public void setup(DataState dataState) {
123+
LOGGER.info("BenchmarkBase::setup");
117124
setupEnvironment();
118125
LOGGER.info("Setup benchmarks using dataset: {}", dataState.datasetSourceName);
119126
if (dataState.dataSet == null && "simple".equals(dataState.datasetSourceName)) {
@@ -182,6 +189,10 @@ public static void truncateTable(String tableName) {
182189
runAndSyncQuery(String.format("TRUNCATE TABLE IF EXISTS `%s`.`%s`", DB_NAME, tableName), tableName);
183190
}
184191

192+
public static void dropTable(String tableName) {
193+
LOGGER.info("Truncating table: {}", tableName);
194+
runAndSyncQuery(String.format("DROP TABLE IF EXISTS `%s`.`%s`", DB_NAME, tableName), tableName);
195+
}
185196

186197
public static void insertData(String tableName, InputStream dataStream, ClickHouseFormat format) {
187198
try (Client client = getClientV2();
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
package com.clickhouse.benchmark.clients;
2+
3+
import com.clickhouse.benchmark.BenchmarkRunner;
4+
import com.clickhouse.benchmark.data.DataSet;
5+
import com.clickhouse.client.ClickHouseClient;
6+
import com.clickhouse.client.ClickHouseResponse;
7+
import com.clickhouse.client.api.Client;
8+
import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter;
9+
import com.clickhouse.client.api.insert.InsertResponse;
10+
import com.clickhouse.client.api.insert.InsertSettings;
11+
import com.clickhouse.client.config.ClickHouseClientOption;
12+
import com.clickhouse.data.ClickHouseDataProcessor;
13+
import com.clickhouse.data.ClickHouseFormat;
14+
import com.clickhouse.data.ClickHouseRecord;
15+
import com.clickhouse.data.ClickHouseSerializer;
16+
import org.openjdk.jmh.annotations.Benchmark;
17+
import org.openjdk.jmh.annotations.Level;
18+
import org.openjdk.jmh.annotations.Scope;
19+
import org.openjdk.jmh.annotations.Setup;
20+
import org.openjdk.jmh.annotations.State;
21+
import org.openjdk.jmh.annotations.TearDown;
22+
import org.openjdk.jmh.annotations.Threads;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.util.List;
27+
28+
import static com.clickhouse.benchmark.TestEnvironment.getServer;
29+
@Threads(3)
30+
@State(Scope.Benchmark)
31+
public class ConcurrentInsertClient extends BenchmarkBase {
32+
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentInsertClient.class);
33+
private static ClickHouseClient clientV1Shared;
34+
private static Client clientV2Shared;
35+
@Setup(Level.Trial)
36+
public void setUpIteration() {
37+
clientV1Shared = getClientV1();
38+
clientV2Shared = getClientV2();
39+
}
40+
@TearDown(Level.Trial)
41+
public void tearDownIteration() {
42+
if (clientV1Shared != null) {
43+
clientV1Shared.close();
44+
clientV1Shared = null;
45+
}
46+
if (clientV2Shared != null) {
47+
clientV2Shared.close();
48+
clientV2Shared = null;
49+
}
50+
}
51+
@State(Scope.Thread)
52+
public static class ThreadLocalState {
53+
public String createTableName() {
54+
String name = Thread.currentThread().getName();
55+
int index = name.lastIndexOf("-");
56+
String id = name.substring(index + 1);
57+
return String.format("%s_%s", "concurrent_data_empty", id);
58+
}
59+
@Setup(Level.Trial)
60+
public void setup() {
61+
DataSet dataSet = DataState.getDataSet();
62+
String tableName = createTableName();
63+
LOGGER.warn("setup create table name: " + tableName);
64+
// create table
65+
String createTableString = dataSet.getCreateTableString(tableName);
66+
runAndSyncQuery(createTableString, tableName);
67+
}
68+
69+
@TearDown(Level.Invocation)
70+
public void verifyRowsInsertedAndCleanup(DataState dataState) {
71+
String tableName = createTableName();
72+
boolean success;
73+
LOGGER.warn("TearDown: " + tableName);
74+
int count = 0;
75+
do {
76+
success = verifyCount(tableName, dataState.dataSet.getSize());
77+
if (!success) {
78+
LOGGER.warn("Retrying to verify rows inserted");
79+
try {
80+
Thread.sleep(2500);
81+
} catch (InterruptedException e) {
82+
LOGGER.error("Error: ", e);
83+
}
84+
}
85+
} while (!success && count++ < 10);
86+
if (!success) {
87+
LOGGER.error("Failed to verify rows inserted");
88+
throw new RuntimeException("Failed to verify rows inserted");
89+
}
90+
truncateTable(tableName);
91+
}
92+
}
93+
94+
95+
@Benchmark
96+
public void insertV1(DataState dataState, ThreadLocalState threadLocalState) {
97+
String tableName = threadLocalState.createTableName();
98+
try {
99+
ClickHouseFormat format = dataState.dataSet.getFormat();
100+
try (ClickHouseResponse response = clientV1Shared.read(getServer())
101+
.write()
102+
.option(ClickHouseClientOption.ASYNC, false)
103+
.format(format)
104+
.query(BenchmarkRunner.getInsertQuery(tableName))
105+
.data(out -> {
106+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
107+
out.write(bytes);
108+
}
109+
}).executeAndWait()) {
110+
response.getSummary();
111+
}
112+
} catch (Exception e) {
113+
LOGGER.error("Error: ", e);
114+
}
115+
}
116+
117+
@Benchmark
118+
public void insertV2(DataState dataState, ThreadLocalState threadLocalState) {
119+
String tableName = threadLocalState.createTableName();
120+
LOGGER.warn("insertV2: " + tableName);
121+
try {
122+
ClickHouseFormat format = dataState.dataSet.getFormat();
123+
try (InsertResponse response = clientV2Shared.insert(tableName, out -> {
124+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
125+
out.write(bytes);
126+
}
127+
out.close();
128+
}, format, new InsertSettings()).get()) {
129+
response.getWrittenRows();
130+
}
131+
} catch (Exception e) {
132+
LOGGER.error("Error: ", e);
133+
}
134+
}
135+
136+
@Benchmark
137+
public void insertV1Compressed(DataState dataState, ThreadLocalState threadLocalState) {
138+
try {
139+
ClickHouseFormat format = dataState.dataSet.getFormat();
140+
try (ClickHouseResponse response = clientV1Shared.read(getServer())
141+
.write()
142+
.option(ClickHouseClientOption.ASYNC, false)
143+
.option(ClickHouseClientOption.DECOMPRESS, true)
144+
.format(format)
145+
.query(BenchmarkRunner.getInsertQuery(threadLocalState.createTableName()))
146+
.data(out -> {
147+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
148+
out.write(bytes);
149+
}
150+
}).executeAndWait()) {
151+
response.getSummary();
152+
}
153+
} catch (Exception e) {
154+
LOGGER.error("Error: ", e);
155+
}
156+
}
157+
@Benchmark
158+
public void insertV2Compressed(DataState dataState, ThreadLocalState threadLocalState) {
159+
try {
160+
ClickHouseFormat format = dataState.dataSet.getFormat();
161+
try (InsertResponse response = clientV2Shared.insert(threadLocalState.createTableName(), out -> {
162+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
163+
out.write(bytes);
164+
}
165+
out.close();
166+
}, format, new InsertSettings()
167+
.compressClientRequest(true)).get()) {
168+
response.getWrittenRows();
169+
}
170+
} catch (Exception e) {
171+
LOGGER.error("Error: ", e);
172+
}
173+
}
174+
175+
@Benchmark
176+
public void insertV1RowBinary(DataState dataState, ThreadLocalState threadLocalState) {
177+
try {
178+
ClickHouseFormat format = ClickHouseFormat.RowBinary;
179+
try (ClickHouseResponse response = clientV1Shared.read(getServer())
180+
.write()
181+
.option(ClickHouseClientOption.ASYNC, false)
182+
.format(format)
183+
.query(BenchmarkRunner.getInsertQuery(threadLocalState.createTableName()))
184+
.data(out -> {
185+
ClickHouseDataProcessor p = dataState.dataSet.getClickHouseDataProcessor();
186+
ClickHouseSerializer[] serializers = p.getSerializers(clientV1Shared.getConfig(), p.getColumns());
187+
for (ClickHouseRecord record : dataState.dataSet.getClickHouseRecords()) {
188+
for (int i = 0; i < serializers.length; i++) {
189+
serializers[i].serialize(record.getValue(i), out);
190+
}
191+
}
192+
})
193+
.executeAndWait()) {
194+
response.getSummary();
195+
}
196+
} catch ( Exception e) {
197+
LOGGER.error("Error: ", e);
198+
}
199+
}
200+
201+
@Benchmark
202+
public void insertV2RowBinary(DataState dataState, ThreadLocalState threadLocalState) {
203+
try {
204+
try (InsertResponse response = clientV2Shared.insert(threadLocalState.createTableName(), out -> {
205+
RowBinaryFormatWriter w = new RowBinaryFormatWriter(out, dataState.dataSet.getSchema(), ClickHouseFormat.RowBinary);
206+
for (List<Object> row : dataState.dataSet.getRowsOrdered()) {
207+
int index = 1;
208+
for (Object value : row) {
209+
w.setValue(index, value);
210+
index++;
211+
}
212+
w.commitRow();
213+
}
214+
out.flush();
215+
216+
}, ClickHouseFormat.RowBinaryWithDefaults, new InsertSettings()).get()) {
217+
response.getWrittenRows();
218+
}
219+
} catch (Exception e) {
220+
LOGGER.error("Error: ", e);
221+
}
222+
}
223+
}

0 commit comments

Comments
 (0)