6969import java .util .Set ;
7070import java .util .concurrent .CompletableFuture ;
7171import java .util .concurrent .ConcurrentHashMap ;
72- import java .util .concurrent .ConcurrentLinkedQueue ;
73- import java .util .concurrent .atomic .AtomicBoolean ;
7472
7573import scala .jdk .javaapi .CollectionConverters ;
7674
@@ -102,16 +100,6 @@ public class SharePartitionManager implements AutoCloseable {
102100 */
103101 private final ShareSessionCache cache ;
104102
105- /**
106- * The fetch queue stores the share fetch requests that are waiting to be processed.
107- */
108- private final ConcurrentLinkedQueue <ShareFetchData > fetchQueue ;
109-
110- /**
111- * The process fetch queue lock is used to ensure that only one thread is processing the fetch queue at a time.
112- */
113- private final AtomicBoolean processFetchQueueLock ;
114-
115103 /**
116104 * The group config manager is used to retrieve the values for dynamic group configurations
117105 */
@@ -184,30 +172,27 @@ private SharePartitionManager(
184172 GroupConfigManager groupConfigManager ,
185173 Metrics metrics
186174 ) {
187- this .replicaManager = replicaManager ;
188- this .time = time ;
189- this .cache = cache ;
190- this .partitionCacheMap = partitionCacheMap ;
191- this .fetchQueue = new ConcurrentLinkedQueue <>();
192- this .processFetchQueueLock = new AtomicBoolean (false );
193- this .defaultRecordLockDurationMs = defaultRecordLockDurationMs ;
194- this .timer = new SystemTimerReaper ("share-group-lock-timeout-reaper" ,
195- new SystemTimer ("share-group-lock-timeout" ));
196- this .maxDeliveryCount = maxDeliveryCount ;
197- this .maxInFlightMessages = maxInFlightMessages ;
198- this .persister = persister ;
199- this .groupConfigManager = groupConfigManager ;
200- this .shareGroupMetrics = new ShareGroupMetrics (Objects .requireNonNull (metrics ), time );
175+ this (replicaManager ,
176+ time ,
177+ cache ,
178+ partitionCacheMap ,
179+ defaultRecordLockDurationMs ,
180+ new SystemTimerReaper ("share-group-lock-timeout-reaper" ,
181+ new SystemTimer ("share-group-lock-timeout" )),
182+ maxDeliveryCount ,
183+ maxInFlightMessages ,
184+ persister ,
185+ groupConfigManager ,
186+ metrics
187+ );
201188 }
202189
203190 // Visible for testing.
204- @ SuppressWarnings ({"checkstyle:ParameterNumber" })
205191 SharePartitionManager (
206192 ReplicaManager replicaManager ,
207193 Time time ,
208194 ShareSessionCache cache ,
209195 Map <SharePartitionKey , SharePartition > partitionCacheMap ,
210- ConcurrentLinkedQueue <ShareFetchData > fetchQueue ,
211196 int defaultRecordLockDurationMs ,
212197 Timer timer ,
213198 int maxDeliveryCount ,
@@ -220,8 +205,6 @@ private SharePartitionManager(
220205 this .time = time ;
221206 this .cache = cache ;
222207 this .partitionCacheMap = partitionCacheMap ;
223- this .fetchQueue = fetchQueue ;
224- this .processFetchQueueLock = new AtomicBoolean (false );
225208 this .defaultRecordLockDurationMs = defaultRecordLockDurationMs ;
226209 this .timer = timer ;
227210 this .maxDeliveryCount = maxDeliveryCount ;
@@ -252,9 +235,7 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
252235 partitionMaxBytes .keySet (), groupId , fetchParams );
253236
254237 CompletableFuture <Map <TopicIdPartition , PartitionData >> future = new CompletableFuture <>();
255- ShareFetchData shareFetchData = new ShareFetchData (fetchParams , groupId , memberId , future , partitionMaxBytes );
256- fetchQueue .add (shareFetchData );
257- maybeProcessFetchQueue ();
238+ processShareFetch (new ShareFetchData (fetchParams , groupId , memberId , future , partitionMaxBytes ));
258239
259240 return future ;
260241 }
@@ -530,13 +511,6 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<Delay
530511 @ Override
531512 public void close () throws Exception {
532513 this .timer .close ();
533- this .persister .stop ();
534- if (!fetchQueue .isEmpty ()) {
535- log .warn ("Closing SharePartitionManager with pending fetch requests count: {}" , fetchQueue .size ());
536- fetchQueue .forEach (shareFetchData -> shareFetchData .future ().completeExceptionally (
537- Errors .BROKER_NOT_AVAILABLE .exception ()));
538- fetchQueue .clear ();
539- }
540514 }
541515
542516 private ShareSessionKey shareSessionKey (String groupId , Uuid memberId ) {
@@ -547,31 +521,11 @@ private static String partitionsToLogString(Collection<TopicIdPartition> partiti
547521 return ShareSession .partitionsToLogString (partitions , log .isTraceEnabled ());
548522 }
549523
550- /**
551- * Recursive function to process all the fetch requests present inside the fetch queue
552- */
553524 // Visible for testing.
554- void maybeProcessFetchQueue () {
555- if (!acquireProcessFetchQueueLock ()) {
556- // The queue is already being processed hence avoid re-triggering.
557- return ;
558- }
559-
560- ShareFetchData shareFetchData = fetchQueue .poll ();
561- if (shareFetchData == null ) {
562- // No more requests to process, so release the lock. Though we should not reach here as the lock
563- // is acquired only when there are requests in the queue. But still, it's safe to release the lock.
564- releaseProcessFetchQueueLock ();
565- return ;
566- }
567-
525+ void processShareFetch (ShareFetchData shareFetchData ) {
568526 if (shareFetchData .partitionMaxBytes ().isEmpty ()) {
569527 // If there are no partitions to fetch then complete the future with an empty map.
570528 shareFetchData .future ().complete (Collections .emptyMap ());
571- // Release the lock so that other threads can process the queue.
572- releaseProcessFetchQueueLock ();
573- if (!fetchQueue .isEmpty ())
574- maybeProcessFetchQueue ();
575529 return ;
576530 }
577531
@@ -590,7 +544,6 @@ void maybeProcessFetchQueue() {
590544 sharePartition .maybeInitialize ().whenComplete ((result , throwable ) -> {
591545 if (throwable != null ) {
592546 maybeCompleteInitializationWithException (sharePartitionKey , shareFetchData .future (), throwable );
593- return ;
594547 }
595548 });
596549 });
@@ -611,20 +564,9 @@ void maybeProcessFetchQueue() {
611564 // Add the share fetch to the delayed share fetch purgatory to process the fetch request.
612565 addDelayedShareFetch (new DelayedShareFetch (shareFetchData , replicaManager , this ),
613566 delayedShareFetchWatchKeys );
614-
615- // Release the lock so that other threads can process the queue.
616- releaseProcessFetchQueueLock ();
617- // If there are more requests in the queue, then process them.
618- if (!fetchQueue .isEmpty ())
619- maybeProcessFetchQueue ();
620-
621567 } catch (Exception e ) {
622568 // In case exception occurs then release the locks so queue can be further processed.
623569 log .error ("Error processing fetch queue for share partitions" , e );
624- releaseProcessFetchQueueLock ();
625- // If there are more requests in the queue, then process them.
626- if (!fetchQueue .isEmpty ())
627- maybeProcessFetchQueue ();
628570 }
629571 }
630572
@@ -679,15 +621,6 @@ private void maybeCompleteInitializationWithException(
679621 future .completeExceptionally (throwable );
680622 }
681623
682- // Visible for testing.
683- boolean acquireProcessFetchQueueLock () {
684- return processFetchQueueLock .compareAndSet (false , true );
685- }
686-
687- private void releaseProcessFetchQueueLock () {
688- processFetchQueueLock .set (false );
689- }
690-
691624 private SharePartitionKey sharePartitionKey (String groupId , TopicIdPartition topicIdPartition ) {
692625 return new SharePartitionKey (groupId , topicIdPartition );
693626 }
0 commit comments