From 873f771bc19f5a205a309e88139b220f52277488 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Fri, 28 Mar 2025 19:11:38 +0800 Subject: [PATCH] [client] Introduce batchDeliveryTimeout in client to expire batches that have been stuck for a long time --- .../fluss/client/write/RecordAccumulator.java | 38 +++++++- .../alibaba/fluss/client/write/Sender.java | 90 ++++++++++++++++--- .../fluss/client/write/WriteBatch.java | 12 +++ .../fluss/client/write/WriterClient.java | 34 ++++++- .../client/write/RecordAccumulatorTest.java | 58 ++++++++++-- .../fluss/client/write/SenderTest.java | 82 +++++++++++++++-- .../alibaba/fluss/config/ConfigOptions.java | 17 ++++ website/docs/engine-flink/options.md | 33 +++---- 8 files changed, 317 insertions(+), 47 deletions(-) diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java index 4321a35a0e..ceda43efc3 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java @@ -108,15 +108,21 @@ public final class RecordAccumulator { private final IdempotenceManager idempotenceManager; private final Clock clock; + /** + * An upper bound on the time to report success or failure on record delivery when append into + * WriteBatch. + */ + private final long batchDeliveryTimeoutMs; + // TODO add retryBackoffMs to retry the produce request upon receiving an error. - // TODO add deliveryTimeoutMs to report success or failure on record delivery. - // TODO add nextBatchExpiryTimeMs + // TODO add nextBatchExpiryTimeMs. Trace by https://github.com/alibaba/fluss/pull/300 RecordAccumulator( Configuration conf, IdempotenceManager idempotenceManager, WriterMetricGroup writerMetricGroup, - Clock clock) { + Clock clock, + long batchDeliveryTimeoutMs) { this.closed = false; this.flushesInProgress = new AtomicInteger(0); this.appendsInProgress = new AtomicInteger(0); @@ -136,6 +142,7 @@ public final class RecordAccumulator { this.nodesDrainIndex = new HashMap<>(); this.idempotenceManager = idempotenceManager; this.clock = clock; + this.batchDeliveryTimeoutMs = batchDeliveryTimeoutMs; registerMetrics(writerMetricGroup); } @@ -294,6 +301,31 @@ public void reEnqueue(WriteBatch batch) { } } + public List expiredBatches(long now) { + List expiredBatches = new ArrayList<>(); + for (BucketAndWriteBatches bucketAndWriteBatch : writeBatches.values()) { + for (Deque deque : bucketAndWriteBatch.batches.values()) { + // expire the batches in the order of sending. + synchronized (deque) { + while (!deque.isEmpty()) { + WriteBatch batch = deque.getFirst(); + if (batch.hasReachedDeliveryTimeout(batchDeliveryTimeoutMs, now)) { + deque.poll(); + expiredBatches.add(batch); + } else { + break; + } + } + } + } + } + return expiredBatches; + } + + public long getDeliveryTimeoutMs() { + return batchDeliveryTimeoutMs; + } + /** Get the deque for the given table-bucket, creating it if necessary. */ private Deque getOrCreateDeque( TableBucket tableBucket, PhysicalTablePath physicalTablePath) { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java index 545d49e398..8ab144377e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java @@ -24,6 +24,7 @@ import com.alibaba.fluss.exception.InvalidMetadataException; import com.alibaba.fluss.exception.OutOfOrderSequenceException; import com.alibaba.fluss.exception.RetriableException; +import com.alibaba.fluss.exception.TimeoutException; import com.alibaba.fluss.exception.UnknownTableOrBucketException; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; @@ -37,6 +38,7 @@ import com.alibaba.fluss.rpc.messages.PutKvResponse; import com.alibaba.fluss.rpc.protocol.ApiError; import com.alibaba.fluss.rpc.protocol.Errors; +import com.alibaba.fluss.utils.clock.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -81,6 +84,9 @@ public class Sender implements Runnable { /** the number of times to retry a failed write batch before giving up. */ private final int retries; + /* the clock instance used for getting the time */ + private final Clock clock; + /** true while the sender thread is still running. */ private volatile boolean running; @@ -111,6 +117,7 @@ public Sender( int retries, MetadataUpdater metadataUpdater, IdempotenceManager idempotenceManager, + Clock clock, WriterMetricGroup writerMetricGroup) { this.accumulator = accumulator; this.maxRequestSize = maxRequestSize; @@ -124,6 +131,7 @@ public Sender( checkNotNull(metadataUpdater.getCoordinatorServer()); this.idempotenceManager = idempotenceManager; + this.clock = clock; this.writerMetricGroup = writerMetricGroup; // TODO add retry logic while send failed. See FLUSS-56364375 @@ -175,7 +183,8 @@ public void runOnce() throws Exception { } // do send. - sendWriteData(); + long currentTimeMillis = clock.milliseconds(); + sendWriteData(currentTimeMillis); } public boolean isRunning() { @@ -188,7 +197,7 @@ private void addToInflightBatches(Map> batches) { } } - private void sendWriteData() throws Exception { + private void sendWriteData(long now) throws Exception { // get the list of buckets with data ready to send. ReadyCheckResult readyCheckResult = accumulator.ready(metadataUpdater.getCluster()); @@ -216,7 +225,9 @@ private void sendWriteData() throws Exception { if (!batches.isEmpty()) { addToInflightBatches(batches); - // TODO add logic for batch expire. + // check and expire batches if they have reached batch delivery timeout. This can avoid + // the client to wait for a long time while the batch is forever retry. + checkAndExpireBatches(now); sendWriteRequests(batches); @@ -239,9 +250,8 @@ private void failBatch(WriteBatch batch, Exception exception, boolean adjustBatc if (idempotenceManager.idempotenceEnabled()) { try { // This call can throw an exception in the rare case that there's an invalid - // state - // transition attempted. Catch these so as not to interfere with the rest of the - // logic. + // state transition attempted. Catch these so as not to interfere with the rest + // of the logic. idempotenceManager.handleFailedBatch(batch, exception, adjustBatchSequences); } catch (Exception e) { LOG.debug( @@ -280,6 +290,64 @@ private void maybeRemoveAndDeallocateBatch(WriteBatch batch) { accumulator.deallocate(batch); } + private void checkAndExpireBatches(long now) { + List expiredInflightBatches = getExpiredInflightBatches(now); + List expiredBatches = accumulator.expiredBatches(now); + expiredBatches.addAll(expiredInflightBatches); + if (!expiredBatches.isEmpty()) { + LOG.trace("Client found expired batches: {}", expiredBatches); + } + + for (WriteBatch batch : expiredBatches) { + failBatch( + batch, + new TimeoutException( + String.format( + "Expiring %s records for %s : %s ms has passed since batch creation.", + batch.recordCount, + batch.tableBucket(), + now - batch.getCreatedMs())), + false); + } + } + + /** Get the in-flight batches that has reached batch delivery timeout. */ + @VisibleForTesting + List getExpiredInflightBatches(long now) { + List expiredBatches = new ArrayList<>(); + for (Iterator>> batchIt = + inFlightBatches.entrySet().iterator(); + batchIt.hasNext(); ) { + Map.Entry> entry = batchIt.next(); + List tbFlightBatches = entry.getValue(); + if (tbFlightBatches != null) { + Iterator iter = tbFlightBatches.iterator(); + while (iter.hasNext()) { + WriteBatch batch = iter.next(); + if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) { + iter.remove(); + if (!batch.isDone()) { + expiredBatches.add(batch); + } else { + throw new IllegalStateException( + batch.tableBucket() + + " batch created at " + + batch.getCreatedMs() + + " gets unexpected final state " + + batch.getFinalState()); + } + } else { + break; + } + } + if (tbFlightBatches.isEmpty()) { + batchIt.remove(); + } + } + } + return expiredBatches; + } + private void maybeRemoveFromInflightBatches(WriteBatch batch) { synchronized (inFlightBatchesLock) { List batches = inFlightBatches.get(batch.tableBucket()); @@ -355,12 +423,11 @@ private void sendProduceLogRequestAndHandleResponse( ProduceLogRequest request, long tableId, Map recordsByBucket) { - long startTime = System.currentTimeMillis(); + long startTime = clock.milliseconds(); gateway.produceLog(request) .whenComplete( (produceLogResponse, e) -> { - writerMetricGroup.setSendLatencyInMs( - System.currentTimeMillis() - startTime); + writerMetricGroup.setSendLatencyInMs(clock.milliseconds() - startTime); if (e != null) { handleWriteRequestException(e, recordsByBucket); } else { @@ -375,12 +442,11 @@ private void sendPutKvRequestAndHandleResponse( PutKvRequest request, long tableId, Map recordsByBucket) { - long startTime = System.currentTimeMillis(); + long startTime = clock.milliseconds(); gateway.putKv(request) .whenComplete( (putKvResponse, e) -> { - writerMetricGroup.setSendLatencyInMs( - System.currentTimeMillis() - startTime); + writerMetricGroup.setSendLatencyInMs(clock.milliseconds() - startTime); if (e != null) { handleWriteRequestException(e, recordsByBucket); } else { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java index 4c3f9776bf..7a6cb0c4c5 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java @@ -140,6 +140,18 @@ public RequestFuture getRequestFuture() { return requestFuture; } + long getCreatedMs() { + return createdMs; + } + + boolean hasReachedDeliveryTimeout(long batchDeliveryTimeoutMs, long now) { + return batchDeliveryTimeoutMs <= now - this.createdMs; + } + + FinalState getFinalState() { + return finalState.get(); + } + public long waitedTimeMs(long nowMs) { return Math.max(0, nowMs - createdMs); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java index eba468f6be..45c2c4a1a5 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java @@ -31,6 +31,7 @@ import com.alibaba.fluss.rpc.gateway.TabletServerGateway; import com.alibaba.fluss.rpc.metrics.ClientMetricGroup; import com.alibaba.fluss.utils.CopyOnWriteMap; +import com.alibaba.fluss.utils.clock.Clock; import com.alibaba.fluss.utils.clock.SystemClock; import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; @@ -97,10 +98,16 @@ public WriterClient( short acks = configureAcks(idempotenceManager.idempotenceEnabled()); int retries = configureRetries(idempotenceManager.idempotenceEnabled()); + long batchDeliveryTimeoutMs = configureBatchDeliveryTimeout(conf); + SystemClock clock = SystemClock.getInstance(); this.accumulator = new RecordAccumulator( - conf, idempotenceManager, writerMetricGroup, SystemClock.getInstance()); - this.sender = newSender(acks, retries); + conf, + idempotenceManager, + writerMetricGroup, + clock, + batchDeliveryTimeoutMs); + this.sender = newSender(acks, retries, clock); this.ioThreadPool = createThreadPool(); ioThreadPool.submit(sender); } catch (Throwable t) { @@ -249,7 +256,27 @@ private int configureRetries(boolean idempotenceEnabled) { return retries; } - private Sender newSender(short acks, int retries) { + private long configureBatchDeliveryTimeout(Configuration conf) { + long batchDeliveryTimeoutMs = + conf.get(ConfigOptions.CLIENT_WRITER_BATCH_DELIVERY_TIMEOUT).toMillis(); + long batchTimeoutMs = conf.get(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT).toMillis(); + long requestTimeoutMs = conf.get(ConfigOptions.CLIENT_REQUEST_TIMEOUT).toMillis(); + long batchAndRequestTimeoutMs = Math.min(batchTimeoutMs + requestTimeoutMs, Long.MAX_VALUE); + + if (batchDeliveryTimeoutMs < batchAndRequestTimeoutMs) { + throw new IllegalConfigurationException( + "The value of " + + ConfigOptions.CLIENT_WRITER_BATCH_DELIVERY_TIMEOUT.key() + + " should be greater than or equal to " + + ConfigOptions.CLIENT_REQUEST_TIMEOUT.key() + + " + " + + ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key() + + "."); + } + return batchDeliveryTimeoutMs; + } + + private Sender newSender(short acks, int retries, Clock clock) { return new Sender( accumulator, (int) conf.get(ConfigOptions.CLIENT_REQUEST_TIMEOUT).toMillis(), @@ -258,6 +285,7 @@ private Sender newSender(short acks, int retries) { retries, metadataUpdater, idempotenceManager, + clock, writerMetricGroup); } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java index 201a7ae96a..81ca2f2752 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java @@ -176,7 +176,7 @@ void testDrainCompressedBatches() throws Exception { int bucketNum = 10; RecordAccumulator accum = createTestRecordAccumulator( - Integer.MAX_VALUE, batchSize, batchSize, Integer.MAX_VALUE); + Long.MAX_VALUE, Integer.MAX_VALUE, batchSize, batchSize, Integer.MAX_VALUE); List bucketLocations = new ArrayList<>(); for (int b = 0; b < bucketNum; b++) { bucketLocations.add( @@ -288,7 +288,8 @@ void testFull() throws Exception { void testAppendLarge() throws Exception { int batchSize = 100; // set batch timeout as 0 to make sure batch are always ready. - RecordAccumulator accum = createTestRecordAccumulator(0, batchSize, batchSize, 10L * 1024); + RecordAccumulator accum = + createTestRecordAccumulator(Long.MAX_VALUE, 0, batchSize, batchSize, 10L * 1024); // a row with size > 2 * batchSize IndexedRow row1 = @@ -326,6 +327,7 @@ void testAppendWithStickyBucketAssigner() throws Exception { StickyBucketAssigner bucketAssigner = new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH); RecordAccumulator accum = createTestRecordAccumulator( + Long.MAX_VALUE, (int) Duration.ofMinutes(1).toMillis(), batchSize, batchSize, @@ -440,7 +442,8 @@ void testFlush() throws Exception { void testTableWithUnknownLeader() throws Exception { int batchSize = 100; // set batch timeout as 0 to make sure batch are always ready. - RecordAccumulator accum = createTestRecordAccumulator(0, batchSize, batchSize, 10L * 1024); + RecordAccumulator accum = + createTestRecordAccumulator(Long.MAX_VALUE, 0, batchSize, batchSize, 10L * 1024); IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); BucketLocation bucket1 = @@ -484,7 +487,8 @@ public void testNextReadyCheckDelay() throws Exception { IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); // test case assumes that the records do not fill the batch completely RecordAccumulator accum = - createTestRecordAccumulator(batchTimeout, batchSize, 256, 10 * batchSize); + createTestRecordAccumulator( + Long.MAX_VALUE, batchTimeout, batchSize, 256, 10 * batchSize); // Just short of going over the limit so we trigger linger time int appends = expectedNumAppends(row, batchSize); @@ -518,8 +522,38 @@ public void testNextReadyCheckDelay() throws Exception { assertThat(result.nextReadyCheckDelayMs).isLessThanOrEqualTo(batchTimeout); } + @Test + void testExpiredWriteBatches() throws Exception { + long batchDeliveryTimeout = 3200; + int batchSize = 1024; + + IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); + RecordAccumulator accum = + createTestRecordAccumulator( + batchDeliveryTimeout, 30, batchSize, 256, 10 * batchSize); + int appends = expectedNumAppends(row, batchSize); + for (int i = 0; i < appends; i++) { + accum.append(createRecord(row), writeCallback, cluster, bucket1.getBucketId(), false); + assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0); + } + + accum.append(createRecord(row), writeCallback, cluster, bucket1.getBucketId(), false); + // Make the batches ready due to batch full. + assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(1); + + // Advance the clock lower than expire time. + clock.advanceTime(batchDeliveryTimeout / 2, TimeUnit.MILLISECONDS); + List expiredBatches = accum.expiredBatches(clock.milliseconds()); + assertThat(expiredBatches.size()).isEqualTo(0); + + // Advance the clock to expire the batch. + clock.advanceTime(batchDeliveryTimeout / 2 + 1, TimeUnit.MILLISECONDS); + expiredBatches = accum.expiredBatches(clock.milliseconds()); + assertThat(expiredBatches.size()).isEqualTo(2); + } + /** - * Creates a indexed WriteRecord as the DATA1_PHYSICAL_TABLE_PATH is registered as a INDEXED + * Creates an indexed WriteRecord as the DATA1_PHYSICAL_TABLE_PATH is registered as a INDEXED * format , see {@link #updateCluster(List)}. */ private WriteRecord createRecord(IndexedRow row) { @@ -607,12 +641,19 @@ private int expectedNumAppends(IndexedRow row, int batchSize) { } private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize) { - return createTestRecordAccumulator(5000, batchSize, 256, totalSize); + return createTestRecordAccumulator(Long.MAX_VALUE, 5000, batchSize, 256, totalSize); } private RecordAccumulator createTestRecordAccumulator( - int batchTimeoutMs, int batchSize, int pageSize, long totalSize) { + long deliveryTimeoutMs, + int batchTimeoutMs, + int batchSize, + int pageSize, + long totalSize) { conf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(batchTimeoutMs)); + conf.set( + ConfigOptions.CLIENT_WRITER_BATCH_DELIVERY_TIMEOUT, + Duration.ofMillis(deliveryTimeoutMs)); // TODO client writer buffer maybe removed. conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(totalSize)); conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(pageSize)); @@ -627,7 +668,8 @@ private RecordAccumulator createTestRecordAccumulator( RpcClient.create(conf, TestingClientMetricGroup.newInstance()), TabletServerGateway.class)), TestingWriterMetricGroup.newInstance(), - clock); + clock, + deliveryTimeoutMs); } private long getTestBatchSize(BinaryRow row) { diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java index b9da919544..8e23f36bc6 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java @@ -33,7 +33,7 @@ import com.alibaba.fluss.rpc.messages.ProduceLogResponse; import com.alibaba.fluss.rpc.protocol.Errors; import com.alibaba.fluss.server.tablet.TestTabletServerGateway; -import com.alibaba.fluss.utils.clock.SystemClock; +import com.alibaba.fluss.utils.clock.ManualClock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,6 +47,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; @@ -55,6 +56,7 @@ import static com.alibaba.fluss.server.utils.RpcMessageUtils.getProduceLogData; import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeProduceLogResponse; import static com.alibaba.fluss.testutils.DataTestUtils.row; +import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; /** ITCase for {@link Sender}. */ @@ -66,6 +68,7 @@ final class SenderTest { private static final int REQUEST_TIMEOUT = 5000; private static final short ACKS_ALL = -1; private static final int MAX_INFLIGHT_REQUEST_PER_BUCKET = 5; + private final ManualClock clock = new ManualClock(System.currentTimeMillis()); private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 0); private TestingMetadataUpdater metadataUpdater; @@ -107,6 +110,7 @@ void testRetries() throws Exception { MAX_INFLIGHT_REQUEST_PER_BUCKET, metadataUpdater.newRandomTabletServerClient()), maxRetries, + Long.MAX_VALUE, 0); // do a successful retry. CompletableFuture future = new CompletableFuture<>(); @@ -580,7 +584,8 @@ void testCorrectHandlingOfDuplicateSequenceError() throws Exception { void testSequenceNumberIncrement() throws Exception { int maxRetries = 10; IdempotenceManager idempotenceManager = createIdempotenceManager(true); - Sender sender1 = setupWithIdempotenceState(idempotenceManager, maxRetries, 0); + Sender sender1 = + setupWithIdempotenceState(idempotenceManager, maxRetries, Long.MAX_VALUE, 0); sender1.runOnce(); assertThat(idempotenceManager.isWriterIdValid()).isTrue(); assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(0); @@ -596,6 +601,60 @@ void testSequenceNumberIncrement() throws Exception { assertThat(future1.get()).isNull(); } + @Test + void testExpiredWriteBatches() throws Exception { + long batchDeliveryTimeout = 5000; + IdempotenceManager idempotenceManager = createIdempotenceManager(false); + Sender sender = setupWithIdempotenceState(idempotenceManager, batchDeliveryTimeout); + sender.runOnce(); + + // append many records with futures. don't return the request. + List> recordFutures = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + CompletableFuture future = new CompletableFuture<>(); + appendToAccumulator(tb1, row(1, "a"), future::complete); + recordFutures.add(future); + } + sender.runOnce(); // runOnce to send request. + } + + for (int i = 0; i < 20; i++) { + CompletableFuture future = new CompletableFuture<>(); + appendToAccumulator(tb1, row(1, "a"), future::complete); + recordFutures.add(future); + // just append to accumulate, don't seed the request. + } + + // batch shouldn't expire yet as the expiry timeout not reach. + clock.advanceTime(batchDeliveryTimeout / 2, TimeUnit.MILLISECONDS); + List expiredInflightBatches = + sender.getExpiredInflightBatches(clock.milliseconds()); + assertThat(expiredInflightBatches.size()).isEqualTo(0); + List expiredBatches = accumulator.expiredBatches(clock.milliseconds()); + assertThat(expiredBatches.size()).isEqualTo(0); + + // batch can be expired after the expiry timeout. + clock.advanceTime(batchDeliveryTimeout / 2 + 1, TimeUnit.MILLISECONDS); + // To expire batches. Add some batches to make sure batch can be drained to trigger expired + // logic. + for (int i = 0; i < 20; i++) { + CompletableFuture future = new CompletableFuture<>(); + appendToAccumulator(tb1, row(1, "a"), future::complete); + // just append to accumulate, don't seed the request. + } + sender.runOnce(); + retry( + Duration.ofMinutes(1), + () -> { + for (CompletableFuture future : recordFutures) { + assertThat(future.get()) + .isInstanceOf(TimeoutException.class) + .hasMessageContaining("has passed since batch creation."); + } + }); + } + private TestingMetadataUpdater initializeMetadataUpdater() { return new TestingMetadataUpdater( Collections.singletonMap(DATA1_TABLE_PATH, DATA1_TABLE_INFO)); @@ -661,19 +720,31 @@ private Sender setupWithIdempotenceState() { } private Sender setupWithIdempotenceState(IdempotenceManager idempotenceManager) { - return setupWithIdempotenceState(idempotenceManager, Integer.MAX_VALUE, 0); + return setupWithIdempotenceState(idempotenceManager, Integer.MAX_VALUE, Long.MAX_VALUE, 0); + } + + private Sender setupWithIdempotenceState( + IdempotenceManager idempotenceManager, long batchDeliveryTimeoutMs) { + return setupWithIdempotenceState( + idempotenceManager, Integer.MAX_VALUE, batchDeliveryTimeoutMs, 0); } private Sender setupWithIdempotenceState( - IdempotenceManager idempotenceManager, int reties, int batchTimeoutMs) { + IdempotenceManager idempotenceManager, + int reties, + long batchDeliveryTimeoutMs, + int batchTimeoutMs) { Configuration conf = new Configuration(); conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(TOTAL_MEMORY_SIZE)); + conf.set( + ConfigOptions.CLIENT_WRITER_BATCH_DELIVERY_TIMEOUT, + Duration.ofMillis(batchDeliveryTimeoutMs)); conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(BATCH_SIZE)); conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(PAGE_SIZE)); conf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(batchTimeoutMs)); accumulator = new RecordAccumulator( - conf, idempotenceManager, writerMetricGroup, SystemClock.getInstance()); + conf, idempotenceManager, writerMetricGroup, clock, batchDeliveryTimeoutMs); return new Sender( accumulator, REQUEST_TIMEOUT, @@ -682,6 +753,7 @@ private Sender setupWithIdempotenceState( reties, metadataUpdater, idempotenceManager, + clock, writerMetricGroup); } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 4d19b8e6cd..4edcc558e7 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -842,6 +842,23 @@ public class ConfigOptions { + "this timeout is the max time that delayed write try to complete. " + "The default setting is 30 seconds."); + public static final ConfigOption CLIENT_WRITER_BATCH_DELIVERY_TIMEOUT = + key("client.writer.batch-delivery-timeout") + .durationType() + .defaultValue(Duration.ofMinutes(2)) + .withDescription( + "An upper bound on the time to report success or failure after a call to " + + "Sender#send() returns. This limits the total time that a record " + + "will be delayed prior to sending, the time to await ack from the tabletServer" + + "(if expected), and the time allowed either an unrecoverable error is encountered, " + + "the retries have been exhausted, or the record is added to a batch which reached " + + "an earlier delivery expiration deadline. Thw vale of this config should be greater " + + "than or equal to the sum of '" + + CLIENT_REQUEST_TIMEOUT.key() + + "' and '" + + CLIENT_WRITER_BATCH_TIMEOUT.key() + + "'."); + public static final ConfigOption CLIENT_SCANNER_LOG_CHECK_CRC = key("client.scanner.log.check-crc") .booleanType() diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index bfaa1f999a..9da453c4a2 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -133,22 +133,23 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d'); ## Write Options -| Option | Type | Default | Description | -|-----------------------------------------------------|------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| sink.ignore-delete | Boolean | false | If set to true, the sink will ignore DELETE and UPDATE_BEFORE changelog events. | -| sink.bucket-shuffle | Boolean | true | Whether to shuffle by bucket id before write to sink. Shuffling the data with the same bucket id to be processed by the same task can improve the efficiency of client processing and reduce resource consumption. For Log Table, bucket shuffle will only take effect when the 'bucket.key' is defined. For Primary Key table, it is enabled by default. | -| client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. | -| client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers ('client.writer.buffer.memory-size'). | -| client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of client.writer.buffer.page-size. It must be greater than or equal to client.writer.buffer.page-size. This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments. | -| client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. | -| client.writer.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the writer will block when waiting for segments to become available. | -| client.writer.batch-timeout | Duration | 100ms | The writer groups ay rows that arrive in between request sends into a single batched request. Normally this occurs only under load when rows arrive faster than they can be sent out. However in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay, that is, rather than immediately sending out a row, the writer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this bucket we will delay for the specified time waiting for more records to show up. | -| client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for no key table. For table with bucket key or primary key, we choose a bucket based on a hash of the key. For these table without bucket key and primary key, we can use this option to specify bucket assigner, the candidate assigner is ROUND_ROBIN, STICKY, the default assigner is STICKY.
ROUND_ROBIN: this strategy will assign the bucket id for the input row by round robin.
STICKY: this strategy will assign new bucket id only if the batch changed in record accumulator, otherwise the bucket id will be the same as the front record. | -| client.writer.acks | String | all | The number of acknowledgments the writer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
acks=0: If set to 0, then the writer will not wait for any acknowledgment from the server at all. No guarantee can be mode that the server has received the record in this case.
acks=1: This will mean the leader will write the record to its local log but will respond without awaiting full acknowledge the record but before the followers have replicated it then the record will be lost.
acks=-1 (all): This will mean the leader will wait for the full ser of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive, This is the strongest available guarantee. | -| client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting will limit the number of record batches the writer will send in a single request to avoid sending huge requests. Note that this retry is no different than if the writer resent the row upon receiving the error. | -| client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. | -| client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting config are set. If conflicting config are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting config are set, a ConfigException is thrown | -| client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for writer. This configuration can work only if 'client.writer.enable-idempotence' is set to true. When the number of inflight requests per bucket exceeds this setting, the writer will wait for the inflight requests to complete before sending out new requests. | +| Option | Type | Default | Description | +|------------------------------------------------|------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| sink.ignore-delete | Boolean | false | If set to true, the sink will ignore DELETE and UPDATE_BEFORE changelog events. | +| sink.bucket-shuffle | Boolean | true | Whether to shuffle by bucket id before write to sink. Shuffling the data with the same bucket id to be processed by the same task can improve the efficiency of client processing and reduce resource consumption. For Log Table, bucket shuffle will only take effect when the 'bucket.key' is defined. For Primary Key table, it is enabled by default. | +| client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. | +| client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers ('client.writer.buffer.memory-size'). | +| client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of client.writer.buffer.page-size. It must be greater than or equal to client.writer.buffer.page-size. This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments. | +| client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. | +| client.writer.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the writer will block when waiting for segments to become available. | +| client.writer.batch-timeout | Duration | 100ms | The writer groups ay rows that arrive in between request sends into a single batched request. Normally this occurs only under load when rows arrive faster than they can be sent out. However in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay, that is, rather than immediately sending out a row, the writer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this bucket we will delay for the specified time waiting for more records to show up. | +| client.writer.batch-delivery-timeout | Duration | 2min | An upper bound on the time to report success or failure after a call to Sender#send() returns. This limits the total time that a record will be delayed prior to sending, the time to await ack from the tabletServer(if expected), and the time allowed either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch which reached an earlier delivery expiration deadline. Thw vale of this config should be greater than or equal to the sum of 'client.request-timeout' and 'client.writer.batch-timeout'. | +| client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for no key table. For table with bucket key or primary key, we choose a bucket based on a hash of the key. For these table without bucket key and primary key, we can use this option to specify bucket assigner, the candidate assigner is ROUND_ROBIN, STICKY, the default assigner is STICKY.
ROUND_ROBIN: this strategy will assign the bucket id for the input row by round robin.
STICKY: this strategy will assign new bucket id only if the batch changed in record accumulator, otherwise the bucket id will be the same as the front record. | +| client.writer.acks | String | all | The number of acknowledgments the writer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
acks=0: If set to 0, then the writer will not wait for any acknowledgment from the server at all. No guarantee can be mode that the server has received the record in this case.
acks=1: This will mean the leader will write the record to its local log but will respond without awaiting full acknowledge the record but before the followers have replicated it then the record will be lost.
acks=-1 (all): This will mean the leader will wait for the full ser of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive, This is the strongest available guarantee. | +| client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting will limit the number of record batches the writer will send in a single request to avoid sending huge requests. Note that this retry is no different than if the writer resent the row upon receiving the error. | +| client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. | +| client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting config are set. If conflicting config are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting config are set, a ConfigException is thrown | +| client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for writer. This configuration can work only if 'client.writer.enable-idempotence' is set to true. When the number of inflight requests per bucket exceeds this setting, the writer will wait for the inflight requests to complete before sending out new requests. | ## Other Options