Commit f9f71d8

[feat][client] PCIP-5 Support Pull Consumer
1 parent c3cf0b2 commit f9f71d8

17 files changed, +1556 -11 lines changed

pcip/pcip-5.md

Lines changed: 179 additions & 0 deletions
@@ -0,0 +1,179 @@
# PCIP-5: Pull Consumer Implementation for Apache Pulsar

# Background knowledge

- **Pulsar Consumers**: Pulsar currently supports push-based consumption models (exclusive/shared/failover/key-shared).
  This proposal adds pull-based consumption.
- **Message Positioning**: Pulsar uses composite MessageIDs (ledgerId + entryId + partitionId), while systems like
  Kafka/RocketMQ use monotonic offsets.
- **Offset Mapping**: https://github.com/apache/pulsar/pull/24220 can be used to convert between offsets and Pulsar's
  MessageIDs.

# Motivation

System Migration Requirement: The organization plans to migrate from RocketMQ to Pulsar, requiring a unified MQ client
abstraction layer to conceal implementation details and support seamless engine replacement.

Interface Compatibility Issues:

- Pulsar lacks a native offset retrieval interface (pull/fetch model).
- RocketMQ/Kafka use monotonically increasing numeric offsets to locate messages, whereas Pulsar employs a composite
  MessageID (ledgerId + entryId + partitionId).

Objective: Implement a RocketMQ-like Pull Consumer to support precise offset control and reduce migration costs.

# Goals

## In Scope

| Goal                       | Description                                                               |
|----------------------------|---------------------------------------------------------------------------|
| **Precise Offset Control** | Supports specifying partition, offset, pull count, and byte size.         |
| **Resource Efficiency**    | Reuses Reader connections with LRU cache management.                      |
| **Easy Integration**       | Compatible with Pulsar’s existing API, requiring no Broker modifications. |

## Out of Scope

NA

# High Level Design

```mermaid
graph TD
    A[PullConsumer] -->|pull| B[Offset Mapping]
    B -->|convert| C[MessageID]
    C -->|seek| D[Reader Pool]
    D -->|fetch| E[Broker]
    A -->|ack| F[Offset-Based Ack]
    F -->|convert| G[MessageID]
    G -->|cumulative ack| H[Consumer]
```

Key components:

1. **`PulsarPullConsumer` interface**: Standard pull consumer API
2. **Offset ↔ MessageID Cache**: Partition-scoped mapping layer
3. **Reader Pool**: Managed resource pool with LRU eviction
4. **Partition Locking**: Thread-safe access coordination
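
One way to picture component 4 is a lock per topic partition: pulls against different partitions run in parallel, while operations on the same partition are serialized. The sketch below works under that assumption only. `PartitionLocks`, `withLock`, and `trackedPartitions` are hypothetical names chosen for illustration and are not taken from this proposal's code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical helper (not part of the proposal): one lock per "topic + partition" key. */
class PartitionLocks {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Runs the action while holding the lock that belongs to the given partition. */
    <R> R withLock(String topic, int partition, Supplier<R> action) {
        // computeIfAbsent guarantees that all threads share one lock instance per key.
        ReentrantLock lock = locks.computeIfAbsent(
                topic + "-partition-" + partition, key -> new ReentrantLock());
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock(); // always released, even if the action throws
        }
    }

    /** Number of distinct partitions that have been locked at least once. */
    int trackedPartitions() {
        return locks.size();
    }
}
```

A caller would wrap each reader check-out and seek for a partition in `withLock`, so two threads never reposition the same partition's reader at once.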

## Detailed Design

### Design & Implementation Details

**Core Interface** `PulsarPullConsumer`:

```java
public interface PulsarPullConsumer<T> extends AutoCloseable {
    void start() throws PulsarClientException;

    List<Message<T>> pull(PullRequest request);

    void ack(long offset, int partition) throws PulsarClientException;

    long searchOffset(int partition, long timestamp) throws PulsarAdminException;

    long getConsumeStats(int partition) throws PulsarAdminException;

    class PullRequest {
        private long offset;
        private int partition;
        private int maxMessages;
        private int maxBytes;
        private Duration timeout;
    }
}
```

**Reader Management**:

```java
public class ReaderCache<T> {
    private final Map<String, LoadingCache<Long, Reader<T>>> readerCacheMap;

    public Reader<T> getReader(String topic, long offset) {
        // 1. Acquire partition lock
        // 2. Get/create LRU cache (default: max 100 readers/partition)
        // 3. Remove reader from cache for exclusive use
    }

    public void releaseReader(String topic, long nextOffset, Reader<T> reader) {
        // Return reader to cache if still connected
    }
}
```
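
The check-out/check-in cycle outlined in `ReaderCache` can be sketched without any Pulsar or Caffeine dependency. The code below is an illustration under stated assumptions, not the implementation in this commit. `PooledReader`, `ReaderPoolSketch`, and `FakeReader` are hypothetical stand-ins, a `LinkedHashMap` in access order replaces the Caffeine cache, and readers are keyed by the next offset they would deliver.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

/** Hypothetical stand-in for a Pulsar Reader: only what the pool needs. */
interface PooledReader {
    boolean isConnected();
    void close();
}

/** Sketch of a reader pool keyed by the next offset each idle reader would deliver. */
class ReaderPoolSketch<R extends PooledReader> {
    private final int maxIdlePerTopic;
    private final LongFunction<R> readerFactory; // assumed to create a reader positioned at an offset
    private final Map<String, LinkedHashMap<Long, R>> idleByTopic = new ConcurrentHashMap<>();

    ReaderPoolSketch(int maxIdlePerTopic, LongFunction<R> readerFactory) {
        this.maxIdlePerTopic = maxIdlePerTopic;
        this.readerFactory = readerFactory;
    }

    private LinkedHashMap<Long, R> idleFor(String topic) {
        return idleByTopic.computeIfAbsent(topic, t -> new LinkedHashMap<Long, R>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, R> eldest) {
                if (size() > maxIdlePerTopic) {
                    eldest.getValue().close(); // the least recently used reader is closed on eviction
                    return true;
                }
                return false;
            }
        });
    }

    /** Checks out a reader for the offset; it leaves the pool, so the caller owns it exclusively. */
    R getReader(String topic, long offset) {
        LinkedHashMap<Long, R> idle = idleFor(topic);
        synchronized (idle) {
            R cached = idle.remove(offset);
            if (cached != null && cached.isConnected()) {
                return cached;
            }
            if (cached != null) {
                cached.close(); // stale reader: discard it and fall through to create a new one
            }
        }
        return readerFactory.apply(offset);
    }

    /** Returns a reader to the pool under the offset it will serve next, if it is still connected. */
    void releaseReader(String topic, long nextOffset, R reader) {
        if (!reader.isConnected()) {
            reader.close();
            return;
        }
        LinkedHashMap<Long, R> idle = idleFor(topic);
        synchronized (idle) {
            R previous = idle.put(nextOffset, reader);
            if (previous != null && previous != reader) {
                previous.close(); // only one idle reader is kept per offset
            }
        }
    }

    int idleCount(String topic) {
        LinkedHashMap<Long, R> idle = idleFor(topic);
        synchronized (idle) {
            return idle.size();
        }
    }
}

/** Hypothetical in-memory reader used only to exercise the sketch. */
class FakeReader implements PooledReader {
    final long startOffset;
    private boolean connected = true;

    FakeReader(long startOffset) {
        this.startOffset = startOffset;
    }

    @Override
    public boolean isConnected() {
        return connected;
    }

    @Override
    public void close() {
        connected = false;
    }
}
```

Keying idle readers by their next offset means a sequential pull (offset 100, then 110, and so on) finds a reader that is already positioned and can skip the seek.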

**Offset Mapping**:

```java
public class OffsetToMessageIdCache {
    private final Map<String, LoadingCache<Long, MessageId>> partitionCaches;

    public MessageId getMessageIdByOffset(String topic, long offset) {
        // 1. Check caffeine cache (default: max 1000 entries/partition)
        // 2. On cache miss: pulsarAdmin.topics().getMessageIDByOffset()
        // 3. Populate cache
    }
}
```
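
The three steps in `getMessageIdByOffset` (check the cache, fall back to a remote lookup, populate the cache) can be illustrated with a small bounded cache. This is a sketch under stated assumptions rather than the committed code. `OffsetCacheSketch` is a hypothetical name, the `lookup` function stands in for the admin call, the value type is left generic instead of using `MessageId`, and a `LinkedHashMap`-based LRU is swapped in for Caffeine, whose eviction policy differs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/** Sketch of a partition-scoped offset cache with a bounded number of entries per partition. */
class OffsetCacheSketch<V> {
    private final int maxEntriesPerPartition;
    private final BiFunction<String, Long, V> lookup; // hypothetical stand-in for the remote lookup
    private final Map<String, Map<Long, V>> partitionCaches = new ConcurrentHashMap<>();

    OffsetCacheSketch(int maxEntriesPerPartition, BiFunction<String, Long, V> lookup) {
        this.maxEntriesPerPartition = maxEntriesPerPartition;
        this.lookup = lookup;
    }

    V getByOffset(String topic, long offset) {
        Map<Long, V> cache = partitionCaches.computeIfAbsent(topic, t -> newLruMap());
        // 1. Check the cache.
        synchronized (cache) {
            V cached = cache.get(offset);
            if (cached != null) {
                return cached;
            }
        }
        // 2. On a miss, resolve the offset remotely (outside the lock, so slow calls do not block readers).
        V loaded = lookup.apply(topic, offset);
        // 3. Populate the cache; the eldest entry is dropped once the bound is exceeded.
        synchronized (cache) {
            cache.put(offset, loaded);
        }
        return loaded;
    }

    private Map<Long, V> newLruMap() {
        return new LinkedHashMap<Long, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, V> eldest) {
                return size() > maxEntriesPerPartition;
            }
        };
    }
}
```

Two threads that miss on the same offset may both perform the lookup. That is harmless here because the mapping for an offset does not change, and a loading cache such as Caffeine's would de-duplicate the call.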

### Public-facing Changes

#### Public API

**New Interfaces**:

```java
// Entry point
PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
        nonPartitionedTopic, subscription,
        brokerCluster, Schema.BYTES,
        pulsarClient, pulsarAdmin);

// Usage
List<Message<byte[]>> messages = pullConsumer1.pull(
        PulsarPullConsumer.PullRequest.builder()
                .offset(offset)
                .partition(PulsarPullConsumer.PARTITION_NONE)
                .maxMessages(10)
                .maxBytes(1024 * 1024)
                .timeout(java.time.Duration.ofSeconds(10))
                .build());
```

## Get Started

### Quick Start

```java
// 1. Create pull consumer
PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
        nonPartitionedTopic, subscription,
        brokerCluster, Schema.BYTES,
        pulsarClient, pulsarAdmin);

pullConsumer1.start(); // Initialize connections

// 2. Pull up to 10 messages (at most 1 MiB) from the non-partitioned topic, starting at offset 200
long offset = 200L;
List<Message<byte[]>> batch = pullConsumer1.pull(
        PulsarPullConsumer.PullRequest.builder()
                .offset(offset)
                .partition(PulsarPullConsumer.PARTITION_NONE)
                .maxMessages(10)
                .maxBytes(1024 * 1024)
                .timeout(java.time.Duration.ofSeconds(10))
                .build());

// 3. Process messages
batch.forEach(msg -> {
    System.out.println("Received: " + new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8));
    // Store last offset
});

// 4. Acknowledge up to the last processed offset (209 if all 10 messages were returned)
pullConsumer1.ack(209L, PulsarPullConsumer.PARTITION_NONE);

// 5. Close resources
pullConsumer1.close();
```
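
The Quick Start shows a single pull. In practice a caller repeats the cycle, advancing the offset past the last message of each batch. The sketch below illustrates that loop against a hypothetical in-memory partition in which the list index plays the role of the offset. `PullLoopSketch`, `pull`, and `drain` are illustrative names, and the way `maxMessages` and `maxBytes` cap a batch is an assumption rather than the behavior defined by this proposal.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Sketch of a pull loop over a hypothetical in-memory partition (list index = offset, offset >= 0). */
class PullLoopSketch {

    /** Returns messages from the offset onward, capped by message count and total UTF-8 byte size. */
    static List<String> pull(List<String> log, long offset, int maxMessages, int maxBytes) {
        List<String> batch = new ArrayList<>();
        int bytes = 0;
        for (long i = offset; i < log.size() && batch.size() < maxMessages; i++) {
            String msg = log.get((int) i);
            int size = msg.getBytes(StandardCharsets.UTF_8).length;
            // Assumption: the first message is always returned, so an oversized message cannot stall the loop.
            if (!batch.isEmpty() && bytes + size > maxBytes) {
                break;
            }
            batch.add(msg);
            bytes += size;
        }
        return batch;
    }

    /** Pulls until no messages remain; returns the last acknowledged offset (startOffset - 1 if none). */
    static long drain(List<String> log, long startOffset, int maxMessages, int maxBytes) {
        long next = startOffset;
        long acked = startOffset - 1;
        while (true) {
            List<String> batch = pull(log, next, maxMessages, maxBytes);
            if (batch.isEmpty()) {
                break;
            }
            long last = next + batch.size() - 1;
            acked = last;    // stands in for a cumulative ack up to the last offset of the batch
            next = last + 1; // the next pull starts right after the acknowledged offset
        }
        return acked;
    }
}
```

Because the ack is cumulative and inclusive, acknowledging only the last offset of each batch is enough, and after a restart the loop can resume from the acknowledged offset plus one.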

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -62,6 +62,7 @@
     <testcontainers.version>1.20.1</testcontainers.version>
     <junit.version>4.13.1</junit.version>
     <mockito.version>5.12.0</mockito.version>
+    <caffeine.version>3.2.0</caffeine.version>
   </properties>
 
   <dependencyManagement>

pulsar-client-common-contrib/pom.xml

Lines changed: 12 additions & 0 deletions
@@ -26,9 +26,21 @@
   <artifactId>pulsar-client-common-contrib</artifactId>
 
   <dependencies>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+      <version>${caffeine.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-admin</artifactId>
+      <version>${pulsar.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client-all</artifactId>
+      <version>${pulsar.version}</version>
     </dependency>
   </dependencies>
   <inceptionYear>2024</inceptionYear>
Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pulsar.client.api;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.common.ConsumeStats;
import org.apache.pulsar.client.common.PullRequest;
import org.apache.pulsar.client.common.PullResponse;
import org.apache.pulsar.common.api.proto.CommandAck;

/**
 * Pull-based consumer interface with enhanced offset management capabilities.
 *
 * <p>Features:
 *
 * <ul>
 *   <li>Precise offset control with partition-aware operations
 *   <li>Thread-safe design for concurrent access
 *   <li>Support for both partitioned and non-partitioned topics
 *   <li>Built-in offset to message ID mapping
 * </ul>
 *
 * @param <T> message payload type
 */
public interface PulsarPullConsumer<T> extends AutoCloseable {

    /**
     * Initializes consumer resources and establishes connections.
     *
     * @throws PulsarClientException if client initialization fails
     */
    void start() throws PulsarClientException;

    /**
     * Pulls messages from the specified partition starting from the given offset.
     *
     * @param request pull request configuration
     * @return pull response carrying the messages read starting from the specified offset
     * @throws IllegalArgumentException for invalid request parameters
     */
    PullResponse<T> pull(PullRequest request);

    /**
     * Acknowledges all messages up to the specified offset (inclusive).
     *
     * @param offset target offset to acknowledge
     * @param partition partition index (use -1 for non-partitioned topics)
     * @throws PulsarClientException for acknowledgment failures
     * @throws IllegalArgumentException for invalid partition index
     */
    void ack(long offset, int partition) throws PulsarClientException;

    /**
     * Acknowledges the specified offset using the given acknowledgment type.
     *
     * @param offset target offset to acknowledge
     * @param partition partition index (use -1 for non-partitioned topics)
     * @param ackType acknowledgment type: {@code Individual} (0) or {@code Cumulative} (1)
     * @throws PulsarClientException for acknowledgment failures
     * @throws IllegalArgumentException for invalid partition index
     */
    void ack(long offset, int partition, CommandAck.AckType ackType) throws PulsarClientException;

    /**
     * Finds the latest message offset before or at the specified timestamp.
     *
     * @param partition partition index (use -1 for non-partitioned topics)
     * @param timestamp target timestamp in milliseconds
     * @return corresponding message offset
     * @throws PulsarAdminException for admin operation failures
     * @throws IllegalArgumentException for invalid partition index
     */
    long searchOffset(int partition, long timestamp) throws PulsarAdminException;

    /**
     * Retrieves consumption statistics for the specified partition.
     *
     * @param partition partition index (use -1 for non-partitioned topics)
     * @return consumption statistics for the partition
     * @throws PulsarAdminException for stats retrieval failures
     * @throws IllegalArgumentException for invalid partition index
     */
    ConsumeStats getConsumeStats(int partition) throws PulsarAdminException;

    /** Releases all resources and closes connections gracefully. */
    @Override
    void close();
}
