Paused by user partitions are resumed after rebalance with CooperativeStickyAssignor #432

@ajax-shuliak-n

Description

In ConsumerEventLoop.SubscribeEvent, the onPartitionsAssigned callback clears all entries from the pausedByUser set. This works correctly with eager rebalancing protocols (e.g., RangeAssignor, StickyAssignor) where all partitions are revoked and reassigned during rebalance. However, with CooperativeStickyAssignor, this behavior is incorrect because the cooperative protocol performs incremental rebalancing—only the partitions that are actually being transferred are revoked and reassigned, while other partitions continue to be owned by the same consumer.
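To make the protocol difference concrete, here is a minimal, Kafka-free sketch (plain `String` partition names stand in for `TopicPartition`; the method names are illustrative, not reactor-kafka or kafka-clients API) of what `onPartitionsAssigned` receives under each protocol:

```java
import java.util.HashSet;
import java.util.Set;

public class RebalanceProtocols {
    // Eager protocols (RangeAssignor, StickyAssignor): everything is revoked first,
    // so onPartitionsAssigned receives the FULL new assignment.
    static Set<String> eagerAssignedCallbackArg(Set<String> owned, Set<String> target) {
        return new HashSet<>(target);
    }

    // Cooperative protocol (CooperativeStickyAssignor): only the delta moves,
    // so onPartitionsAssigned receives ONLY the partitions the consumer gained.
    static Set<String> cooperativeAssignedCallbackArg(Set<String> owned, Set<String> target) {
        Set<String> gained = new HashSet<>(target);
        gained.removeAll(owned);
        return gained;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("topic-0", "topic-1");             // before rebalance
        Set<String> target = Set.of("topic-0", "topic-1", "topic-2"); // after rebalance
        System.out.println(eagerAssignedCallbackArg(owned, target));       // all three partitions
        System.out.println(cooperativeAssignedCallbackArg(owned, target)); // only the gained topic-2
    }
}
```

Any logic in the callback that treats its argument as the full assignment is therefore only correct under the eager protocols.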

The Problem

When a user pauses specific partitions using KafkaReceiver.doOnConsumer() and a rebalance occurs with CooperativeStickyAssignor:

  1. User pauses partition X via consumer.pause()
  2. Partition X is added to pausedByUser set
  3. Rebalance occurs (e.g., old consumer leaves the group)
  4. With CooperativeStickyAssignor, partition X is NOT revoked (it stays with the same consumer)
  5. onPartitionsAssigned is called with the newly assigned partitions
  6. BUG: pausedByUser is then repopulated only with partitions that are currently paused AND are in the newly assigned set, removing partition X from the tracked set
  7. Since partition X was never revoked, it's still assigned but no longer tracked in pausedByUser
  8. On the next resume operation, partition X is incorrectly resumed even though the user intended it to remain paused
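The loss of tracking in step 6 can be simulated without a broker. The helper below (hypothetical, with `String` partition names in place of `TopicPartition`) applies the current retention logic to the incremental argument that the cooperative protocol passes in:

```java
import java.util.HashSet;
import java.util.Set;

public class PausedTrackingBug {
    // Mirrors the current onPartitionsAssigned logic: keep only those paused
    // partitions that appear in the callback argument.
    static Set<String> retainPaused(Set<String> pausedByUser, Set<String> callbackArg) {
        Set<String> result = new HashSet<>(pausedByUser);
        result.removeIf(tp -> !callbackArg.contains(tp));
        return result;
    }

    public static void main(String[] args) {
        Set<String> pausedByUser = Set.of("topic-0"); // user paused partition 0
        // Cooperative rebalance: topic-0 is retained (never revoked), so the
        // callback argument contains only the newly gained topic-2.
        Set<String> callbackArg = Set.of("topic-2");
        System.out.println(retainPaused(pausedByUser, callbackArg)); // [] — tracking lost
    }
}
```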

Code Reference

The problematic code is in the onPartitionsAssigned callback within ConsumerEventLoop.SubscribeEvent:

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // ... other code (including the declaration of toRepause) ...
    if (receiverOptions.pauseAllAfterRebalance()) {
        // pause all partitions if any partitions were paused by the user before the rebalance
        log.debug("User requested re-pausing all assignments");
        toRepause.addAll(partitions);
        pausedByUser.clear(); // clears ALL previously paused partitions...
        pausedByUser.addAll(partitions); // ...and re-adds only the partitions in the callback argument
    } else {
        Iterator<TopicPartition> iterator = pausedByUser.iterator();
        while (iterator.hasNext()) {
            TopicPartition next = iterator.next();
            if (partitions.contains(next)) {
                toRepause.add(next);
            } else {
                // drops paused partitions absent from `partitions` — but with the
                // cooperative protocol, `partitions` holds only the NEWLY assigned
                // partitions, not the consumer's full assignment
                iterator.remove();
            }
        }
    }
    // ... other code ...
}

Expected Behavior

When a rebalance occurs with CooperativeStickyAssignor, partitions that were previously paused by the user should remain paused, unless those specific partitions were revoked during the rebalance. New partition assignments should not affect previously paused partitions that the consumer continues to own.

In other words: if I pause partition X, and then a rebalance happens that doesn't revoke partition X from me, partition X should still be paused after the rebalance completes.
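The expected invariant is plain set arithmetic: after a rebalance, the paused set should be the previously paused set minus whatever was actually revoked. A hypothetical helper expressing this (again with `String` in place of `TopicPartition`):

```java
import java.util.HashSet;
import java.util.Set;

public class PausedInvariant {
    // Expected post-rebalance paused set: previously paused minus revoked.
    static Set<String> expectedPausedAfter(Set<String> pausedBefore, Set<String> revoked) {
        Set<String> result = new HashSet<>(pausedBefore);
        result.removeAll(revoked);
        return result;
    }

    public static void main(String[] args) {
        // A rebalance that revokes nothing from this consumer must leave the
        // paused set unchanged.
        System.out.println(expectedPausedAfter(Set.of("topic-0"), Set.of())); // [topic-0]
    }
}
```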

Actual Behavior

When a rebalance occurs with CooperativeStickyAssignor, all previously paused partitions lose their "paused by user" tracking, even if those partitions were not revoked. As a result, partitions that the user explicitly paused may be automatically resumed, violating the user's intent.

Example scenario:

  1. User pauses partition 0
  2. Another consumer leaves the group, triggering rebalance
  3. Partition 0 stays on the same consumer (not revoked), and new partitions are assigned to it
  4. After rebalance, partition 0 is no longer tracked as "paused by user"
  5. Later, when backpressure clears, partition 0 gets resumed automatically — even though the user never requested to resume it

Steps to Reproduce

  1. Configure consumers with CooperativeStickyAssignor:

    props.put("partition.assignment.strategy", 
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
  2. Create a topic with multiple partitions (e.g., 4 partitions)

  3. Start two consumers (Consumer A and Consumer B) in the same consumer group — partitions are distributed between them

  4. On Consumer A, pause one of its assigned partitions:

    receiverA.doOnConsumer(consumer -> {
        consumer.pause(Collections.singleton(new TopicPartition("topic", 0)));
        return null;
    }).block();
  5. Consumer B leaves the group (shutdown or disconnect)

  6. Rebalance occurs — Consumer A receives new partition assignments (partitions previously owned by Consumer B)

  7. With CooperativeStickyAssignor, Consumer A's originally owned partitions (including the paused one) are NOT revoked; only the newly transferred partitions are passed to onPartitionsAssigned

  8. Bug: The previously paused partition is no longer tracked in pausedByUser and can be incorrectly resumed.

Possible Solution

Use the current assignment from the consumer to determine which partitions to re-pause, rather than relying solely on the newly assigned partitions passed to onPartitionsAssigned.

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // ... other code ...
    Set<TopicPartition> currentAssignment = consumer.assignment();
    if (!pausedByUser.isEmpty()) {
        List<TopicPartition> toRepause = new ArrayList<>();
        if (receiverOptions.pauseAllAfterRebalance()) {
            // pause all partitions if any partitions were paused by the user before the rebalance
            log.debug("User requested re-pausing all assignments");
            toRepause.addAll(currentAssignment); // currentAssignment instead of partitions
            pausedByUser.clear();
            pausedByUser.addAll(currentAssignment); // currentAssignment instead of partitions
        } else {
            Iterator<TopicPartition> iterator = pausedByUser.iterator();
            while (iterator.hasNext()) {
                TopicPartition next = iterator.next();
                if (currentAssignment.contains(next)) { // currentAssignment instead of partitions
                    toRepause.add(next);
                } else {
                    iterator.remove();
                }
            }
        }
        if (!repausedAll && !toRepause.isEmpty()) { // repausedAll is set elsewhere in the callback
            consumer.pause(toRepause);
        }
    }
    // ... other code ...
}
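The fix works because `KafkaConsumer.assignment()` returns the consumer's full current assignment (retained plus newly assigned partitions), so a retained-but-paused partition survives the retention loop. A Kafka-free sketch of the corrected retention logic (hypothetical helper name, `String` in place of `TopicPartition`):

```java
import java.util.HashSet;
import java.util.Set;

public class PausedTrackingFix {
    // Proposed logic: retain paused partitions against the FULL current assignment
    // (consumer.assignment()), not the incremental callback argument.
    static Set<String> retainPaused(Set<String> pausedByUser, Set<String> currentAssignment) {
        Set<String> result = new HashSet<>(pausedByUser);
        result.removeIf(tp -> !currentAssignment.contains(tp));
        return result;
    }

    public static void main(String[] args) {
        Set<String> pausedByUser = Set.of("topic-0");
        // Cooperative rebalance: the callback argument was only [topic-2], but the
        // full assignment after the rebalance still contains the retained topic-0.
        Set<String> currentAssignment = Set.of("topic-0", "topic-1", "topic-2");
        System.out.println(retainPaused(pausedByUser, currentAssignment)); // [topic-0] kept
    }
}
```

With eager protocols the callback argument equals the full assignment, so this change should be behavior-preserving there while fixing the cooperative case.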

Your Environment

  • reactor-kafka version: 1.3.23
  • JVM version: 21
