Skip to content

Commit d251174

Browse files
committed
Implementing ConcurrentInsert & ConcurrentQuery
1 parent 5eec2d6 commit d251174

File tree

5 files changed

+195
-85
lines changed

5 files changed

+195
-85
lines changed

performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
package com.clickhouse.benchmark;
22

3-
4-
import com.clickhouse.benchmark.clients.Compression;
53
import com.clickhouse.benchmark.clients.ConcurrentInsertClient;
6-
import com.clickhouse.benchmark.clients.Deserializers;
7-
import com.clickhouse.benchmark.clients.InsertClient;
8-
import com.clickhouse.benchmark.clients.QueryClient;
9-
import com.clickhouse.benchmark.clients.Serializers;
4+
import com.clickhouse.benchmark.clients.ConcurrentQueryClient;
105
import org.openjdk.jmh.annotations.Mode;
116
import org.openjdk.jmh.profile.GCProfiler;
127
import org.openjdk.jmh.profile.MemPoolProfiler;
@@ -33,22 +28,23 @@ public static void main(String[] args) throws Exception {
3328
Map<String, String> argMap = parseArguments(args);
3429

3530
Options opt = new OptionsBuilder()
36-
.include(QueryClient.class.getSimpleName())
37-
.include(InsertClient.class.getSimpleName())
31+
// .include(QueryClient.class.getSimpleName())
32+
// .include(InsertClient.class.getSimpleName())
3833
.include(ConcurrentInsertClient.class.getSimpleName())
39-
.include(Compression.class.getSimpleName())
40-
.include(Serializers.class.getSimpleName())
41-
.include(Deserializers.class.getSimpleName())
34+
.include(ConcurrentQueryClient.class.getSimpleName())
35+
// .include(Compression.class.getSimpleName())
36+
// .include(Serializers.class.getSimpleName())
37+
// .include(Deserializers.class.getSimpleName())
4238
.forks(1) // must be a fork. No fork only for debugging
4339
.mode(Mode.SampleTime)
4440
.timeUnit(TimeUnit.MILLISECONDS)
4541
.addProfiler(GCProfiler.class)
4642
.addProfiler(MemPoolProfiler.class)
47-
.warmupIterations(3)
48-
.warmupTime(TimeValue.seconds(10))
49-
.measurementIterations(10)
50-
.jvmArgs("-Xms8g", "-Xmx8g")
51-
.measurementTime(TimeValue.seconds(isCloud() ? 30 : 10))
43+
.warmupIterations(1)
44+
.warmupTime(TimeValue.seconds(5))
45+
.measurementIterations(3)
46+
.jvmArgs("-Xms10g", "-Xmx10g")
47+
.measurementTime(TimeValue.seconds(isCloud() ? 30 : 5))
5248
.resultFormat(ResultFormatType.JSON)
5349
// .output(String.format("jmh-results-%s-%s.out", isCloud() ? "cloud" : "local", System.currentTimeMillis()))
5450
.result(String.format("jmh-results-%s-%s.json", isCloud() ? "cloud" : "local", System.currentTimeMillis()))

performance/src/test/com/clickhouse/benchmark/TestEnvironment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,11 @@ public static void setupEnvironment() {
9999
.options(Collections.singletonMap(ClickHouseClientOption.SSL.getKey(), isCloud() ? "true" : "false"))
100100
.database(DB_NAME)
101101
.build();
102-
103102
createDatabase();
104103
}
105104

106105
public static void cleanupEnvironment() {
106+
LOGGER.info("Cleaning up ClickHouse test environment...");
107107
if (isCloud()) {
108108
dropDatabase();
109109
}

performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,15 @@ public class BenchmarkBase {
5353
protected Client clientV2;
5454
@Setup(Level.Iteration)
5555
public void setUpIteration() {
56+
LOGGER.info("BenchmarkBase::setUpIteration");
5657
clientV1 = getClientV1();
5758
clientV2 = getClientV2();
5859

5960
}
6061

6162
@TearDown(Level.Iteration)
6263
public void tearDownIteration() {
64+
LOGGER.info("BenchmarkBase::tearDownIteration");
6365
if (clientV1 != null) {
6466
clientV1.close();
6567
clientV1 = null;
@@ -115,6 +117,7 @@ public static DataSet getDataSet() {
115117

116118
@Setup(Level.Trial)
117119
public void setup(DataState dataState) {
120+
LOGGER.info("BenchmarkBase::setup");
118121
setupEnvironment();
119122
LOGGER.info("Setup benchmarks using dataset: {}", dataState.datasetSourceName);
120123
if (dataState.dataSet == null && "simple".equals(dataState.datasetSourceName)) {
@@ -183,6 +186,10 @@ public static void truncateTable(String tableName) {
183186
runAndSyncQuery(String.format("TRUNCATE TABLE IF EXISTS `%s`.`%s`", DB_NAME, tableName), tableName);
184187
}
185188

189+
public static void dropTable(String tableName) {
190+
LOGGER.info("Truncating table: {}", tableName);
191+
runAndSyncQuery(String.format("DROP TABLE IF EXISTS `%s`.`%s`", DB_NAME, tableName), tableName);
192+
}
186193

187194
public static void insertData(String tableName, InputStream dataStream, ClickHouseFormat format) {
188195
try (Client client = getClientV2();

performance/src/test/com/clickhouse/benchmark/clients/ConcurrentInsertClient.java

Lines changed: 74 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -27,70 +27,78 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828

2929
import static com.clickhouse.benchmark.TestEnvironment.getServer;
30-
@Threads(3)
31-
@State(Scope.Thread)
30+
@Threads(2)
31+
@State(Scope.Benchmark)
3232
public class ConcurrentInsertClient extends BenchmarkBase {
3333
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentInsertClient.class);
34-
private static final AtomicInteger GLOBAL_ID = new AtomicInteger(0);
35-
private final ThreadLocal<Integer> invocationId = new ThreadLocal<>();
36-
@State(Scope.Benchmark)
37-
public static class GlobalState {
38-
ClickHouseClient clientV1Global = BenchmarkBase.getClientV1();
39-
Client clientV2Global = BenchmarkBase.getClientV2();
40-
ClickHouseClient getClientV1() {
41-
return clientV1Global;
34+
private static ClickHouseClient clientV1Shared;
35+
private static Client clientV2Shared;
36+
@Setup(Level.Trial)
37+
public void setUpIteration() {
38+
clientV1Shared = getClientV1();
39+
clientV2Shared = getClientV2();
40+
}
41+
@TearDown(Level.Trial)
42+
public void tearDownIteration() {
43+
if (clientV1Shared != null) {
44+
clientV1Shared.close();
45+
clientV1Shared = null;
4246
}
43-
Client getClientV2() {
44-
return clientV2Global;
47+
if (clientV2Shared != null) {
48+
clientV2Shared.close();
49+
clientV2Shared = null;
4550
}
4651
}
52+
@State(Scope.Thread)
53+
public static class ThreadLocalState {
54+
public String createTableName() {
55+
String name = Thread.currentThread().getName();
56+
int index = name.lastIndexOf("-");
57+
String id = name.substring(index + 1);
58+
return String.format("%s_%s", "concurrent_data_empty", id);
59+
}
60+
@Setup(Level.Invocation)
61+
public void setup() {
62+
DataSet dataSet = DataState.getDataSet();
63+
String tableName = createTableName();
64+
LOGGER.warn("setup create table name: " + tableName);
65+
// create table
66+
String createTableString = dataSet.getCreateTableString(tableName);
67+
runAndSyncQuery(createTableString, tableName);
68+
}
4769

48-
private String createTableName(int id) {
49-
return String.format("%s_%d", "concurrent_data_empty", id);
50-
}
51-
52-
@Setup(Level.Invocation)
53-
public void setup() throws InterruptedException {
54-
int id = GLOBAL_ID.incrementAndGet();
55-
invocationId.set(id);
56-
DataSet dataSet = DataState.getDataSet();
57-
String tableName = createTableName(id);
58-
LOGGER.warn("setup create table name: " + tableName);
59-
// create table
60-
String createTableString = dataSet.getCreateTableString(tableName);
61-
runAndSyncQuery(createTableString, tableName);
62-
}
63-
@TearDown(Level.Invocation)
64-
public void verifyRowsInsertedAndCleanup(DataState dataState) throws InterruptedException {
65-
boolean success;
66-
String tableName = createTableName(invocationId.get());
67-
LOGGER.warn("TearDown: " + tableName);
68-
int count = 0;
69-
do {
70-
success = verifyCount(tableName, dataState.dataSet.getSize());
71-
if (!success) {
72-
LOGGER.warn("Retrying to verify rows inserted");
73-
try {
74-
Thread.sleep(2500);
75-
} catch (InterruptedException e) {
76-
LOGGER.error("Error: ", e);
70+
@TearDown(Level.Invocation)
71+
public void verifyRowsInsertedAndCleanup(DataState dataState) {
72+
String tableName = createTableName();
73+
boolean success;
74+
LOGGER.warn("TearDown: " + tableName);
75+
int count = 0;
76+
do {
77+
success = verifyCount(tableName, dataState.dataSet.getSize());
78+
if (!success) {
79+
LOGGER.warn("Retrying to verify rows inserted");
80+
try {
81+
Thread.sleep(2500);
82+
} catch (InterruptedException e) {
83+
LOGGER.error("Error: ", e);
84+
}
7785
}
86+
} while (!success && count++ < 10);
87+
if (!success) {
88+
LOGGER.error("Failed to verify rows inserted");
89+
throw new RuntimeException("Failed to verify rows inserted");
7890
}
79-
} while (!success && count++ < 10);
80-
if (!success) {
81-
LOGGER.error("Failed to verify rows inserted");
82-
throw new RuntimeException("Failed to verify rows inserted");
91+
truncateTable(tableName);
8392
}
84-
truncateTable(tableName);
8593
}
94+
95+
8696
@Benchmark
87-
public void insertV1(DataState dataState, GlobalState globalState) {
88-
int id = invocationId.get();
89-
String tableName = createTableName(id);
90-
// System.out.println(Thread.currentThread().getName() + " is executing insertV1:[" + id + "] " + globalState.getClientV1().hashCode());
97+
public void insertV1(DataState dataState, ThreadLocalState threadLocalState) {
98+
String tableName = threadLocalState.createTableName();
9199
try {
92100
ClickHouseFormat format = dataState.dataSet.getFormat();
93-
try (ClickHouseResponse response = globalState.getClientV1().read(getServer())
101+
try (ClickHouseResponse response = getClientV1().read(getServer())
94102
.write()
95103
.option(ClickHouseClientOption.ASYNC, false)
96104
.format(format)
@@ -108,19 +116,17 @@ public void insertV1(DataState dataState, GlobalState globalState) {
108116
}
109117

110118
@Benchmark
111-
public void insertV2(DataState dataState, GlobalState globalState) {
112-
int id = invocationId.get();
113-
// System.out.println(Thread.currentThread().getName() + " is executing insertV2:[" + id + "] " + globalState.getClientV2().hashCode());
114-
String tableName = createTableName(id);
119+
public void insertV2(DataState dataState, ThreadLocalState threadLocalState) {
120+
String tableName = threadLocalState.createTableName();
115121
LOGGER.warn("insertV2: " + tableName);
116122
try {
117123
ClickHouseFormat format = dataState.dataSet.getFormat();
118-
try (InsertResponse response = globalState.getClientV2().insert(tableName, out -> {
124+
try (InsertResponse response = clientV2Shared.insert(tableName, out -> {
119125
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
120126
out.write(bytes);
121127
}
122128
out.close();
123-
}, format, new InsertSettings().setDeduplicationToken("insert_v2")).get()) {
129+
}, format, new InsertSettings()).get()) {
124130
response.getWrittenRows();
125131
}
126132
} catch (Exception e) {
@@ -129,15 +135,15 @@ public void insertV2(DataState dataState, GlobalState globalState) {
129135
}
130136

131137
@Benchmark
132-
public void insertV1Compressed(DataState dataState, GlobalState globalState) {
138+
public void insertV1Compressed(DataState dataState, ThreadLocalState threadLocalState) {
133139
try {
134140
ClickHouseFormat format = dataState.dataSet.getFormat();
135-
try (ClickHouseResponse response = globalState.getClientV1().read(getServer())
141+
try (ClickHouseResponse response = clientV1Shared.read(getServer())
136142
.write()
137143
.option(ClickHouseClientOption.ASYNC, false)
138144
.option(ClickHouseClientOption.DECOMPRESS, true)
139145
.format(format)
140-
.query(BenchmarkRunner.getInsertQuery(dataState.tableNameEmpty))
146+
.query(BenchmarkRunner.getInsertQuery(threadLocalState.createTableName()))
141147
.data(out -> {
142148
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
143149
out.write(bytes);
@@ -150,10 +156,10 @@ public void insertV1Compressed(DataState dataState, GlobalState globalState) {
150156
}
151157
}
152158
@Benchmark
153-
public void insertV2Compressed(DataState dataState, GlobalState globalState) {
159+
public void insertV2Compressed(DataState dataState, ThreadLocalState threadLocalState) {
154160
try {
155161
ClickHouseFormat format = dataState.dataSet.getFormat();
156-
try (InsertResponse response = globalState.getClientV2().insert(dataState.tableNameEmpty, out -> {
162+
try (InsertResponse response = clientV2Shared.insert(threadLocalState.createTableName(), out -> {
157163
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
158164
out.write(bytes);
159165
}
@@ -168,17 +174,17 @@ public void insertV2Compressed(DataState dataState, GlobalState globalState) {
168174
}
169175

170176
@Benchmark
171-
public void insertV1RowBinary(DataState dataState, GlobalState globalState) {
177+
public void insertV1RowBinary(DataState dataState, ThreadLocalState threadLocalState) {
172178
try {
173179
ClickHouseFormat format = ClickHouseFormat.RowBinary;
174-
try (ClickHouseResponse response = globalState.getClientV1().read(getServer())
180+
try (ClickHouseResponse response = clientV1Shared.read(getServer())
175181
.write()
176182
.option(ClickHouseClientOption.ASYNC, false)
177183
.format(format)
178-
.query(BenchmarkRunner.getInsertQuery(dataState.tableNameEmpty))
184+
.query(BenchmarkRunner.getInsertQuery(threadLocalState.createTableName()))
179185
.data(out -> {
180186
ClickHouseDataProcessor p = dataState.dataSet.getClickHouseDataProcessor();
181-
ClickHouseSerializer[] serializers = p.getSerializers(clientV1.getConfig(), p.getColumns());
187+
ClickHouseSerializer[] serializers = p.getSerializers(clientV1Shared.getConfig(), p.getColumns());
182188
for (ClickHouseRecord record : dataState.dataSet.getClickHouseRecords()) {
183189
for (int i = 0; i < serializers.length; i++) {
184190
serializers[i].serialize(record.getValue(i), out);
@@ -194,9 +200,9 @@ public void insertV1RowBinary(DataState dataState, GlobalState globalState) {
194200
}
195201

196202
@Benchmark
197-
public void insertV2RowBinary(DataState dataState, GlobalState globalState) {
203+
public void insertV2RowBinary(DataState dataState, ThreadLocalState threadLocalState) {
198204
try {
199-
try (InsertResponse response = globalState.getClientV2().insert(dataState.tableNameEmpty, out -> {
205+
try (InsertResponse response = clientV2Shared.insert(threadLocalState.createTableName(), out -> {
200206
RowBinaryFormatWriter w = new RowBinaryFormatWriter(out, dataState.dataSet.getSchema(), ClickHouseFormat.RowBinary);
201207
for (List<Object> row : dataState.dataSet.getRowsOrdered()) {
202208
int index = 1;

0 commit comments

Comments
 (0)