Skip to content

Commit be749b4

Browse files
committed
feat: simple bench for ingestion
1 parent b045476 commit be749b4

File tree

8 files changed

+520
-2
lines changed

8 files changed

+520
-2
lines changed

ingester-common/src/main/java/io/greptime/common/SPI.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,11 @@
3232

3333
String name() default "";
3434

35+
/**
36+
* The priority of the SPI implementation.
37+
* If multiple SPI implementations are found, the ones with higher priority will be placed first.
38+
*
39+
* @return the priority of the SPI implementation
40+
*/
3541
int priority() default 0;
3642
}

ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,15 @@ public static void main(String[] args) throws Exception {
8080
.addField("field_json", DataType.Json)
8181
.build();
8282

83-
Config config = Config.newBuilder()
83+
Config cfg = Config.newBuilder()
8484
.allocatorInitReservation(0)
8585
.allocatorMaxAllocation(1024 * 1024 * 1024)
8686
.timeoutMsPerMessage(10000)
8787
.maxRequestsInFlight(8)
8888
.build();
8989
Context ctx = Context.newDefault().withCompression(Compression.None);
9090

91-
try (BulkStreamWriter bulkStreamWriter = greptimeDB.bulkStreamWriter(schema, config, ctx)) {
91+
try (BulkStreamWriter bulkStreamWriter = greptimeDB.bulkStreamWriter(schema, cfg, ctx)) {
9292

9393
// Write 100 times, each time write 100000 rows
9494
for (int i = 0; i < 100; i++) {
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2023 Greptime Team
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.greptime.bench;
18+
19+
import io.greptime.BulkStreamWriter;
20+
import io.greptime.BulkWrite;
21+
import io.greptime.GreptimeDB;
22+
import io.greptime.common.util.ServiceLoader;
23+
import io.greptime.common.util.SystemPropertyUtil;
24+
import io.greptime.models.Table;
25+
import io.greptime.models.TableSchema;
26+
import io.greptime.rpc.Compression;
27+
import io.greptime.rpc.Context;
28+
import java.util.Iterator;
29+
import java.util.concurrent.CompletableFuture;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
/**
34+
* BulkWriteBenchmark is a benchmark for the bulk write API of GreptimeDB.
35+
*
36+
* Env:
37+
* - db_endpoint: the endpoint of the GreptimeDB server
38+
* - db_name: the name of the database
39+
* - batch_size_per_request: the batch size per request
40+
* - zstd_compression: whether to use zstd compression
41+
*/
42+
public class BulkWriteBenchmark {
43+
44+
private static final Logger LOG = LoggerFactory.getLogger(BulkWriteBenchmark.class);
45+
46+
public static void main(String[] args) throws Exception {
47+
String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
48+
String dbName = SystemPropertyUtil.get("db_name", "public");
49+
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
50+
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 100 * 1024);
51+
LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
52+
LOG.info("Using zstd compression: {}", zstdCompression);
53+
LOG.info("Batch size: {}", batchSize);
54+
55+
GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName);
56+
TableDataProvider tableDataProvider =
57+
ServiceLoader.load(TableDataProvider.class).first();
58+
tableDataProvider.init();
59+
TableSchema tableSchema = tableDataProvider.tableSchema();
60+
61+
BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
62+
.allocatorInitReservation(0)
63+
.allocatorMaxAllocation(4 * 1024 * 1024 * 1024)
64+
.timeoutMsPerMessage(10000)
65+
.maxRequestsInFlight(8)
66+
.build();
67+
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
68+
Context ctx = Context.newDefault().withCompression(compression);
69+
70+
LOG.info("Start writing data");
71+
try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) {
72+
Iterator<Object[]> rows = tableDataProvider.rows();
73+
74+
long start = System.nanoTime();
75+
for (; ; ) {
76+
Table.TableBufferRoot table = writer.tableBufferRoot();
77+
for (int i = 0; i < batchSize; i++) {
78+
if (!rows.hasNext()) {
79+
break;
80+
}
81+
table.addRow(rows.next());
82+
}
83+
// Complete the table; adding rows is no longer permitted.
84+
table.complete();
85+
86+
// Write the table data to the server
87+
CompletableFuture<Integer> future = writer.writeNext();
88+
future.whenComplete((r, t) -> {
89+
if (t != null) {
90+
LOG.error("Error writing data", t);
91+
} else {
92+
LOG.info("Wrote rows: {}", r);
93+
}
94+
});
95+
96+
if (!rows.hasNext()) {
97+
break;
98+
}
99+
}
100+
101+
writer.completed();
102+
103+
LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
104+
}
105+
106+
greptimeDB.shutdownGracefully();
107+
}
108+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2023 Greptime Team
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.greptime.bench;
18+
19+
import io.greptime.GreptimeDB;
20+
import io.greptime.common.util.SerializingExecutor;
21+
import io.greptime.options.GreptimeOptions;
22+
23+
/**
24+
* DBConnector is a helper class to connect to a GreptimeDB instance.
25+
*/
26+
public class DBConnector {
27+
28+
public static GreptimeDB connectTo(String[] endpoints, String dbname) {
29+
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, dbname)
30+
.asyncPool(new SerializingExecutor("bench_async_pool"))
31+
.writeMaxRetries(0)
32+
.defaultStreamMaxWritePointsPerSecond(Integer.MAX_VALUE)
33+
.useZeroCopyWriteInBulkWrite(true)
34+
.build();
35+
return GreptimeDB.create(opts);
36+
}
37+
}

0 commit comments

Comments
 (0)