2 changes: 1 addition & 1 deletion ingester-all/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <groupId>io.greptime</groupId>
         <artifactId>greptimedb-ingester</artifactId>
-        <version>0.12.2</version>
+        <version>0.14.0</version>
     </parent>

     <artifactId>ingester-all</artifactId>
2 changes: 1 addition & 1 deletion ingester-bulk-protocol/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <groupId>io.greptime</groupId>
         <artifactId>greptimedb-ingester</artifactId>
-        <version>0.12.2</version>
+        <version>0.14.0</version>
     </parent>

     <artifactId>ingester-bulk-protocol</artifactId>
@@ -135,20 +135,26 @@ public static BulkWriteManager create(
      * @param table the name of the target table
      * @param schema the Arrow schema defining the structure of the data to be written
      * @param timeoutMs the timeout in milliseconds for the write operation
+     * @param maxRequestsInFlight the maximum number of in-flight requests in the stream
      * @param options optional RPC-layer hints to configure the underlying Flight client call
      * @return a BulkStreamWriter instance that manages the data transfer process
      */
-    public BulkWriteService intoBulkWriteStream(String table, Schema schema, long timeoutMs, CallOption... options) {
+    public BulkWriteService intoBulkWriteStream(
+            String table, Schema schema, long timeoutMs, int maxRequestsInFlight, CallOption... options) {
         FlightDescriptor descriptor = FlightDescriptor.path(table);
-        return new BulkWriteService(this, this.allocator, schema, descriptor, timeoutMs, options);
+        return new BulkWriteService(this, this.allocator, schema, descriptor, timeoutMs, maxRequestsInFlight, options);
     }

     VectorSchemaRoot createSchemaRoot(Schema schema) {
         return VectorSchemaRoot.create(schema, this.allocator);
     }

-    ClientStreamListener startPut(FlightDescriptor descriptor, PutListener metadataListener, CallOption... options) {
-        return this.flightClient.startPut(descriptor, metadataListener, options);
+    ClientStreamListener startPut(
+            FlightDescriptor descriptor,
+            PutListener metadataListener,
+            long maxRequestsInFlight,
+            CallOption... options) {
+        return this.flightClient.startPut(descriptor, metadataListener, maxRequestsInFlight, options);
     }

     DictionaryProvider newDefaultDictionaryProvider() {
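A caller-side sketch of the new intoBulkWriteStream signature may help reviewers; only the parameter list comes from this diff, and every other name here is hypothetical:

// Hypothetical caller; assumes a connected BulkWriteManager `manager`
// and an Arrow `schema` already in scope.
long timeoutMs = 10_000;     // per-message write timeout
int maxRequestsInFlight = 8; // new parameter: caps outstanding batches on the stream
BulkWriteService stream = manager.intoBulkWriteStream(
        "sensor_readings",   // hypothetical table name
        schema,
        timeoutMs,
        maxRequestsInFlight);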
@@ -70,6 +70,7 @@ public class BulkWriteService implements AutoCloseable {
      * @param schema The Arrow schema defining the data structure
      * @param descriptor The FlightDescriptor identifying the data stream
      * @param timeoutMs The timeout in milliseconds for operations
+     * @param maxRequestsInFlight The maximum number of in-flight requests in the stream
      * @param options Additional call options for the Flight client
      */
     public BulkWriteService(
@@ -78,12 +79,13 @@ public BulkWriteService(
             Schema schema,
             FlightDescriptor descriptor,
             long timeoutMs,
+            int maxRequestsInFlight,
             CallOption... options) {
         this.manager = manager;
         this.allocator = allocator;
         this.root = manager.createSchemaRoot(schema);
         this.metadataListener = new AsyncPutListener();
-        this.listener = manager.startPut(descriptor, this.metadataListener, options);
+        this.listener = manager.startPut(descriptor, this.metadataListener, maxRequestsInFlight, options);
         this.timeoutMs = timeoutMs;
     }

@@ -154,11 +156,8 @@ public PutStage putNext() {

         // Prepare metadata buffer
         byte[] metadata = new Metadata.RequestMetadata(id).toJsonBytesUtf8();
-        ArrowBuf metadataBuf = null;
-        try {
-            // The buffer will be closed in the putNext method, but if an error occurs during execution,
-            // we need to close it ourselves in the catch block to prevent memory leaks.
-            metadataBuf = this.allocator.buffer(metadata.length);
+        ArrowBuf metadataBuf = this.allocator.buffer(metadata.length);
         metadataBuf.writeBytes(metadata);

         // Send data to the server
@@ -169,12 +168,6 @@
             LOG.debug("Data sent successfully [id={}], in-flight requests: {}", id, inFlightCount);

             return new PutStage(future, inFlightCount);
-        } catch (Throwable t) {
-            // Close the metadata buffer on error
-            if (metadataBuf != null) {
-                metadataBuf.close();
-            }
-            throw t;
         } finally {
             // Clear the root to prepare for next batch
             this.root.clear();
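The deleted catch block existed only to close the metadata buffer if something failed before hand-off; per the removed comment, putNext itself closes the buffer once it is called. A self-contained sketch of that ownership pattern, with a stand-in Sink instead of the real Flight stream (all names here are illustrative, not this PR's API):

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

// Sketch: once a buffer is handed to a consumer that closes it
// (as putNext does per the removed comment), the producer must not
// close it again, so the defensive catch-and-close can be dropped.
public final class HandOffDemo {
    interface Sink {
        void accept(ArrowBuf buf); // closes buf when done, like putNext
    }

    public static void main(String[] args) {
        try (BufferAllocator allocator = new RootAllocator(1024)) {
            byte[] payload = "{\"request_id\":1}".getBytes();
            ArrowBuf buf = allocator.buffer(payload.length);
            buf.writeBytes(payload);
            Sink sink = ArrowBuf::close; // stand-in for the Flight stream
            sink.accept(buf); // ownership transfers; no further close here
        } // allocator close verifies nothing leaked
    }
}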
@@ -113,12 +113,17 @@ public void addClientMiddleware(FlightClientMiddleware.Factory factory) {
      * @param root VectorSchemaRoot the root containing data
      * @param metadataListener A handler for metadata messages from the server. This will be passed buffers that will be
      *     freed after {@link StreamListener#onNext(Object)} is called!
+     * @param maxRequestsInFlight The maximum number of in-flight requests in the stream
      * @param options RPC-layer hints for this call.
      * @return ClientStreamListener an interface to control uploading data
      */
     public ClientStreamListener startPut(
-            FlightDescriptor descriptor, VectorSchemaRoot root, PutListener metadataListener, CallOption... options) {
-        return startPut(descriptor, root, new MapDictionaryProvider(), metadataListener, options);
+            FlightDescriptor descriptor,
+            VectorSchemaRoot root,
+            PutListener metadataListener,
+            long maxRequestsInFlight,
+            CallOption... options) {
+        return startPut(descriptor, root, new MapDictionaryProvider(), metadataListener, maxRequestsInFlight, options);
     }

     /**
@@ -128,6 +133,7 @@ public ClientStreamListener startPut(
      * @param root VectorSchemaRoot the root containing data
      * @param provider A dictionary provider for the root.
      * @param metadataListener A handler for metadata messages from the server.
+     * @param maxRequestsInFlight The maximum number of in-flight requests in the stream
      * @param options RPC-layer hints for this call.
      * @return ClientStreamListener an interface to control uploading data.
      *     {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will already have been called.
@@ -137,10 +143,11 @@ public ClientStreamListener startPut(
             VectorSchemaRoot root,
             DictionaryProvider provider,
             PutListener metadataListener,
+            long maxRequestsInFlight,
             CallOption... options) {
         Preconditions.checkNotNull(root, "root must not be null");
         Preconditions.checkNotNull(provider, "provider must not be null");
-        ClientStreamListener writer = startPut(descriptor, metadataListener, options);
+        ClientStreamListener writer = startPut(descriptor, metadataListener, maxRequestsInFlight, options);
         writer.start(root, provider);
         return writer;
     }
@@ -150,18 +157,22 @@
      *
      * @param descriptor FlightDescriptor the descriptor for the data
      * @param metadataListener A handler for metadata messages from the server.
+     * @param maxRequestsInFlight The maximum number of in-flight requests in the stream
      * @param options RPC-layer hints for this call.
      * @return ClientStreamListener an interface to control uploading data.
      *     {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will NOT already have been called.
      */
     public ClientStreamListener startPut(
-            FlightDescriptor descriptor, PutListener metadataListener, CallOption... options) {
+            FlightDescriptor descriptor,
+            PutListener metadataListener,
+            long maxRequestsInFlight,
+            CallOption... options) {
         Preconditions.checkNotNull(descriptor, "descriptor must not be null");
         Preconditions.checkNotNull(metadataListener, "metadataListener must not be null");

         try {
             ClientCall<ArrowMessage, Flight.PutResult> call = asyncStubNewCall(this.doPutDescriptor, options);
-            OnStreamReadyHandler onStreamReadyHandler = new OnStreamReadyHandler();
+            OnStreamReadyHandler onStreamReadyHandler = new OnStreamReadyHandler((int) maxRequestsInFlight);
             SetStreamObserver resultObserver =
                     new SetStreamObserver(this.allocator, metadataListener, onStreamReadyHandler);
             ClientCallStreamObserver<ArrowMessage> observer =
@@ -184,11 +195,17 @@ public DictionaryProvider newDefaultDictionaryProvider() {
     }

     private static class OnStreamReadyHandler implements Runnable {
-        private final Semaphore semaphore = new Semaphore(0);
+        private final int maxRequestsInFlight;
+        private final Semaphore semaphore;
+
+        OnStreamReadyHandler(int maxRequestsInFlight) {
+            this.maxRequestsInFlight = maxRequestsInFlight;
+            this.semaphore = new Semaphore(maxRequestsInFlight);
+        }

         @Override
         public void run() {
-            this.semaphore.release();
+            this.semaphore.release(this.maxRequestsInFlight);
         }

         /**
@@ -338,7 +355,10 @@ protected void waitUntilStreamReady() {
                 // If the stream is not ready, wait for a short time to avoid busy waiting
                 // This helps reduce CPU usage while still being responsive
                 try {
-                    this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS);
+                    if (this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS)) {
+                        // Allow some in-flight requests to be sent
+                        break;
+                    }
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw new RuntimeException("Interrupted while waiting for stream to be ready", e);
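For context, the handler above implements semaphore-gated flow control: permits start at maxRequestsInFlight, each send consumes one, and the stream-ready callback replenishes the pool. A self-contained sketch of the same pattern (class and method names here are illustrative, not this PR's API):

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Minimal sketch: a sender may only have `max` requests outstanding.
// Mirrors OnStreamReadyHandler above: permits start at `max`, and the
// transport's onReady callback tops the pool back up.
final class InFlightLimiter {
    private final int max;
    private final Semaphore permits;

    InFlightLimiter(int max) {
        this.max = max;
        this.permits = new Semaphore(max);
    }

    // Called from the transport's stream-ready callback.
    void onStreamReady() {
        permits.release(max);
    }

    // Returns true when the caller may send another request.
    boolean awaitSlot(long timeout, TimeUnit unit) throws InterruptedException {
        return permits.tryAcquire(timeout, unit);
    }
}

The reworked waitUntilStreamReady loop corresponds to polling awaitSlot with a short timeout and breaking out as soon as it returns true.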
2 changes: 1 addition & 1 deletion ingester-common/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <groupId>io.greptime</groupId>
         <artifactId>greptimedb-ingester</artifactId>
-        <version>0.12.2</version>
+        <version>0.14.0</version>
     </parent>

     <artifactId>ingester-common</artifactId>
2 changes: 1 addition & 1 deletion ingester-example/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <groupId>io.greptime</groupId>
         <artifactId>greptimedb-ingester</artifactId>
-        <version>0.12.2</version>
+        <version>0.14.0</version>
     </parent>

     <artifactId>ingester-example</artifactId>
@@ -82,19 +82,25 @@ public static void main(String[] args) throws Exception {

         Config cfg = Config.newBuilder()
                 .allocatorInitReservation(0)
-                .allocatorMaxAllocation(1024 * 1024 * 1024)
-                .timeoutMsPerMessage(10000)
+                .allocatorMaxAllocation(1024 * 1024 * 1024L)
+                .timeoutMsPerMessage(30000)
+                .maxRequestsInFlight(8)
                 .build();
         Context ctx = Context.newDefault().withCompression(Compression.None);

+        // Bulk write API cannot auto-create the table
+        Table toCreate = Table.from(schema);
+        toCreate.addRow(generateOneRow(100000));
+        toCreate.complete();
+        greptimeDB.write(toCreate).get();
+
         try (BulkStreamWriter bulkStreamWriter = greptimeDB.bulkStreamWriter(schema, cfg, ctx)) {

-            // Write 100 times, each time write 100000 rows
+            // Write 100 times, writing 10000 rows each time
             for (int i = 0; i < 100; i++) {
                 long start = System.currentTimeMillis();
-                Table.TableBufferRoot table = bulkStreamWriter.tableBufferRoot();
-                for (int j = 0; j < 100000; j++) {
+                Table.TableBufferRoot table = bulkStreamWriter.tableBufferRoot(1024);
+                for (int j = 0; j < 10000; j++) {
                     // with 100000 cardinality
                     Object[] row = generateOneRow(100000);
                     table.addRow(row);
@@ -150,7 +156,7 @@ private static Object[] generateOneRow(int cardinality) {
                 System.currentTimeMillis(), // ts
                 random.nextInt(127), // field_int8
                 random.nextInt(32767), // field_int16
-                random.nextInt(), // field_int32
+                null, // field_int32
                 random.nextLong(), // field_int64
                 random.nextInt(255), // field_uint8
                 random.nextInt(65535), // field_uint16
@@ -46,50 +46,55 @@ public class BulkWriteBenchmark {
     public static void main(String[] args) throws Exception {
         String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
         String dbName = SystemPropertyUtil.get("db_name", "public");
-        boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
-        int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024);
+        boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true);
+        int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
         LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
         LOG.info("Using zstd compression: {}", zstdCompression);
         LOG.info("Batch size: {}", batchSize);

         GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName);
-        TableDataProvider tableDataProvider =
-                ServiceLoader.load(TableDataProvider.class).first();
-        tableDataProvider.init();
-        TableSchema tableSchema = tableDataProvider.tableSchema();

         BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
                 .allocatorInitReservation(0)
                 .allocatorMaxAllocation(4 * 1024 * 1024 * 1024L)
                 .timeoutMsPerMessage(60000)
-                .maxRequestsInFlight(32)
+                .maxRequestsInFlight(8)
                 .build();
         Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
         Context ctx = Context.newDefault().withCompression(compression);

+        TableDataProvider tableDataProvider =
+                ServiceLoader.load(TableDataProvider.class).first();
+        LOG.info("Table data provider: {}", tableDataProvider.getClass().getName());
+        tableDataProvider.init();
+        TableSchema tableSchema = tableDataProvider.tableSchema();
+
         LOG.info("Start writing data");
         try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) {
             Iterator<Object[]> rows = tableDataProvider.rows();

             long start = System.nanoTime();
             for (; ; ) {
-                Table.TableBufferRoot table = writer.tableBufferRoot();
+                Table.TableBufferRoot table = writer.tableBufferRoot(1024);
                 for (int i = 0; i < batchSize; i++) {
                     if (!rows.hasNext()) {
                         break;
                     }
                     table.addRow(rows.next());
                 }
+                LOG.info("Table bytes used: {}", table.bytesUsed());
+                // Complete the table; adding rows is no longer permitted.
                 table.complete();

                 // Write the table data to the server
                 CompletableFuture<Integer> future = writer.writeNext();
+                long fStart = System.nanoTime();
                 future.whenComplete((r, t) -> {
+                    long costMs = (System.nanoTime() - fStart) / 1000000;
                     if (t != null) {
-                        LOG.error("Error writing data", t);
+                        LOG.error("Error writing data, time cost: {}ms", costMs, t);
                     } else {
-                        LOG.info("Wrote rows: {}", r);
+                        LOG.info("Wrote rows: {}, time cost: {}ms", r, costMs);
                     }
                 });

@@ -101,6 +106,8 @@ public static void main(String[] args) throws Exception {
             writer.completed();

             LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
+        } finally {
+            tableDataProvider.close();
         }

         greptimeDB.shutdownGracefully();
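As a usage note, the benchmark reads its configuration from the system properties shown in this diff (db_endpoint, db_name, zstd_compression, batch_size_per_request). A hypothetical invocation; the main-class package and classpath are assumptions, only the property keys and defaults come from the code above:

java -Ddb_endpoint=127.0.0.1:4001 \
     -Ddb_name=public \
     -Dzstd_compression=true \
     -Dbatch_size_per_request=65536 \
     -cp <benchmark-classpath> io.greptime.bench.BulkWriteBenchmark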