Skip to content

Commit 6150d88

Browse files
authored
feat: real pipeline for bulk write (#71)
* feat: real pipeline write and new bench code * release: 0.14.0 * chore: minor change * chore: improve prepare data perf * fix: json internal type
1 parent 14c7cae commit 6150d88

File tree

28 files changed

+751
-249
lines changed

28 files changed

+751
-249
lines changed

ingester-all/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.greptime</groupId>
2323
<artifactId>greptimedb-ingester</artifactId>
24-
<version>0.12.2</version>
24+
<version>0.14.0</version>
2525
</parent>
2626

2727
<artifactId>ingester-all</artifactId>

ingester-bulk-protocol/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.greptime</groupId>
2323
<artifactId>greptimedb-ingester</artifactId>
24-
<version>0.12.2</version>
24+
<version>0.14.0</version>
2525
</parent>
2626

2727
<artifactId>ingester-bulk-protocol</artifactId>

ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,20 +135,26 @@ public static BulkWriteManager create(
135135
* @param table the name of the target table
136136
* @param schema the Arrow schema defining the structure of the data to be written
137137
* @param timeoutMs the timeout in milliseconds for the write operation
138+
* @param maxRequestsInFlight the max in-flight requests in the stream
138139
* @param options optional RPC-layer hints to configure the underlying Flight client call
139140
* @return a BulkStreamWriter instance that manages the data transfer process
140141
*/
141-
public BulkWriteService intoBulkWriteStream(String table, Schema schema, long timeoutMs, CallOption... options) {
142+
public BulkWriteService intoBulkWriteStream(
143+
String table, Schema schema, long timeoutMs, int maxRequestsInFlight, CallOption... options) {
142144
FlightDescriptor descriptor = FlightDescriptor.path(table);
143-
return new BulkWriteService(this, this.allocator, schema, descriptor, timeoutMs, options);
145+
return new BulkWriteService(this, this.allocator, schema, descriptor, timeoutMs, maxRequestsInFlight, options);
144146
}
145147

146148
VectorSchemaRoot createSchemaRoot(Schema schema) {
147149
return VectorSchemaRoot.create(schema, this.allocator);
148150
}
149151

150-
ClientStreamListener startPut(FlightDescriptor descriptor, PutListener metadataListener, CallOption... options) {
151-
return this.flightClient.startPut(descriptor, metadataListener, options);
152+
ClientStreamListener startPut(
153+
FlightDescriptor descriptor,
154+
PutListener metadataListener,
155+
long maxRequestsInFlight,
156+
CallOption... options) {
157+
return this.flightClient.startPut(descriptor, metadataListener, maxRequestsInFlight, options);
152158
}
153159

154160
DictionaryProvider newDefaultDictionaryProvider() {

ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class BulkWriteService implements AutoCloseable {
7070
* @param schema The Arrow schema defining the data structure
7171
* @param descriptor The FlightDescriptor identifying the data stream
7272
* @param timeoutMs The timeout in milliseconds for operations
73+
* @param maxRequestsInFlight the max in-flight requests in the stream
7374
* @param options Additional call options for the Flight client
7475
*/
7576
public BulkWriteService(
@@ -78,12 +79,13 @@ public BulkWriteService(
7879
Schema schema,
7980
FlightDescriptor descriptor,
8081
long timeoutMs,
82+
int maxRequestsInFlight,
8183
CallOption... options) {
8284
this.manager = manager;
8385
this.allocator = allocator;
8486
this.root = manager.createSchemaRoot(schema);
8587
this.metadataListener = new AsyncPutListener();
86-
this.listener = manager.startPut(descriptor, this.metadataListener, options);
88+
this.listener = manager.startPut(descriptor, this.metadataListener, maxRequestsInFlight, options);
8789
this.timeoutMs = timeoutMs;
8890
}
8991

@@ -154,11 +156,8 @@ public PutStage putNext() {
154156

155157
// Prepare metadata buffer
156158
byte[] metadata = new Metadata.RequestMetadata(id).toJsonBytesUtf8();
157-
ArrowBuf metadataBuf = null;
158159
try {
159-
// The buffer will be closed in the putNext method, but if an error occurs during execution,
160-
// we need to close it ourselves in the catch block to prevent memory leaks.
161-
metadataBuf = this.allocator.buffer(metadata.length);
160+
ArrowBuf metadataBuf = this.allocator.buffer(metadata.length);
162161
metadataBuf.writeBytes(metadata);
163162

164163
// Send data to the server
@@ -169,12 +168,6 @@ public PutStage putNext() {
169168
LOG.debug("Data sent successfully [id={}], in-flight requests: {}", id, inFlightCount);
170169

171170
return new PutStage(future, inFlightCount);
172-
} catch (Throwable t) {
173-
// Close the metadata buffer on error
174-
if (metadataBuf != null) {
175-
metadataBuf.close();
176-
}
177-
throw t;
178171
} finally {
179172
// Clear the root to prepare for next batch
180173
this.root.clear();

ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,17 @@ public void addClientMiddleware(FlightClientMiddleware.Factory factory) {
113113
* @param root VectorSchemaRoot the root containing data
114114
* @param metadataListener A handler for metadata messages from the server. This will be passed buffers that will be
115115
* freed after {@link StreamListener#onNext(Object)} is called!
116+
* @param maxRequestsInFlight the max in-flight requests in the stream
116117
* @param options RPC-layer hints for this call.
117118
* @return ClientStreamListener an interface to control uploading data
118119
*/
119120
public ClientStreamListener startPut(
120-
FlightDescriptor descriptor, VectorSchemaRoot root, PutListener metadataListener, CallOption... options) {
121-
return startPut(descriptor, root, new MapDictionaryProvider(), metadataListener, options);
121+
FlightDescriptor descriptor,
122+
VectorSchemaRoot root,
123+
PutListener metadataListener,
124+
long maxRequestsInFlight,
125+
CallOption... options) {
126+
return startPut(descriptor, root, new MapDictionaryProvider(), metadataListener, maxRequestsInFlight, options);
122127
}
123128

124129
/**
@@ -128,6 +133,7 @@ public ClientStreamListener startPut(
128133
* @param root VectorSchemaRoot the root containing data
129134
* @param provider A dictionary provider for the root.
130135
* @param metadataListener A handler for metadata messages from the server.
136+
* @param maxRequestsInFlight the max in-flight requests in the stream
131137
* @param options RPC-layer hints for this call.
132138
* @return ClientStreamListener an interface to control uploading data.
133139
* {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will already have been called.
@@ -137,10 +143,11 @@ public ClientStreamListener startPut(
137143
VectorSchemaRoot root,
138144
DictionaryProvider provider,
139145
PutListener metadataListener,
146+
long maxRequestsInFlight,
140147
CallOption... options) {
141148
Preconditions.checkNotNull(root, "root must not be null");
142149
Preconditions.checkNotNull(provider, "provider must not be null");
143-
ClientStreamListener writer = startPut(descriptor, metadataListener, options);
150+
ClientStreamListener writer = startPut(descriptor, metadataListener, maxRequestsInFlight, options);
144151
writer.start(root, provider);
145152
return writer;
146153
}
@@ -150,18 +157,22 @@ public ClientStreamListener startPut(
150157
*
151158
* @param descriptor FlightDescriptor the descriptor for the data
152159
* @param metadataListener A handler for metadata messages from the server.
160+
* @param maxRequestsInFlight the max in-flight requests in the stream
153161
* @param options RPC-layer hints for this call.
154162
* @return ClientStreamListener an interface to control uploading data.
155163
* {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will NOT already have been called.
156164
*/
157165
public ClientStreamListener startPut(
158-
FlightDescriptor descriptor, PutListener metadataListener, CallOption... options) {
166+
FlightDescriptor descriptor,
167+
PutListener metadataListener,
168+
long maxRequestsInFlight,
169+
CallOption... options) {
159170
Preconditions.checkNotNull(descriptor, "descriptor must not be null");
160171
Preconditions.checkNotNull(metadataListener, "metadataListener must not be null");
161172

162173
try {
163174
ClientCall<ArrowMessage, Flight.PutResult> call = asyncStubNewCall(this.doPutDescriptor, options);
164-
OnStreamReadyHandler onStreamReadyHandler = new OnStreamReadyHandler();
175+
OnStreamReadyHandler onStreamReadyHandler = new OnStreamReadyHandler((int) maxRequestsInFlight);
165176
SetStreamObserver resultObserver =
166177
new SetStreamObserver(this.allocator, metadataListener, onStreamReadyHandler);
167178
ClientCallStreamObserver<ArrowMessage> observer =
@@ -184,11 +195,17 @@ public DictionaryProvider newDefaultDictionaryProvider() {
184195
}
185196

186197
private static class OnStreamReadyHandler implements Runnable {
187-
private final Semaphore semaphore = new Semaphore(0);
198+
private final int maxRequestsInFlight;
199+
private final Semaphore semaphore;
200+
201+
OnStreamReadyHandler(int maxRequestsInFlight) {
202+
this.maxRequestsInFlight = maxRequestsInFlight;
203+
this.semaphore = new Semaphore(maxRequestsInFlight);
204+
}
188205

189206
@Override
190207
public void run() {
191-
this.semaphore.release();
208+
this.semaphore.release(this.maxRequestsInFlight);
192209
}
193210

194211
/**
@@ -338,7 +355,10 @@ protected void waitUntilStreamReady() {
338355
// If the stream is not ready, wait for a short time to avoid busy waiting
339356
// This helps reduce CPU usage while still being responsive
340357
try {
341-
this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS);
358+
if (this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS)) {
359+
// Allow some in-flight requests to be sent
360+
break;
361+
}
342362
} catch (InterruptedException e) {
343363
Thread.currentThread().interrupt();
344364
throw new RuntimeException("Interrupted while waiting for stream to be ready", e);

ingester-common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.greptime</groupId>
2323
<artifactId>greptimedb-ingester</artifactId>
24-
<version>0.12.2</version>
24+
<version>0.14.0</version>
2525
</parent>
2626

2727
<artifactId>ingester-common</artifactId>

ingester-example/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.greptime</groupId>
2323
<artifactId>greptimedb-ingester</artifactId>
24-
<version>0.12.2</version>
24+
<version>0.14.0</version>
2525
</parent>
2626

2727
<artifactId>ingester-example</artifactId>

ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,25 @@ public static void main(String[] args) throws Exception {
8282

8383
Config cfg = Config.newBuilder()
8484
.allocatorInitReservation(0)
85-
.allocatorMaxAllocation(1024 * 1024 * 1024)
86-
.timeoutMsPerMessage(10000)
85+
.allocatorMaxAllocation(1024 * 1024 * 1024L)
86+
.timeoutMsPerMessage(30000)
8787
.maxRequestsInFlight(8)
8888
.build();
8989
Context ctx = Context.newDefault().withCompression(Compression.None);
9090

91+
// Bulk write api cannot auto create table
92+
Table toCreate = Table.from(schema);
93+
toCreate.addRow(generateOneRow(100000));
94+
toCreate.complete();
95+
greptimeDB.write(toCreate).get();
96+
9197
try (BulkStreamWriter bulkStreamWriter = greptimeDB.bulkStreamWriter(schema, cfg, ctx)) {
9298

93-
// Write 100 times, each time write 100000 rows
99+
// Write 100 times, each time write 10000 rows
94100
for (int i = 0; i < 100; i++) {
95101
long start = System.currentTimeMillis();
96-
Table.TableBufferRoot table = bulkStreamWriter.tableBufferRoot();
97-
for (int j = 0; j < 100000; j++) {
102+
Table.TableBufferRoot table = bulkStreamWriter.tableBufferRoot(1024);
103+
for (int j = 0; j < 10000; j++) {
98104
// with 100000 cardinality
99105
Object[] row = generateOneRow(100000);
100106
table.addRow(row);
@@ -150,7 +156,7 @@ private static Object[] generateOneRow(int cardinality) {
150156
System.currentTimeMillis(), // ts
151157
random.nextInt(127), // field_int8
152158
random.nextInt(32767), // field_int16
153-
random.nextInt(), // field_int32
159+
null, // field_int32
154160
random.nextLong(), // field_int64
155161
random.nextInt(255), // field_uint8
156162
random.nextInt(65535), // field_uint16

ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,50 +46,55 @@ public class BulkWriteBenchmark {
4646
public static void main(String[] args) throws Exception {
4747
String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
4848
String dbName = SystemPropertyUtil.get("db_name", "public");
49-
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
50-
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024);
49+
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true);
50+
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
5151
LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
5252
LOG.info("Using zstd compression: {}", zstdCompression);
5353
LOG.info("Batch size: {}", batchSize);
5454

5555
GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName);
56-
TableDataProvider tableDataProvider =
57-
ServiceLoader.load(TableDataProvider.class).first();
58-
tableDataProvider.init();
59-
TableSchema tableSchema = tableDataProvider.tableSchema();
6056

6157
BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
6258
.allocatorInitReservation(0)
6359
.allocatorMaxAllocation(4 * 1024 * 1024 * 1024L)
6460
.timeoutMsPerMessage(60000)
65-
.maxRequestsInFlight(32)
61+
.maxRequestsInFlight(8)
6662
.build();
6763
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
6864
Context ctx = Context.newDefault().withCompression(compression);
6965

66+
TableDataProvider tableDataProvider =
67+
ServiceLoader.load(TableDataProvider.class).first();
68+
LOG.info("Table data provider: {}", tableDataProvider.getClass().getName());
69+
tableDataProvider.init();
70+
TableSchema tableSchema = tableDataProvider.tableSchema();
71+
7072
LOG.info("Start writing data");
7173
try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) {
7274
Iterator<Object[]> rows = tableDataProvider.rows();
7375

7476
long start = System.nanoTime();
7577
for (; ; ) {
76-
Table.TableBufferRoot table = writer.tableBufferRoot();
78+
Table.TableBufferRoot table = writer.tableBufferRoot(1024);
7779
for (int i = 0; i < batchSize; i++) {
7880
if (!rows.hasNext()) {
7981
break;
8082
}
8183
table.addRow(rows.next());
8284
}
85+
LOG.info("Table bytes used: {}", table.bytesUsed());
8386
// Complete the table; adding rows is no longer permitted.
8487
table.complete();
8588

8689
// Write the table data to the server
8790
CompletableFuture<Integer> future = writer.writeNext();
91+
long fStart = System.nanoTime();
8892
future.whenComplete((r, t) -> {
93+
long costMs = (System.nanoTime() - fStart) / 1000000;
8994
if (t != null) {
90-
LOG.error("Error writing data", t);
95+
LOG.error("Error writing data, time cost: {}ms", costMs, t);
9196
} else {
92-
LOG.info("Wrote rows: {}", r);
97+
LOG.info("Wrote rows: {}, time cost: {}ms", r, costMs);
9398
}
9499
});
95100

@@ -101,6 +106,8 @@ public static void main(String[] args) throws Exception {
101106
writer.completed();
102107

103108
LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
109+
} finally {
110+
tableDataProvider.close();
104111
}
105112

106113
greptimeDB.shutdownGracefully();

0 commit comments

Comments
 (0)