@@ -106,6 +106,11 @@ public class SharePartition {
106106
107107 private static final Logger log = LoggerFactory .getLogger (SharePartition .class );
108108
109+ /**
110+ * Minimum value of the delivery count threshold at which records delivery starts being throttled.
111+ */
112+ private static final int MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT = 2 ;
113+
109114 /**
110115 * empty member id used to indicate when a record is not acquired by any member.
111116 */
@@ -201,6 +206,11 @@ enum SharePartitionState {
201206 */
202207 private final int maxDeliveryCount ;
203208
209+ /**
210+ * Records whose delivery count reaches or exceeds this limit are deemed abnormal and the batching of
211+ * these records should be reduced. The limit is set to half of maxDeliveryCount rounded up, with a minimum of 2.
212+ */
213+ private final int throttleRecordsDeliveryLimit ;
204214 /**
205215 * The group config manager is used to retrieve the values for dynamic group configurations
206216 */
@@ -362,6 +372,7 @@ enum SharePartitionState {
362372 this .leaderEpoch = leaderEpoch ;
363373 this .maxInFlightRecords = maxInFlightRecords ;
364374 this .maxDeliveryCount = maxDeliveryCount ;
375+ this .throttleRecordsDeliveryLimit = Math .max (MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT , (int ) Math .ceil ((double ) maxDeliveryCount / 2 ));
365376 this .cachedState = new ConcurrentSkipListMap <>();
366377 this .lock = new ReentrantReadWriteLock ();
367378 this .findNextFetchOffset = false ;
@@ -834,7 +845,16 @@ public ShareAcquiredRecords acquire(
834845 boolean fullMatch = checkForFullMatch (inFlightBatch , firstBatch .baseOffset (), lastOffsetToAcquire );
835846 int numRecordsRemaining = maxRecordsToAcquire - acquiredCount ;
836847 boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch (inFlightBatch , maxRecordsToAcquire , acquiredCount );
837- if (!fullMatch || inFlightBatch .offsetState () != null || recordLimitSubsetMatch ) {
848+ boolean throttleRecordsDelivery = shouldThrottleRecordsDelivery (inFlightBatch , firstBatch .baseOffset (), lastOffsetToAcquire );
849+ // Stop acquiring more records if records delivery has to be throttled. The throttling prevents
850+ // the complete batch from being archived when a single record is corrupt.
851+ // The check below isolates the current batch/offsets so they are delivered individually in subsequent fetches.
852+ if (throttleRecordsDelivery && acquiredCount > 0 ) {
853+ // Set the max records to acquire as 0 to prevent further acquisition of records.
854+ maxRecordsToAcquire = 0 ;
855+ break ;
856+ }
857+ if (!fullMatch || inFlightBatch .offsetState () != null || recordLimitSubsetMatch || throttleRecordsDelivery ) {
838858 log .trace ("Subset or offset tracked batch record found for share partition,"
839859 + " batch: {} request offsets - first: {}, last: {} for the share"
840860 + " partition: {}-{}" , inFlightBatch , firstBatch .baseOffset (),
@@ -859,7 +879,14 @@ public ShareAcquiredRecords acquire(
859879 // maxRecordsToAcquire. Hence, pass the remaining number of records that can
860880 // be acquired.
861881 int acquiredSubsetCount = acquireSubsetBatchRecords (memberId , isRecordLimitMode , numRecordsRemaining , firstBatch .baseOffset (), lastOffsetToAcquire , inFlightBatch , result );
882+
862883 acquiredCount += acquiredSubsetCount ;
884+ // If records are throttled, set `maxRecordsToAcquire = 0` and stop iterating immediately
885+ // to prevent acquiring any new records afterwards.
886+ if (throttleRecordsDelivery && acquiredSubsetCount > 0 ) {
887+ maxRecordsToAcquire = 0 ;
888+ break ;
889+ }
863890 continue ;
864891 }
865892
@@ -1866,6 +1893,8 @@ private int acquireSubsetBatchRecords(
18661893 ) {
18671894 lock .writeLock ().lock ();
18681895 int acquiredCount = 0 ;
1896+ long maxFetchRecordsWhileThrottledRecords = -1 ;
1897+ boolean hasThrottledRecord = false ;
18691898 try {
18701899 for (Map .Entry <Long , InFlightState > offsetState : inFlightBatch .offsetState ().entrySet ()) {
18711900 // For the first batch which might have offsets prior to the request base
@@ -1885,7 +1914,29 @@ private int acquireSubsetBatchRecords(
18851914 continue ;
18861915 }
18871916
1888- InFlightState updateResult = offsetState .getValue ().tryUpdateState (RecordState .ACQUIRED , DeliveryCountOps .INCREASE ,
1917+ int recordDeliveryCount = offsetState .getValue ().deliveryCount ();
1918+ // If the record is on its last delivery attempt then isolate that record to be delivered alone.
1919+ // If the respective record is corrupt then this prevents increasing the delivery count of multiple
1920+ // records in a single response batch. The condition below checks whether the current record has only
1921+ // its final delivery attempt left and some records have already been acquired for the response; if so,
1922+ // processing of the current record is skipped, and it shall be delivered alone in the next fetch.
1923+ if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0 ) {
1924+ break ;
1925+ }
1926+
1927+ // When a record's delivery count reaches the throttle threshold, progressively reduce the batch size to isolate records.
1928+ // The `maxFetchRecordsWhileThrottledRecords` is halved with each additional delivery attempt beyond the throttle limit.
1929+ // Example:
1930+ // - maxDeliveryCount = 6, throttleRecordsDeliveryLimit = 3, batch size = 500
1931+ // - deliveryCount = 3: maxFetchRecords = 500 >> (3 - 3 + 1) = 250
1932+ // - deliveryCount = 4: maxFetchRecords = 500 >> (4 - 3 + 1) = 125
1933+ // The `maxFetchRecordsWhileThrottledRecords` is calculated based on the first acquirable record that meets the throttling criteria in the batch.
1934+ if (recordDeliveryCount >= throttleRecordsDeliveryLimit && maxFetchRecordsWhileThrottledRecords < 0 ) {
1935+ maxFetchRecordsWhileThrottledRecords = Math .max (1 , (long ) inFlightBatch .offsetState ().size () >> (recordDeliveryCount - throttleRecordsDeliveryLimit + 1 ));
1936+ hasThrottledRecord = true ;
1937+ }
1938+
1939+ InFlightState updateResult = offsetState .getValue ().tryUpdateState (RecordState .ACQUIRED , DeliveryCountOps .INCREASE ,
18891940 maxDeliveryCount , memberId );
18901941 if (updateResult == null || updateResult .state () != RecordState .ACQUIRED ) {
18911942 log .trace ("Unable to acquire records for the offset: {} in batch: {}"
@@ -1904,10 +1955,18 @@ private int acquireSubsetBatchRecords(
19041955 .setLastOffset (offsetState .getKey ())
19051956 .setDeliveryCount ((short ) offsetState .getValue ().deliveryCount ()));
19061957 acquiredCount ++;
1958+
1959+ // The record was just acquired for its final delivery attempt; stop here so that it is delivered alone.
1960+ if (offsetState .getValue ().deliveryCount () == maxDeliveryCount && maxDeliveryCount > 2 ) {
1961+ break ;
1962+ }
19071963 if (isRecordLimitMode && acquiredCount == maxFetchRecords ) {
19081964 // In record_limit mode, acquire only the requested number of records.
19091965 break ;
19101966 }
1967+ if (hasThrottledRecord && acquiredCount == maxFetchRecordsWhileThrottledRecords ) {
1968+ break ;
1969+ }
19111970 }
19121971 } finally {
19131972 lock .writeLock ().unlock ();
@@ -1942,6 +2001,33 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
19422001 return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset ;
19432002 }
19442003
2004+ /**
2005+ * Check if the in-flight batch should be throttled based on delivery count.
2006+ *
2007+ * @param inFlightBatch The in-flight batch to check for throttling.
2008+ * @param requestFirstOffset The first offset to acquire; per-offset entries below it are ignored.
2009+ * @param requestLastOffset The last offset to acquire; per-offset entries above it are ignored.
2010+ * @return True if the batch should be throttled, i.e. the batch delivery count (or, when per-offset state exists, the highest delivery count among in-range AVAILABLE offsets) >= throttleRecordsDeliveryLimit; false otherwise.
2011+ */
2012+ private boolean shouldThrottleRecordsDelivery (InFlightBatch inFlightBatch , long requestFirstOffset , long requestLastOffset ) {
2013+ if (inFlightBatch .offsetState () == null ) { // No per-offset state: decide from the batch-level delivery count.
2014+ return inFlightBatch .batchDeliveryCount () >= throttleRecordsDeliveryLimit ;
2015+ }
2016+
2017+ return inFlightBatch .offsetState ().entrySet ().stream ().filter (entry -> { // Keep only offsets that are within the request range and AVAILABLE.
2018+ if (entry .getKey () < requestFirstOffset ) {
2019+ return false ;
2020+ }
2021+ if (entry .getKey () > requestLastOffset ) {
2022+ return false ;
2023+ }
2024+ if (entry .getValue ().state () != RecordState .AVAILABLE ) {
2025+ return false ;
2026+ }
2027+ return true ;
2028+ }).mapToInt (entry -> entry .getValue ().deliveryCount ()).max ().orElse (0 ) >= throttleRecordsDeliveryLimit ; // If no offset qualifies, 0 is compared against the limit.
2029+ }
2030+
19452031 // Visibility for test
19462032 static Map <Long , Byte > fetchAckTypeMapForBatch (ShareAcknowledgementBatch batch ) {
19472033 // Client can either send a single entry in acknowledgeTypes which represents the state
0 commit comments