Skip to content
Open
278 changes: 278 additions & 0 deletions pip/pip-423.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
# PIP-423: Add a new admin API to acknowledge a single message

# Background knowledge

* **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends.
* **Batch Messages:** To improve throughput, Pulsar producers can batch multiple individual messages into a single entry that is written to BookKeeper. In this case, the `MessageId` also contains a `batchIndex` to identify a specific message within the batch. The entry's metadata stores the total number of messages in the batch.
* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`). The cursor is responsible for ensuring that acknowledged messages are not redelivered.
* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message, or even a single message within a batch. When a message is acknowledged individually, the broker's `ManagedCursor` persistently tracks this "acknowledgement hole" to ensure that acknowledged messages are not redelivered after a broker or consumer restart. This proposal leverages this existing, robust mechanism.
* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time. Since the message is marked as consumed by the cursor, the `DelayedDeliveryTracker` will not dispatch it.
* **Existing `skip` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal provides a more precise way to skip messages by their specific `MessageId`.

# Motivation

Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:

* **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely by acknowledging it pre-emptively.
* **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability. This also applies to removing a single bad message from within a larger batch.
* **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's delivery queue.

The existing `skip(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by their specific `MessageId` (including `ledgerId`, `entryId`, and optional `batchIndex`), providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog.

# Goals

## In Scope

* Introduce a new Admin API endpoint and a corresponding `pulsar-admin` CLI command to support skipping specific messages for a subscription.
* The target message(s) will be identified by their `ledgerId`, `entryId`, and an optional `batchIndex` for messages within a batch.
* The implementation will leverage Pulsar's existing, robust `AckType.Individual` mechanism for persistence and reliability.
* This feature will only be supported for subscription types that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`).
* Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription.
* Support for both partitioned and non-partitioned topics.

## Out of Scope

* Modifying the existing `skip/{numMessages}` endpoint. A new, dedicated endpoint will be created for clarity.
* Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation that must be run on each cluster where the skip is needed.

# High Level Design

The proposed solution introduces a new admin API that triggers Pulsar's individual acknowledgement capability on demand for specific messages.

1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request specifies the topic, target subscription, and a list of message identifiers. These identifiers can be provided as a triplet of `ledgerId:entryId:batchIndex` or as Base64-encoded `MessageId` byte arrays.

2. **Broker Receives Request:** The Pulsar broker receives the admin request for the new endpoint `.../subscription/{subName}/skipByMessageIds`. It parses the flexible JSON payload and validates the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` authorization rule.

3. **Delegate to Subscription:** The broker, after validating topic ownership and permissions, invokes a new method `skipMessages(List<SkipEntry> entries)` on the target `PersistentSubscription` object. For partitioned topics, the request is scattered to all partition brokers, and each partition broker performs this action.

4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the following occurs:
* It verifies that the subscription's type supports individual acknowledgements.
* For messages specified without a `batchIndex`, it constructs a `Position` object for the entire entry.
* For messages specified with a `batchIndex`, it first reads the entry from BookKeeper to get the batch metadata (e.g., batch size). It then constructs a `Position` object that includes an "ack set" (a bitset) indicating which messages within the batch are being acknowledged.
* It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for all the constructed `Position` objects.

5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these individual acknowledgements, which are persisted to metadata storage.
* For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered.
* For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` attempts to schedule it. The message is thus effectively **cancelled**.

This design is simple and robust as it builds upon the broker's proven message acknowledgement foundation while providing a clean, dedicated administrative API for this precise operational task.

# Detailed Design

## Design & Implementation Details

The implementation introduces a new flexible request DTO, extends the `Subscription` interface, and implements the core logic in `PersistentSubscription`.

1. **New Request DTO:** A new class `SkipMessageIdsRequest` is created to handle polymorphic JSON deserialization on the broker. This allows the API to accept multiple formats for specifying message IDs.

2. **Subscription Interface Extension:** The `Subscription` interface is extended with a new method. `SkipEntry` is an internal record holding the `ledgerId`, `entryId`, and an optional list of `batchIndexes`.
```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
public interface Subscription extends MessageExpirer {
// ... existing methods
CompletableFuture<Void> skipMessages(int numMessagesToSkip);

CompletableFuture<Void> skipMessages(List<SkipEntry> entries);
// ... existing methods
}
```

3. **PersistentSubscription Implementation:** The `PersistentSubscription` class provides the concrete implementation. It differentiates between full-entry acknowledgements and partial (batch) acknowledgements.

```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@Override
public CompletableFuture<Void> skipMessages(List<SkipEntry> entries) {
if (Subscription.isCumulativeAckMode(getType())) {
return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
}

// Separate full-entry acks from partial (batchIndex) acks
List<Position> fullEntryPositions = new ArrayList<>();
Map<String, List<Integer>> partialAckIndexByPos = new HashMap<>();
// ... logic to populate these collections from 'entries'

// If there are partial acks, read the corresponding entries to get batch metadata
if (!partialAckIndexByPos.isEmpty()) {
Set<Position> positionsToLoad = ...; // positions for entries with batch acks
cursor.asyncReplayEntries(positionsToLoad, new AsyncCallbacks.ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> readEntries, Object ctx) {
// ... logic for each entry:
// 1. Parse MessageMetadata to get batch size.
// 2. Validate batch indexes.
// 3. Create a BitSet representing the ack state.
// 4. Create a Position with the ack set using AckSetStateUtil.createPositionWithAckSet().
// 5. Add this special position to a final list.

// Finally, acknowledge all positions (full and partial)
acknowledgeMessage(finalPositionsList, AckType.Individual, properties);
result.complete(null);
}
// ... handle failures
});
} else {
// Only full-entry acks are present, no need to read entries
acknowledgeMessage(fullEntryPositions, AckType.Individual, properties);
return CompletableFuture.completedFuture(null);
}
return result;
}
```

4. **Admin API Logic:** The `PersistentTopicsBase` class is updated with a new `internalSkipByMessageIds` method that accepts the `SkipMessageIdsRequest` object, aggregates the requests into `SkipEntry` objects, and calls the `subscription.skipMessages` method. It also contains the logic to fan out the request to each partition for partitioned topics.

## Public-facing Changes

### Public API

A new REST endpoint is added for skipping specific messages.

* **Path:** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds`
* **Path Parameters:**
* All path parameters identify the target subscription.
* **HTTP Body Parameters (JSON):** A flexible JSON object that supports multiple formats.
* **Format 1: Structured Message IDs (with batch index support)**
* **Description:** Recommended format, supports batch indexes.
* **Example Body:**
```json
{
"type": "messageId",
"messageIds": [
{ "ledgerId": 12345, "entryId": 100 },
{ "ledgerId": 12346, "entryId": 200, "batchIndex": 5 }
]
}
```
* **Format 2: Base64 Encoded Message IDs**
* **Description:** A simple list of Base64 encoded byte arrays from `MessageId.toByteArray()`.
* **Example Body:**
```json
[
"CLlgEAQwAA==",
"CLlgEAYwAA=="
]
```
* **Response Codes:**
* `204 No Content`: The operation was successful.
* `400 Bad Request`: Invalid JSON payload or invalid message ID format (e.g., non-numeric ledgerId, invalid batchIndex).
* `403 Forbidden`: The client is not authorized to perform a `SKIP` operation on this topic.
* `404 Not Found`: The topic or subscription does not exist.
* `405 Method Not Allowed`: The subscription type does not support individual acknowledgement (e.g., `Exclusive`, `Failover`).
* `500 Internal Server Error`: An unexpected error occurred in the broker.

### Binary protocol

No changes are made to the Pulsar binary protocol.

### Configuration

**No new configuration parameters are introduced. However, this feature's performance and behavior under heavy load are directly influenced by existing `ManagedLedger` configurations that govern the persistence of acknowledgement holes.**

**Administrators should be aware of these settings if they expect a high volume of message cancellations:**

```
# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
# to be quickly filled by acking all the messages. The information of which
# messages are acknowledged is persisted by compressing in "ranges" of messages
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
managedLedgerMaxUnackedRangesToPersist=10000

# Maximum number of partially acknowledged batch messages per subscription that will have their batch
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
# deleted indexes will be cleared while redelivering the messages to consumers.
managedLedgerMaxBatchDeletedIndexToPersist=10000

# When storing acknowledgement state, choose a more compact serialization format that stores
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
managedLedgerPersistIndividualAckAsLongArray=true

# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
managedLedgerUnackedRangesOpenCacheSetEnabled=true

# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# MetadataStore.
managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000

# ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY).
# If value is NONE, then save the ManagedCursorInfo bytes data directly without compression.
# Using compression reduces the size of persistent cursor (subscription) metadata. This enables using a higher
# managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the overall metadata stored in
# the metadata store such as ZooKeeper.
managedCursorInfoCompressionType=NONE

# ManagedCursorInfo compression size threshold (bytes), only compress metadata when origin size more then this value.
# 0 means compression will always apply.
managedCursorInfoCompressionThresholdInBytes=16384

# ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY).
# If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly without compression.
# Using compression reduces the size of the persistent topic metadata. When a topic contains a large number of
# individual ledgers in BookKeeper or tiered storage, compression helps prevent the metadata size from exceeding
# the maximum size of a metadata store entry (ZNode in ZooKeeper). This also reduces the overall metadata stored
# in the metadata store such as ZooKeeper.
managedLedgerInfoCompressionType=NONE

# ManagedLedgerInfo compression size threshold (bytes), only compress metadata when origin size more then this value.
# 0 means compression will always apply.
managedLedgerInfoCompressionThresholdInBytes=16384
```

### CLI

A new CLI command is added to `pulsar-admin topics`.

* **Command:** `skip-messages`
* **Description:** Skip some messages for a subscription by their message IDs.
* **Usage:** `pulsar-admin topics skip-messages <topic-name> [options]`
* **Options:**
* `-s, --subscription <subName>` (required): The subscription to skip messages on.
* `--messageId-triplet <ledgerId:entryId[:batchIndex]>` (repeatable): The message ID to skip, specified as a triplet. `batchIndex` is optional.
* `--messageId-base64 <base64-encoded-id>` (repeatable): A Base64-encoded `MessageId`.
* **Example:**
```bash
# Skip a single message for subscription 'my-sub'
pulsar-admin topics skip-messages persistent://public/default/my-topic \
-s my-sub --messageId-triplet 12345:100

# Skip a single message within a batch (batchIndex 3)
pulsar-admin topics skip-messages persistent://public/default/my-topic \
-s my-sub --messageId-triplet 12345:101:3

# Skip multiple messages using different options
pulsar-admin topics skip-messages persistent://public/default/my-topic \
-s my-sub --messageId-triplet 12345:102 --messageId-base64 "CLlgEAQwAA=="
```

# Backward & Forward Compatibility

## Upgrade

This feature is purely additive. It introduces a new admin endpoint and CLI command.
* Upgrading brokers will enable this new functionality. Existing clients and operations are unaffected.
* Clients must be upgraded to a version that includes the new `pulsar-admin` command to use this feature.

## Downgrade / Rollback

* If brokers are downgraded to a version without this feature, the new admin endpoint will no longer be available, and calls to it will fail.
* No persistent state is changed in a way that would prevent a downgrade. Acknowledgement holes created by this API are stored in the same format as those created by regular consumer acknowledgements and will be understood by older broker versions.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

This operation is local to a subscription's cursor within a single cluster. It has no direct impact on geo-replication. To skip a message on a replicated topic in multiple clusters, the admin command must be executed against each cluster individually.

# Alternatives

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/lo182ztgrkzlq6mbkytj8krd050yvb9w
* Mailing List voting thread: https://lists.apache.org/thread/7jbc3h42no9whjrpd6q0kmsyw985d7zo