Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ private static class RootAllocatorHolder {

private static BufferAllocator createRootAllocator() {
// max allocation size in bytes
long allocationLimit = SystemPropertyUtil.getLong(Keys.FLIGHT_ALLOCATION_LIMIT, 1024 * 1024 * 1024);
long allocationLimit = SystemPropertyUtil.getLong(Keys.FLIGHT_ALLOCATION_LIMIT, 4 * 1024 * 1024 * 1024L);
BufferAllocator rootAllocator = new RootAllocator(new FlightAllocationListener(), allocationLimit);

// Add a shutdown hook to close the root allocator when the JVM exits
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
LOG.info("Closing root allocator: {}", rootAllocator);
AutoCloseables.close(rootAllocator);
} catch (Exception e) {
LOG.error("Failed to close root allocator", e);
} catch (Exception ignored) {
}
}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public boolean isStreamReady() {
* @return A PutStage object containing the future and the number of in-flight requests
*/
public PutStage putNext() {
long id = this.idGenerator.incrementAndGet();
long id = nextId();
long totalRowCount = this.root.getRowCount();

LOG.debug("Starting putNext operation [id={}], total row count: {}", id, totalRowCount);
Expand All @@ -159,7 +159,7 @@ public PutStage putNext() {
// The buffer will be closed in the putNext method, but if an error occurs during execution,
// we need to close it ourselves in the catch block to prevent memory leaks.
metadataBuf = this.allocator.buffer(metadata.length);
metadataBuf.setBytes(0, metadata);
metadataBuf.writeBytes(metadata);

// Send data to the server
LOG.debug("Sending data to server [id={}]", id);
Expand Down Expand Up @@ -207,6 +207,14 @@ public void close() throws Exception {
AutoCloseables.close(this.root, this.manager);
}

private long nextId() {
long id;
do {
id = this.idGenerator.incrementAndGet();
} while (id == 0); // Skip ID 0 as it's reserved for special cases
return id;
}

/**
* Represents the state of a put operation, containing both the future for tracking
* completion and the current count of in-flight requests.
Expand Down Expand Up @@ -277,7 +285,7 @@ public long getId() {
* Listener for handling asynchronous responses from the server during bulk write operations.
* Manages the lifecycle of in-flight requests and their associated futures.
*/
class AsyncPutListener implements PutListener {
static class AsyncPutListener implements PutListener {
private final ConcurrentMap<Long, IdentifiableCompletableFuture> futuresInFlight;
private final CompletableFuture<Void> completed;

Expand Down Expand Up @@ -313,9 +321,11 @@ public void attach(long id, IdentifiableCompletableFuture future) {

if (t != null) {
LOG.error("Put operation failed [id={}]: {}", id, t.getMessage(), t);
// If a put next operation fails, we complete the future with the exception
// and the stream will be terminated immediately to prevent further operations
onError(t);
if (!(t instanceof TimeoutCompletableFuture.FutureDeadlineExceededException)) {
// If a put next operation fails, we complete the future with the exception
// and the stream will be terminated immediately to prevent further operations
onError(t);
}
} else {
LOG.debug("Put operation succeeded [id={}], affected rows: {}", id, r);
}
Expand All @@ -338,26 +348,24 @@ public int numInFlight() {

@Override
public void onNext(PutResult val) {
try (ArrowBuf metadata = val.getApplicationMetadata()) {
if (metadata == null) {
LOG.warn("Received PutResult with null metadata");
return;
}
String metadataString =
ByteString.copyFrom(metadata.nioBuffer()).toStringUtf8();
Metadata.ResponseMetadata responseMetadata = Metadata.ResponseMetadata.fromJson(metadataString);
ArrowBuf metadata = val.getApplicationMetadata();
if (metadata == null) {
LOG.warn("Received PutResult with null metadata");
return;
}
String metadataString = ByteString.copyFrom(metadata.nioBuffer()).toStringUtf8();
Metadata.ResponseMetadata responseMetadata = Metadata.ResponseMetadata.fromJson(metadataString);

long requestId = responseMetadata.getRequestId();
int affectedRows = responseMetadata.getAffectedRows();
long requestId = responseMetadata.getRequestId();
int affectedRows = responseMetadata.getAffectedRows();

LOG.debug("Received response [id={}], affected rows: {}", requestId, affectedRows);
LOG.debug("Received response [id={}], affected rows: {}", requestId, affectedRows);

IdentifiableCompletableFuture future = this.futuresInFlight.get(requestId);
if (future != null) {
future.complete(affectedRows);
} else {
LOG.warn("A timeout response [id={}] finally received", requestId);
}
IdentifiableCompletableFuture future = this.futuresInFlight.get(requestId);
if (future != null) {
future.complete(affectedRows);
} else if (requestId != 0) { // 0 is reserved for special cases
LOG.warn("A timeout response [id={}] finally received", requestId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package org.apache.arrow.flight;

import com.codahale.metrics.Timer;
import io.greptime.ArrowCompressionType;
import io.greptime.common.util.MetricsUtil;
import io.greptime.rpc.TlsOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
Expand Down Expand Up @@ -54,8 +56,6 @@
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Client for Flight services.
Expand All @@ -64,8 +64,6 @@
* with some changes to support bulk write.
*/
public class BulkFlightClient implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BulkFlightClient.class);

/** The maximum number of trace events to keep on the gRPC Channel. This value disables channel tracing. */
private static final int MAX_CHANNEL_TRACE_EVENTS = 0;

Expand All @@ -86,7 +84,7 @@ public class BulkFlightClient implements AutoCloseable {
ManagedChannel channel,
List<FlightClientMiddleware.Factory> middleware,
ArrowCompressionType compressionType) {
this.allocator = incomingAllocator.newChildAllocator("flight-client", 0, Long.MAX_VALUE);
this.allocator = incomingAllocator.newChildAllocator("bulk-flight-client", 0, Long.MAX_VALUE);
this.channel = channel;
this.middleware = middleware;
this.compressionType = compressionType;
Expand Down Expand Up @@ -326,22 +324,28 @@ public void start(VectorSchemaRoot root, DictionaryProvider dictionaries, IpcOpt

@Override
protected void waitUntilStreamReady() {
// Check isCancelled as well to avoid inadvertently blocking forever
// (so long as PutListener properly implements it)
while (!super.responseObserver.isReady() && !this.isCancelled.getAsBoolean()) {
if (this.isCompletedExceptionally.getAsBoolean()) {
// Will throw the error immediately
getResult();
}
Timer.Context timerCtx = MetricsUtil.timer("bulk_flight_client.wait_until_stream_ready")
.time();
try {
// Check isCancelled as well to avoid inadvertently blocking forever
// (so long as PutListener properly implements it)
while (!super.responseObserver.isReady() && !this.isCancelled.getAsBoolean()) {
if (this.isCompletedExceptionally.getAsBoolean()) {
// Will throw the error immediately
getResult();
}

// If the stream is not ready, wait for a short time to avoid busy waiting
// This helps reduce CPU usage while still being responsive
try {
this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for stream to be ready", e);
// If the stream is not ready, wait for a short time to avoid busy waiting
// This helps reduce CPU usage while still being responsive
try {
this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for stream to be ready", e);
}
}
} finally {
timerCtx.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,20 @@ public TimeoutCompletableFuture<T> scheduleTimeout() {
if (isCancelled() || isDone()) {
return;
}
completeExceptionally(
new TimeoutException("Operation timed out after " + this.timeout + " " + this.unit));
completeExceptionally(new FutureDeadlineExceededException(
"Future deadline exceeded, timeout: " + this.timeout + " " + this.unit));
},
this.timeout,
this.unit);

return this;
}

public static class FutureDeadlineExceededException extends TimeoutException {
private static final long serialVersionUID = 1L;

public FutureDeadlineExceededException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void main(String[] args) throws Exception {
String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
String dbName = SystemPropertyUtil.get("db_name", "public");
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 100 * 1024);
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024);
LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
LOG.info("Using zstd compression: {}", zstdCompression);
LOG.info("Batch size: {}", batchSize);
Expand All @@ -60,9 +60,9 @@ public static void main(String[] args) throws Exception {

BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
.allocatorInitReservation(0)
.allocatorMaxAllocation(4 * 1024 * 1024 * 1024)
.timeoutMsPerMessage(10000)
.maxRequestsInFlight(8)
.allocatorMaxAllocation(4 * 1024 * 1024 * 1024L)
.timeoutMsPerMessage(60000)
.maxRequestsInFlight(32)
.build();
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
Context ctx = Context.newDefault().withCompression(compression);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class RandomTableDataProvider implements TableDataProvider {
.build();
// Total number of rows to generate, configurable via system property
// Default is 1 billion rows if not specified
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000_000L);
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000L);
}

@Override
Expand Down Expand Up @@ -97,8 +97,8 @@ public Object[] next() {
String traceState = "trace_state_" + random.nextInt(1000);
String podName = "pod_" + random.nextInt(1000);
timerContext.stop();
MetricsUtil.counter("random_table_data_provider.log_message_length")
.inc(logMessage.length());
MetricsUtil.histogram("random_table_data_provider.log_message_length")
.update(logMessage.length());

return new Object[] {
logTs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static void main(String[] args) throws Exception {
String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
String dbName = SystemPropertyUtil.get("db_name", "public");
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 100 * 1024);
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024);
int maxPointsPerSecond = SystemPropertyUtil.getInt("max_points_per_second", Integer.MAX_VALUE);
LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
LOG.info("Using zstd compression: {}", zstdCompression);
Expand Down
2 changes: 1 addition & 1 deletion ingester-example/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

</Appenders>
<Loggers>
<Root level="debug">
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
Expand Down
2 changes: 1 addition & 1 deletion ingester-protocol/src/main/java/io/greptime/BulkWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public interface BulkWrite {
/**
* The default timeout in milliseconds for each message.
*/
long DEFAULT_TIMEOUT_MS_PER_MESSAGE = 10000;
long DEFAULT_TIMEOUT_MS_PER_MESSAGE = 60000;

/**
* The default allocator init reservation bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public CompletableFuture<Integer> writeNext() throws Exception {
InnerMetricHelper.putTime().update(clock.duration(startCall), TimeUnit.MILLISECONDS);
});

LOG.debug("Write request sent successfully, in-flight requests: {}", stage.numInFlight());
LOG.info("Write request sent successfully, in-flight requests: {}", stage.numInFlight());

return future;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public Table addRow(Object... values) {
for (int i = 0; i < values.length; i++) {
FieldVector vector = this.root.getVector(i);
if (vector.getValueCapacity() < rowCount + 1) {
vector.allocateNew();
vector.reAlloc();
}
ArrowHelper.addValue(
vector, rowCount, this.dataTypes.get(i), this.dataTypeExtensions.get(i), values[i]);
Expand Down