Skip to content

Conversation

Copy link

Copilot AI commented Jan 15, 2026

Problem

Consumers permanently stop receiving messages when topic ownership transfers (broker failover, unload) and the CommandCloseConsumer notification fails to arrive due to network issues or broker crashes. Current connection-level ping-pong only validates TCP connectivity, not topic ownership.

Changes

Core Implementation

  • PartitionConsumerIdleDetector: Tracks consumer activity (receive, ack, nack, redeliver) and triggers lookup when idle for 30s (configurable). Reconnects with cleanup if ownership changed.
    • Clears unacked tracker, flushes ack queues, increments consumer epoch
    • Null-safe broker address comparison
    • Graceful handling of lookup failures
    • No-op pattern eliminates null checks throughout codebase
    • Race condition handling with broker notifications

Configuration (ConsumerConfigurationData)

  • consumerIdleTimeoutMs (default: 30000, set to 0 to disable)

Integration (ConsumerImpl)

  • Activity markers in internalReceive(), doAcknowledge(), negativeAcknowledge(), redeliverUnacknowledgedMessages()
  • Scheduled check every 10s with try-catch for thread pool safety
  • Package-private accessors for detector: getUnAckedMessageTracker(), getAcknowledgmentsGroupingTracker(), incrementConsumerEpoch()
  • Factory pattern ensures detector is always present (enabled or no-op)

Example Usage

Consumer<String> consumer = client.newConsumer()
    .topic("persistent://tenant/ns/topic")
    .subscriptionName("sub")
    .consumerIdleTimeoutMs(30000)  // default, 0 to disable
    .subscribe();

Test Coverage

  • Idle detection triggers reconnect
  • Activity resets timer
  • Ownership change detection via lookup
  • No reconnect when ownership unchanged
  • Lookup failure handling
  • Feature disable scenarios
Original prompt

Add partition-level idle detection for consumer to prevent permanent message loss after topic ownership transfer

Problem

When topic ownership transfers (e.g., broker failover, topic unload), some partition consumers may permanently fail to receive messages because:

  1. Consumers rely entirely on broker's CommandCloseConsumer notification
  2. If the broker fails to send the notification (network issues, broker crash, race conditions), the consumer never knows the topic is fenced
  3. Current connection-level ping-pong only checks TCP connectivity, not topic-level state
  4. There is NO active detection mechanism on the consumer side

Impact: Silent data loss, requires manual intervention, affects production availability

Solution

Implement partition-level idle detection + automatic reconnect mechanism in the consumer:

Key Features

  1. Granularity: Per-partition consumer (not connection-level)
  2. Activity Markers: Mark consumer as active on:
    • Message received (internalReceive())
    • Successful acknowledgment (doAcknowledge())
    • Negative acknowledgment (negativeAcknowledge())
    • Unacknowledged message redelivery
  3. Detection Logic:
    • If no activity for 30 seconds (configurable)
    • Trigger lookup to verify topic ownership
    • If ownership changed → reconnect with cleanup
  4. Cleanup on Reconnect:
    • Clear unacked message tracker
    • Clear pending ack queues (individual + cumulative)
    • Clear batch message ack tracker
    • Reset consumer epoch (prevent old acks from being accepted)

Implementation Requirements

1. Create PartitionConsumerIdleDetector.java

Location: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java

Key Methods:

  • markActive() - Called when consumer has activity
  • checkIdleAndReconnectIfNeeded() - Periodic check for idle state
  • verifyTopicOwnership() - Lookup to verify if broker changed
  • reconnectWithCleanup() - Reconnect + cleanup logic

Cleanup Logic:

private CompletableFuture<Void> reconnectWithCleanup() {
    return CompletableFuture.runAsync(() -> {
        // 1. Clear unacked message tracker
        consumer.getUnAckedMessageTracker().clear();
        
        // 2. Clear pending ack queues
        consumer.getPendingIndividualAcks().clear();
        consumer.getPendingCumulativeAcks().clear();
        
        // 3. Clear batch message ack tracker
        if (consumer.getBatchMessageAckTracker() != null) {
            consumer.getBatchMessageAckTracker().clear();
        }
        
        // 4. Reset consumer epoch
        consumer.incrementConsumerEpoch();
        
    }, consumer.getInternalExecutor())
    .thenCompose(__ -> consumer.reconnectLater(...));
}

2. Modify ConsumerImpl.java

Add fields:

private final PartitionConsumerIdleDetector idleDetector;
private ScheduledFuture<?> idleCheckTask;

Integrate in constructor:

// Initialize idle detector if enabled
long idleTimeoutMs = conf.getConsumerIdleTimeoutMs();
if (idleTimeoutMs > 0 && conf.isEnablePartitionOwnershipCheck()) {
    this.idleDetector = new PartitionConsumerIdleDetector(this, idleTimeoutMs);
    
    // Schedule periodic check every 10 seconds
    this.idleCheckTask = client.timer().scheduleAtFixedRate(() -> {
        idleDetector.checkIdleAndReconnectIfNeeded()
            .exceptionally(ex -> {
                log.warn("[{}][{}] Idle check failed", topic, subscription, ex);
                return null;
            });
    }, idleTimeoutMs, 10_000, TimeUnit.MILLISECONDS);
} else {
    this.idleDetector = null;
}

Add activity markers:

// In internalReceive()
if (idleDetector != null) {
    idleDetector.markActive();
}

// In doAcknowledge()
if (idleDetector != null) {
    idleDetector.markActive();
}

// In negativeAcknowledge()
if (idleDetector != null) {
    idleDetector.markActive();
}

// In redeliverUnacknowledgedMessages()
if (idleDetector != null) {
    idleDetector.markActive();
}

Expose helper methods (package-private for IdleDetector):

UnAckedMessageTracker getUnAckedMessageTracker() { return unAckedMessageTracker; }
ConcurrentLinkedQueue<OpSendMsg> getPendingIndividualAcks() { return pendingIndividualAcks; }
ConcurrentLinkedQueue<OpSendMsg> getPendingCumulativeAcks() { return pendingCumulativeAcks; }
BatchMessageAckTracker getBatchMessageAckTracker() { return batchMessageAckTracker; }
void incrementConsumerEpoch() { this.consumerEpoch.incrementAndGet(); }

Clean up in closeAsync():

if (idleCheckTask != null) {
    idleCheckTask.cancel(false);
}

3. Modify ConsumerConfigurationData.java

Add configuration fields:

/**
 * Consumer idle timeout in milliseconds.
 * If the consumer doesn't receive messages or perform acks for this duration,
 * it will trigger a lookup to verify topic ownership and reconnect if changed.
 * 
 * Set to 0 to disable idle...

</details>



<!-- START COPILOT CODING AGENT SUFFIX -->

*This pull request was created from Copilot chat.*
>

<!-- START COPILOT CODING AGENT TIPS -->
---

💡 You can make Copilot smarter by setting up custom instructions, customizing its development environment and configuring Model Context Protocol (MCP) servers. Learn more [Copilot coding agent tips](https://gh.io/copilot-coding-agent-tips) in the docs.

Copilot AI and others added 5 commits January 15, 2026 03:43
Co-authored-by: nodece <16235121+nodece@users.noreply.github.com>
Co-authored-by: nodece <16235121+nodece@users.noreply.github.com>
…t magic numbers

Co-authored-by: nodece <16235121+nodece@users.noreply.github.com>
… idle detection

Co-authored-by: nodece <16235121+nodece@users.noreply.github.com>
… improve test reliability

Co-authored-by: nodece <16235121+nodece@users.noreply.github.com>
Copilot AI changed the title [WIP] Add partition-level idle detection for consumer to prevent message loss Add partition-level idle detection to prevent message loss on ownership transfer Jan 15, 2026
Copilot AI requested a review from nodece January 15, 2026 03:55
Copilot stopped work on behalf of nodece due to an error January 15, 2026 04:09
@nodece
Copy link
Owner

nodece commented Jan 15, 2026

@copilot
Please consider the producer. Please remove enablePartitionOwnershipCheck.
if (idleDetector != null) { this code too many.
idleDetector.markActive();
}
Please consider, broker Notification the client and client reconnect, check race condition.

Copilot AI and others added 2 commits January 15, 2026 04:14
…o-op detector pattern

Co-authored-by: nodece <16235121+nodece@users.noreply.github.com>
…ctor reconnect

Co-authored-by: nodece <16235121+nodece@users.noreply.github.com>
Copy link
Author

Copilot AI commented Jan 15, 2026

@copilot
Please consider the producer. Please remove enablePartitionOwnershipCheck.
if (idleDetector != null) { this code too many....

Done in commits 33e09e7 and 3891e64:

  1. Removed enablePartitionOwnershipCheck - Feature now controlled only by consumerIdleTimeoutMs (0 disables)
  2. Eliminated all if (idleDetector != null) checks - Implemented no-op detector pattern via factory method
  3. Race condition handling - Added state check before reconnection to avoid conflicts with broker notifications. ConnectionHandler already synchronizes concurrent reconnection attempts.
  4. Producer consideration - Noted for potential future enhancement, but not in scope for consumer-focused fix.

Co-authored-by: nodece <16235121+nodece@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants