Skip to content

Commit 364365d

Browse files
committed
chore: add FutureDeadlineExceededException
1 parent b908c83 commit 364365d

File tree

4 files changed

+18
-12
lines changed

4 files changed

+18
-12
lines changed

ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ private static class RootAllocatorHolder {
5959

6060
private static BufferAllocator createRootAllocator() {
6161
// max allocation size in bytes
62-
long allocationLimit = SystemPropertyUtil.getLong(Keys.FLIGHT_ALLOCATION_LIMIT, 1024 * 1024 * 1024);
62+
long allocationLimit = SystemPropertyUtil.getLong(Keys.FLIGHT_ALLOCATION_LIMIT, 4 * 1024 * 1024 * 1024L);
6363
BufferAllocator rootAllocator = new RootAllocator(new FlightAllocationListener(), allocationLimit);
6464

6565
// Add a shutdown hook to close the root allocator when the JVM exits
6666
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
6767
try {
68+
LOG.info("Closing root allocator: {}", rootAllocator);
6869
AutoCloseables.close(rootAllocator);
69-
} catch (Exception e) {
70-
LOG.error("Failed to close root allocator", e);
70+
} catch (Exception ignored) {
7171
}
7272
}));
7373

ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,11 @@ public void attach(long id, IdentifiableCompletableFuture future) {
313313

314314
if (t != null) {
315315
LOG.error("Put operation failed [id={}]: {}", id, t.getMessage(), t);
316-
// If a put next operation fails, we complete the future with the exception
317-
// and the stream will be terminated immediately to prevent further operations
318-
onError(t);
316+
if (!(t instanceof TimeoutCompletableFuture.FutureDeadlineExceededException)) {
317+
// If a put next operation fails, we complete the future with the exception
318+
// and the stream will be terminated immediately to prevent further operations
319+
onError(t);
320+
}
319321
} else {
320322
LOG.debug("Put operation succeeded [id={}], affected rows: {}", id, r);
321323
}

ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@
5656
import org.apache.arrow.vector.dictionary.DictionaryProvider;
5757
import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
5858
import org.apache.arrow.vector.ipc.message.IpcOption;
59-
import org.slf4j.Logger;
60-
import org.slf4j.LoggerFactory;
6159

6260
/**
6361
* Client for Flight services.
@@ -66,8 +64,6 @@
6664
* with some changes to support bulk write.
6765
*/
6866
public class BulkFlightClient implements AutoCloseable {
69-
private static final Logger LOG = LoggerFactory.getLogger(BulkFlightClient.class);
70-
7167
/** The maximum number of trace events to keep on the gRPC Channel. This value disables channel tracing. */
7268
private static final int MAX_CHANNEL_TRACE_EVENTS = 0;
7369

ingester-common/src/main/java/io/greptime/common/TimeoutCompletableFuture.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,20 @@ public TimeoutCompletableFuture<T> scheduleTimeout() {
5454
if (isCancelled() || isDone()) {
5555
return;
5656
}
57-
completeExceptionally(
58-
new TimeoutException("Operation timed out after " + this.timeout + " " + this.unit));
57+
completeExceptionally(new FutureDeadlineExceededException(
58+
"Future deadline exceeded, timeout: " + this.timeout + " " + this.unit));
5959
},
6060
this.timeout,
6161
this.unit);
6262

6363
return this;
6464
}
65+
66+
public static class FutureDeadlineExceededException extends TimeoutException {
67+
private static final long serialVersionUID = 1L;
68+
69+
public FutureDeadlineExceededException(String message) {
70+
super(message);
71+
}
72+
}
6573
}

0 commit comments

Comments
 (0)