|
41 | 41 | import org.apache.kafka.coordinator.group.GroupConfigManager; |
42 | 42 | import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; |
43 | 43 | import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; |
| 44 | +import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler; |
| 45 | +import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask; |
44 | 46 | import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; |
45 | 47 | import org.apache.kafka.server.share.fetch.DelayedShareFetchKey; |
46 | 48 | import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; |
@@ -2391,59 +2393,61 @@ private AcquisitionLockTimerTask acquisitionLockTimerTask( |
2391 | 2393 | long lastOffset, |
2392 | 2394 | long delayMs |
2393 | 2395 | ) { |
2394 | | - return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, lastOffset); |
| 2396 | + return new AcquisitionLockTimerTask(time, delayMs, memberId, firstOffset, lastOffset, releaseAcquisitionLockOnTimeout(), sharePartitionMetrics); |
2395 | 2397 | } |
2396 | 2398 |
|
2397 | | - private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) { |
2398 | | - List<PersisterStateBatch> stateBatches; |
2399 | | - lock.writeLock().lock(); |
2400 | | - try { |
2401 | | - Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(firstOffset); |
2402 | | - if (floorOffset == null) { |
2403 | | - log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition); |
2404 | | - return; |
2405 | | - } |
2406 | | - stateBatches = new ArrayList<>(); |
2407 | | - NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, lastOffset, true); |
2408 | | - for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { |
2409 | | - InFlightBatch inFlightBatch = entry.getValue(); |
| 2399 | + private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() { |
| 2400 | + return (memberId, firstOffset, lastOffset) -> { |
| 2401 | + List<PersisterStateBatch> stateBatches; |
| 2402 | + lock.writeLock().lock(); |
| 2403 | + try { |
| 2404 | + Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(firstOffset); |
| 2405 | + if (floorOffset == null) { |
| 2406 | + log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition); |
| 2407 | + return; |
| 2408 | + } |
| 2409 | + stateBatches = new ArrayList<>(); |
| 2410 | + NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, lastOffset, true); |
| 2411 | + for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { |
| 2412 | + InFlightBatch inFlightBatch = entry.getValue(); |
2410 | 2413 |
|
2411 | | - if (inFlightBatch.offsetState() == null |
| 2414 | + if (inFlightBatch.offsetState() == null |
2412 | 2415 | && inFlightBatch.batchState() == RecordState.ACQUIRED |
2413 | 2416 | && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset())) { |
2414 | 2417 |
|
2415 | | - // For the case when batch.firstOffset < start offset <= batch.lastOffset, we will be having some |
2416 | | - // acquired records that need to move to archived state despite their delivery count. |
2417 | | - inFlightBatch.maybeInitializeOffsetStateUpdate(); |
2418 | | - } |
| 2418 | + // For the case when batch.firstOffset < start offset <= batch.lastOffset, we will be having some |
| 2419 | + // acquired records that need to move to archived state despite their delivery count. |
| 2420 | + inFlightBatch.maybeInitializeOffsetStateUpdate(); |
| 2421 | + } |
2419 | 2422 |
|
2420 | | - // Case when the state of complete batch is valid |
2421 | | - if (inFlightBatch.offsetState() == null) { |
2422 | | - releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId); |
2423 | | - } else { // Case when batch has a valid offset state map. |
2424 | | - releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset); |
| 2423 | + // Case when the state of complete batch is valid |
| 2424 | + if (inFlightBatch.offsetState() == null) { |
| 2425 | + releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId); |
| 2426 | + } else { // Case when batch has a valid offset state map. |
| 2427 | + releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset); |
| 2428 | + } |
2425 | 2429 | } |
2426 | | - } |
2427 | 2430 |
|
2428 | | - if (!stateBatches.isEmpty()) { |
2429 | | - writeShareGroupState(stateBatches).whenComplete((result, exception) -> { |
2430 | | - if (exception != null) { |
2431 | | - log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", |
2432 | | - groupId, topicIdPartition, memberId, exception); |
2433 | | - } |
2434 | | - // Even if write share group state RPC call fails, we will still go ahead with the state transition. |
2435 | | - // Update the cached state and start and end offsets after releasing the acquisition lock on timeout. |
2436 | | - maybeUpdateCachedStateAndOffsets(); |
2437 | | - }); |
| 2431 | + if (!stateBatches.isEmpty()) { |
| 2432 | + writeShareGroupState(stateBatches).whenComplete((result, exception) -> { |
| 2433 | + if (exception != null) { |
| 2434 | + log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", |
| 2435 | + groupId, topicIdPartition, memberId, exception); |
| 2436 | + } |
| 2437 | + // Even if write share group state RPC call fails, we will still go ahead with the state transition. |
| 2438 | + // Update the cached state and start and end offsets after releasing the acquisition lock on timeout. |
| 2439 | + maybeUpdateCachedStateAndOffsets(); |
| 2440 | + }); |
| 2441 | + } |
| 2442 | + } finally { |
| 2443 | + lock.writeLock().unlock(); |
2438 | 2444 | } |
2439 | | - } finally { |
2440 | | - lock.writeLock().unlock(); |
2441 | | - } |
2442 | 2445 |
|
2443 | | - // If we have an acquisition lock timeout for a share-partition, then we should check if |
2444 | | - // there is a pending share fetch request for the share-partition and complete it. |
2445 | | - // Skip null check for stateBatches, it should always be initialized if reached here. |
2446 | | - maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty()); |
| 2446 | + // If we have an acquisition lock timeout for a share-partition, then we should check if |
| 2447 | + // there is a pending share fetch request for the share-partition and complete it. |
| 2448 | + // Skip null check for stateBatches, it should always be initialized if reached here. |
| 2449 | + maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty()); |
| 2450 | + }; |
2447 | 2451 | } |
2448 | 2452 |
|
2449 | 2453 | private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch, |
@@ -2834,35 +2838,6 @@ void gapStartOffset(long gapStartOffset) { |
2834 | 2838 | } |
2835 | 2839 | } |
2836 | 2840 |
|
2837 | | - // Visible for testing |
2838 | | - final class AcquisitionLockTimerTask extends TimerTask { |
2839 | | - private final long expirationMs; |
2840 | | - private final String memberId; |
2841 | | - private final long firstOffset; |
2842 | | - private final long lastOffset; |
2843 | | - |
2844 | | - AcquisitionLockTimerTask(long delayMs, String memberId, long firstOffset, long lastOffset) { |
2845 | | - super(delayMs); |
2846 | | - this.expirationMs = time.hiResClockMs() + delayMs; |
2847 | | - this.memberId = memberId; |
2848 | | - this.firstOffset = firstOffset; |
2849 | | - this.lastOffset = lastOffset; |
2850 | | - } |
2851 | | - |
2852 | | - long expirationMs() { |
2853 | | - return expirationMs; |
2854 | | - } |
2855 | | - |
2856 | | - /** |
2857 | | - * The task is executed when the acquisition lock timeout is reached. The task releases the acquired records. |
2858 | | - */ |
2859 | | - @Override |
2860 | | - public void run() { |
2861 | | - sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1); |
2862 | | - releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset); |
2863 | | - } |
2864 | | - } |
2865 | | - |
2866 | 2841 | /** |
2867 | 2842 | * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. |
2868 | 2843 | */ |
|
0 commit comments