2424import com .alibaba .fluss .exception .InvalidMetadataException ;
2525import com .alibaba .fluss .exception .OutOfOrderSequenceException ;
2626import com .alibaba .fluss .exception .RetriableException ;
27+ import com .alibaba .fluss .exception .TimeoutException ;
2728import com .alibaba .fluss .exception .UnknownTableOrBucketException ;
2829import com .alibaba .fluss .metadata .PhysicalTablePath ;
2930import com .alibaba .fluss .metadata .TableBucket ;
3738import com .alibaba .fluss .rpc .messages .PutKvResponse ;
3839import com .alibaba .fluss .rpc .protocol .ApiError ;
3940import com .alibaba .fluss .rpc .protocol .Errors ;
41+ import com .alibaba .fluss .utils .clock .Clock ;
4042
4143import org .slf4j .Logger ;
4244import org .slf4j .LoggerFactory ;
4648import java .util .ArrayList ;
4749import java .util .HashMap ;
4850import java .util .HashSet ;
51+ import java .util .Iterator ;
4952import java .util .List ;
5053import java .util .Map ;
5154import java .util .Set ;
@@ -81,6 +84,9 @@ public class Sender implements Runnable {
8184 /** the number of times to retry a failed write batch before giving up. */
8285 private final int retries ;
8386
87+ /** the clock instance used for getting the time. */
88+ private final Clock clock ;
89+
8490 /** true while the sender thread is still running. */
8591 private volatile boolean running ;
8692
@@ -111,6 +117,7 @@ public Sender(
111117 int retries ,
112118 MetadataUpdater metadataUpdater ,
113119 IdempotenceManager idempotenceManager ,
120+ Clock clock ,
114121 WriterMetricGroup writerMetricGroup ) {
115122 this .accumulator = accumulator ;
116123 this .maxRequestSize = maxRequestSize ;
@@ -124,6 +131,7 @@ public Sender(
124131 checkNotNull (metadataUpdater .getCoordinatorServer ());
125132
126133 this .idempotenceManager = idempotenceManager ;
134+ this .clock = clock ;
127135 this .writerMetricGroup = writerMetricGroup ;
128136
129137 // TODO add retry logic while send failed. See FLUSS-56364375
@@ -175,7 +183,8 @@ public void runOnce() throws Exception {
175183 }
176184
177185 // do send.
178- sendWriteData ();
186+ long currentTimeMillis = clock .milliseconds ();
187+ sendWriteData (currentTimeMillis );
179188 }
180189
181190 public boolean isRunning () {
@@ -188,7 +197,7 @@ private void addToInflightBatches(Map<Integer, List<WriteBatch>> batches) {
188197 }
189198 }
190199
191- private void sendWriteData () throws Exception {
200+ private void sendWriteData (long now ) throws Exception {
192201 // get the list of buckets with data ready to send.
193202 ReadyCheckResult readyCheckResult = accumulator .ready (metadataUpdater .getCluster ());
194203
@@ -216,7 +225,9 @@ private void sendWriteData() throws Exception {
216225 if (!batches .isEmpty ()) {
217226 addToInflightBatches (batches );
218227
219- // TODO add logic for batch expire.
228+ // check and expire batches that have reached the batch delivery timeout. This keeps the
229+ // client from waiting indefinitely on a batch that would otherwise be retried forever.
230+ checkAndExpireBatches (now );
220231
221232 sendWriteRequests (batches );
222233
@@ -239,9 +250,8 @@ private void failBatch(WriteBatch batch, Exception exception, boolean adjustBatc
239250 if (idempotenceManager .idempotenceEnabled ()) {
240251 try {
241252 // This call can throw an exception in the rare case that there's an invalid
242- // state
243- // transition attempted. Catch these so as not to interfere with the rest of the
244- // logic.
253+ // state transition attempted. Catch these so as not to interfere with the rest
254+ // of the logic.
245255 idempotenceManager .handleFailedBatch (batch , exception , adjustBatchSequences );
246256 } catch (Exception e ) {
247257 LOG .debug (
@@ -280,6 +290,64 @@ private void maybeRemoveAndDeallocateBatch(WriteBatch batch) {
280290 accumulator .deallocate (batch );
281291 }
282292
293+ private void checkAndExpireBatches (long now ) {
294+ List <WriteBatch > expiredInflightBatches = getExpiredInflightBatches (now );
295+ List <WriteBatch > expiredBatches = accumulator .expiredBatches (now );
296+ expiredBatches .addAll (expiredInflightBatches );
297+ if (!expiredBatches .isEmpty ()) {
298+ LOG .trace ("Client found expired batches: {}" , expiredBatches );
299+ }
300+
301+ for (WriteBatch batch : expiredBatches ) {
302+ failBatch (
303+ batch ,
304+ new TimeoutException (
305+ String .format (
306+ "Expiring %s records for %s : %s ms has passed since batch creation." ,
307+ batch .recordCount ,
308+ batch .tableBucket (),
309+ now - batch .getCreatedMs ())),
310+ false );
311+ }
312+ }
313+
314+ /** Get the in-flight batches that has reached batch delivery timeout. */
315+ @ VisibleForTesting
316+ List <WriteBatch > getExpiredInflightBatches (long now ) {
317+ List <WriteBatch > expiredBatches = new ArrayList <>();
318+ for (Iterator <Map .Entry <TableBucket , List <WriteBatch >>> batchIt =
319+ inFlightBatches .entrySet ().iterator ();
320+ batchIt .hasNext (); ) {
321+ Map .Entry <TableBucket , List <WriteBatch >> entry = batchIt .next ();
322+ List <WriteBatch > tbFlightBatches = entry .getValue ();
323+ if (tbFlightBatches != null ) {
324+ Iterator <WriteBatch > iter = tbFlightBatches .iterator ();
325+ while (iter .hasNext ()) {
326+ WriteBatch batch = iter .next ();
327+ if (batch .hasReachedDeliveryTimeout (accumulator .getDeliveryTimeoutMs (), now )) {
328+ iter .remove ();
329+ if (!batch .isDone ()) {
330+ expiredBatches .add (batch );
331+ } else {
332+ throw new IllegalStateException (
333+ batch .tableBucket ()
334+ + " batch created at "
335+ + batch .getCreatedMs ()
336+ + " gets unexpected final state "
337+ + batch .getFinalState ());
338+ }
339+ } else {
340+ break ;
341+ }
342+ }
343+ if (tbFlightBatches .isEmpty ()) {
344+ batchIt .remove ();
345+ }
346+ }
347+ }
348+ return expiredBatches ;
349+ }
350+
283351 private void maybeRemoveFromInflightBatches (WriteBatch batch ) {
284352 synchronized (inFlightBatchesLock ) {
285353 List <WriteBatch > batches = inFlightBatches .get (batch .tableBucket ());
@@ -355,12 +423,11 @@ private void sendProduceLogRequestAndHandleResponse(
355423 ProduceLogRequest request ,
356424 long tableId ,
357425 Map <TableBucket , WriteBatch > recordsByBucket ) {
358- long startTime = System . currentTimeMillis ();
426+ long startTime = clock . milliseconds ();
359427 gateway .produceLog (request )
360428 .whenComplete (
361429 (produceLogResponse , e ) -> {
362- writerMetricGroup .setSendLatencyInMs (
363- System .currentTimeMillis () - startTime );
430+ writerMetricGroup .setSendLatencyInMs (clock .milliseconds () - startTime );
364431 if (e != null ) {
365432 handleWriteRequestException (e , recordsByBucket );
366433 } else {
@@ -375,12 +442,11 @@ private void sendPutKvRequestAndHandleResponse(
375442 PutKvRequest request ,
376443 long tableId ,
377444 Map <TableBucket , WriteBatch > recordsByBucket ) {
378- long startTime = System . currentTimeMillis ();
445+ long startTime = clock . milliseconds ();
379446 gateway .putKv (request )
380447 .whenComplete (
381448 (putKvResponse , e ) -> {
382- writerMetricGroup .setSendLatencyInMs (
383- System .currentTimeMillis () - startTime );
449+ writerMetricGroup .setSendLatencyInMs (clock .milliseconds () - startTime );
384450 if (e != null ) {
385451 handleWriteRequestException (e , recordsByBucket );
386452 } else {
0 commit comments