Skip to content

Commit 5eec2d6

Browse files
committed
Implementing ConcurrentInsert
1 parent 0ea28d4 commit 5eec2d6

File tree

3 files changed

+225
-2
lines changed

3 files changed

+225
-2
lines changed

performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33

44
import com.clickhouse.benchmark.clients.Compression;
5+
import com.clickhouse.benchmark.clients.ConcurrentInsertClient;
56
import com.clickhouse.benchmark.clients.Deserializers;
67
import com.clickhouse.benchmark.clients.InsertClient;
78
import com.clickhouse.benchmark.clients.QueryClient;
@@ -34,13 +35,13 @@ public static void main(String[] args) throws Exception {
3435
Options opt = new OptionsBuilder()
3536
.include(QueryClient.class.getSimpleName())
3637
.include(InsertClient.class.getSimpleName())
38+
.include(ConcurrentInsertClient.class.getSimpleName())
3739
.include(Compression.class.getSimpleName())
3840
.include(Serializers.class.getSimpleName())
3941
.include(Deserializers.class.getSimpleName())
4042
.forks(1) // must be a fork. No fork only for debugging
4143
.mode(Mode.SampleTime)
4244
.timeUnit(TimeUnit.MILLISECONDS)
43-
.threads(1)
4445
.addProfiler(GCProfiler.class)
4546
.addProfiler(MemPoolProfiler.class)
4647
.warmupIterations(3)

performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static class DataState {
8383
@Param({"data_empty"})
8484
String tableNameEmpty;
8585

86-
DataSet dataSet;
86+
static DataSet dataSet;
8787

8888
ByteBuffer datasetAsRowBinaryWithNamesAndTypes;
8989

@@ -107,6 +107,10 @@ public void setTableNameEmpty(String tableNameEmpty) {
107107
this.tableNameEmpty = tableNameEmpty;
108108
}
109109

110+
public static DataSet getDataSet() {
111+
return dataSet;
112+
}
113+
110114
}
111115

112116
@Setup(Level.Trial)
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package com.clickhouse.benchmark.clients;
2+
3+
import com.clickhouse.benchmark.BenchmarkRunner;
4+
import com.clickhouse.benchmark.data.DataSet;
5+
import com.clickhouse.client.ClickHouseClient;
6+
import com.clickhouse.client.ClickHouseResponse;
7+
import com.clickhouse.client.api.Client;
8+
import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter;
9+
import com.clickhouse.client.api.insert.InsertResponse;
10+
import com.clickhouse.client.api.insert.InsertSettings;
11+
import com.clickhouse.client.config.ClickHouseClientOption;
12+
import com.clickhouse.data.ClickHouseDataProcessor;
13+
import com.clickhouse.data.ClickHouseFormat;
14+
import com.clickhouse.data.ClickHouseRecord;
15+
import com.clickhouse.data.ClickHouseSerializer;
16+
import org.openjdk.jmh.annotations.Benchmark;
17+
import org.openjdk.jmh.annotations.Level;
18+
import org.openjdk.jmh.annotations.Scope;
19+
import org.openjdk.jmh.annotations.Setup;
20+
import org.openjdk.jmh.annotations.State;
21+
import org.openjdk.jmh.annotations.TearDown;
22+
import org.openjdk.jmh.annotations.Threads;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.util.List;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
import static com.clickhouse.benchmark.TestEnvironment.getServer;
30+
@Threads(3)
31+
@State(Scope.Thread)
32+
public class ConcurrentInsertClient extends BenchmarkBase {
33+
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentInsertClient.class);
34+
private static final AtomicInteger GLOBAL_ID = new AtomicInteger(0);
35+
private final ThreadLocal<Integer> invocationId = new ThreadLocal<>();
36+
@State(Scope.Benchmark)
37+
public static class GlobalState {
38+
ClickHouseClient clientV1Global = BenchmarkBase.getClientV1();
39+
Client clientV2Global = BenchmarkBase.getClientV2();
40+
ClickHouseClient getClientV1() {
41+
return clientV1Global;
42+
}
43+
Client getClientV2() {
44+
return clientV2Global;
45+
}
46+
}
47+
48+
private String createTableName(int id) {
49+
return String.format("%s_%d", "concurrent_data_empty", id);
50+
}
51+
52+
@Setup(Level.Invocation)
53+
public void setup() throws InterruptedException {
54+
int id = GLOBAL_ID.incrementAndGet();
55+
invocationId.set(id);
56+
DataSet dataSet = DataState.getDataSet();
57+
String tableName = createTableName(id);
58+
LOGGER.warn("setup create table name: " + tableName);
59+
// create table
60+
String createTableString = dataSet.getCreateTableString(tableName);
61+
runAndSyncQuery(createTableString, tableName);
62+
}
63+
@TearDown(Level.Invocation)
64+
public void verifyRowsInsertedAndCleanup(DataState dataState) throws InterruptedException {
65+
boolean success;
66+
String tableName = createTableName(invocationId.get());
67+
LOGGER.warn("TearDown: " + tableName);
68+
int count = 0;
69+
do {
70+
success = verifyCount(tableName, dataState.dataSet.getSize());
71+
if (!success) {
72+
LOGGER.warn("Retrying to verify rows inserted");
73+
try {
74+
Thread.sleep(2500);
75+
} catch (InterruptedException e) {
76+
LOGGER.error("Error: ", e);
77+
}
78+
}
79+
} while (!success && count++ < 10);
80+
if (!success) {
81+
LOGGER.error("Failed to verify rows inserted");
82+
throw new RuntimeException("Failed to verify rows inserted");
83+
}
84+
truncateTable(tableName);
85+
}
86+
@Benchmark
87+
public void insertV1(DataState dataState, GlobalState globalState) {
88+
int id = invocationId.get();
89+
String tableName = createTableName(id);
90+
// System.out.println(Thread.currentThread().getName() + " is executing insertV1:[" + id + "] " + globalState.getClientV1().hashCode());
91+
try {
92+
ClickHouseFormat format = dataState.dataSet.getFormat();
93+
try (ClickHouseResponse response = globalState.getClientV1().read(getServer())
94+
.write()
95+
.option(ClickHouseClientOption.ASYNC, false)
96+
.format(format)
97+
.query(BenchmarkRunner.getInsertQuery(tableName))
98+
.data(out -> {
99+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
100+
out.write(bytes);
101+
}
102+
}).executeAndWait()) {
103+
response.getSummary();
104+
}
105+
} catch (Exception e) {
106+
LOGGER.error("Error: ", e);
107+
}
108+
}
109+
110+
@Benchmark
111+
public void insertV2(DataState dataState, GlobalState globalState) {
112+
int id = invocationId.get();
113+
// System.out.println(Thread.currentThread().getName() + " is executing insertV2:[" + id + "] " + globalState.getClientV2().hashCode());
114+
String tableName = createTableName(id);
115+
LOGGER.warn("insertV2: " + tableName);
116+
try {
117+
ClickHouseFormat format = dataState.dataSet.getFormat();
118+
try (InsertResponse response = globalState.getClientV2().insert(tableName, out -> {
119+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
120+
out.write(bytes);
121+
}
122+
out.close();
123+
}, format, new InsertSettings().setDeduplicationToken("insert_v2")).get()) {
124+
response.getWrittenRows();
125+
}
126+
} catch (Exception e) {
127+
LOGGER.error("Error: ", e);
128+
}
129+
}
130+
131+
@Benchmark
132+
public void insertV1Compressed(DataState dataState, GlobalState globalState) {
133+
try {
134+
ClickHouseFormat format = dataState.dataSet.getFormat();
135+
try (ClickHouseResponse response = globalState.getClientV1().read(getServer())
136+
.write()
137+
.option(ClickHouseClientOption.ASYNC, false)
138+
.option(ClickHouseClientOption.DECOMPRESS, true)
139+
.format(format)
140+
.query(BenchmarkRunner.getInsertQuery(dataState.tableNameEmpty))
141+
.data(out -> {
142+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
143+
out.write(bytes);
144+
}
145+
}).executeAndWait()) {
146+
response.getSummary();
147+
}
148+
} catch (Exception e) {
149+
LOGGER.error("Error: ", e);
150+
}
151+
}
152+
@Benchmark
153+
public void insertV2Compressed(DataState dataState, GlobalState globalState) {
154+
try {
155+
ClickHouseFormat format = dataState.dataSet.getFormat();
156+
try (InsertResponse response = globalState.getClientV2().insert(dataState.tableNameEmpty, out -> {
157+
for (byte[] bytes: dataState.dataSet.getBytesList(format)) {
158+
out.write(bytes);
159+
}
160+
out.close();
161+
}, format, new InsertSettings()
162+
.compressClientRequest(true)).get()) {
163+
response.getWrittenRows();
164+
}
165+
} catch (Exception e) {
166+
LOGGER.error("Error: ", e);
167+
}
168+
}
169+
170+
@Benchmark
171+
public void insertV1RowBinary(DataState dataState, GlobalState globalState) {
172+
try {
173+
ClickHouseFormat format = ClickHouseFormat.RowBinary;
174+
try (ClickHouseResponse response = globalState.getClientV1().read(getServer())
175+
.write()
176+
.option(ClickHouseClientOption.ASYNC, false)
177+
.format(format)
178+
.query(BenchmarkRunner.getInsertQuery(dataState.tableNameEmpty))
179+
.data(out -> {
180+
ClickHouseDataProcessor p = dataState.dataSet.getClickHouseDataProcessor();
181+
ClickHouseSerializer[] serializers = p.getSerializers(clientV1.getConfig(), p.getColumns());
182+
for (ClickHouseRecord record : dataState.dataSet.getClickHouseRecords()) {
183+
for (int i = 0; i < serializers.length; i++) {
184+
serializers[i].serialize(record.getValue(i), out);
185+
}
186+
}
187+
})
188+
.executeAndWait()) {
189+
response.getSummary();
190+
}
191+
} catch ( Exception e) {
192+
LOGGER.error("Error: ", e);
193+
}
194+
}
195+
196+
@Benchmark
197+
public void insertV2RowBinary(DataState dataState, GlobalState globalState) {
198+
try {
199+
try (InsertResponse response = globalState.getClientV2().insert(dataState.tableNameEmpty, out -> {
200+
RowBinaryFormatWriter w = new RowBinaryFormatWriter(out, dataState.dataSet.getSchema(), ClickHouseFormat.RowBinary);
201+
for (List<Object> row : dataState.dataSet.getRowsOrdered()) {
202+
int index = 1;
203+
for (Object value : row) {
204+
w.setValue(index, value);
205+
index++;
206+
}
207+
w.commitRow();
208+
}
209+
out.flush();
210+
211+
}, ClickHouseFormat.RowBinaryWithDefaults, new InsertSettings()).get()) {
212+
response.getWrittenRows();
213+
}
214+
} catch (Exception e) {
215+
LOGGER.error("Error: ", e);
216+
}
217+
}
218+
}

0 commit comments

Comments
 (0)