Commit 2a988fa

Author: xiangying

[feat][client] pcip-5 Support Pull Consumer

1 parent 892df92 commit 2a988fa

17 files changed, +1426 -11 lines changed

pcip/pcip-5.md

Lines changed: 179 additions & 0 deletions
@@ -0,0 +1,179 @@
# PCIP-5: Pull Consumer Implementation for Apache Pulsar

# Background knowledge

- **Pulsar Consumers**: Pulsar currently supports push-based consumption models (exclusive/shared/failover/key-shared).
  This proposal adds pull-based consumption.
- **Message Positioning**: Pulsar uses composite MessageIDs (ledgerId + entryId + partitionId), while systems like
  Kafka/RocketMQ use monotonic offsets.
- **Offset Mapping**: https://github.com/apache/pulsar/pull/24220 can be used to convert between offsets and Pulsar's
  MessageIDs.

# Motivation

System Migration Requirement: The organization plans to migrate from RocketMQ to Pulsar. This requires a unified MQ
client abstraction layer that hides implementation details, so the underlying engine can be replaced without changes
to application code.

Interface Compatibility Issues:

- Pulsar has no native interface for retrieving messages by offset (pull/fetch model).
- RocketMQ and Kafka locate messages by monotonically increasing numeric offsets, whereas Pulsar uses a composite
  MessageID (ledgerId + entryId + partitionId).

Objective: Implement a RocketMQ-like pull consumer that supports precise offset control and reduces migration costs.

# Goals

## In Scope

| Goal                       | Description                                                                          |
|----------------------------|--------------------------------------------------------------------------------------|
| **Precise Offset Control** | Each pull specifies the partition, start offset, maximum message count, and maximum byte size. |
| **Resource Efficiency**    | Reuses Reader connections, managed by an LRU cache.                                  |
| **Easy Integration**       | Compatible with Pulsar’s existing API, requiring no Broker modifications.            |

## Out of Scope

N/A

# High Level Design

```mermaid
graph TD
    A[PullConsumer] -->|pull| B[Offset Mapping]
    B -->|convert| C[MessageID]
    C -->|seek| D[Reader Pool]
    D -->|fetch| E[Broker]
    A -->|ack| F[Offset-Based Ack]
    F -->|convert| G[MessageID]
    G -->|cumulative ack| H[Consumer]
```
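
The ack path in the diagram ends in a cumulative ack, so acknowledging an offset implicitly covers every earlier offset on the same partition. The sketch below models only that bookkeeping in memory. The class name `CumulativeAckSketch` is hypothetical, the offset is treated as inclusive by assumption, and no conversion to a MessageID or call to a Pulsar consumer takes place.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of offset-based cumulative acknowledgment. */
final class CumulativeAckSketch {
    // Highest acknowledged offset per partition; the mark never moves backwards.
    private final Map<Integer, Long> ackedOffsets = new ConcurrentHashMap<>();

    /** Records the ack and returns the acknowledged high-water mark for the partition. */
    long ack(long offset, int partition) {
        return ackedOffsets.merge(partition, offset, Long::max);
    }

    /** An offset counts as acknowledged if it is at or below the partition's mark. */
    boolean isAcked(long offset, int partition) {
        Long mark = ackedOffsets.get(partition);
        return mark != null && offset <= mark;
    }
}
```

A late ack for a smaller offset leaves the mark unchanged, which matches the usual meaning of a cumulative acknowledgment.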

Key components:

1. **`PulsarPullConsumer` interface**: Standard pull consumer API
2. **Offset ↔ MessageID Cache**: Partition-scoped mapping layer
3. **Reader Pool**: Managed resource pool with LRU eviction
4. **Partition Locking**: Thread-safe access coordination
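
As an illustration of the partition locking component, the following sketch keeps one lock per partition topic name and runs an action while holding it. `PartitionLocks` and `withLock` are hypothetical names chosen for this example; the proposal does not specify how its locks are implemented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical per-partition lock registry. */
final class PartitionLocks {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Runs the action while holding the lock that belongs to the given partition topic. */
    <R> R withLock(String partitionTopic, Supplier<R> action) {
        // One lock per partition, created lazily on first use.
        ReentrantLock lock = locks.computeIfAbsent(partitionTopic, key -> new ReentrantLock());
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock(); // always released, even if the action throws
        }
    }
}
```

Operations on different partitions do not block each other, while concurrent pulls on the same partition are serialized.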
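
## Detailed Design

### Design & Implementation Details

**Core Interface** `PulsarPullConsumer`:

```java
import java.time.Duration;
import java.util.List;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;

public interface PulsarPullConsumer<T> extends AutoCloseable {
    // Initializes resources and connections.
    void start() throws PulsarClientException;

    // Pulls messages as described by the request.
    List<Message<T>> pull(PullRequest request);

    // Acknowledges messages up to the given offset on the given partition.
    void ack(long offset, int partition) throws PulsarClientException;

    // Finds the offset that corresponds to a timestamp.
    long searchOffset(int partition, long timestamp) throws PulsarAdminException;

    // Returns the consumption progress of the given partition.
    long getConsumeStats(int partition) throws PulsarAdminException;

    class PullRequest {
        private long offset;      // first offset to read
        private int partition;
        private int maxMessages;  // upper bound on messages per pull
        private int maxBytes;     // upper bound on bytes per pull
        private Duration timeout;
    }
}
```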
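
The `PullRequest` class above lists only fields, while the usage examples in this proposal construct requests through `PullRequest.builder()`. A hand-written builder could look like the sketch below. The name `PullRequestSketch`, the default values, the validation rules, and the value `-1` for `PARTITION_NONE` are all assumptions made for illustration.

```java
import java.time.Duration;

/** Hypothetical builder-style request object; not the class shipped in this commit. */
final class PullRequestSketch {
    static final int PARTITION_NONE = -1; // assumed sentinel for non-partitioned topics

    final long offset;
    final int partition;
    final int maxMessages;
    final int maxBytes;
    final Duration timeout;

    private PullRequestSketch(Builder b) {
        this.offset = b.offset;
        this.partition = b.partition;
        this.maxMessages = b.maxMessages;
        this.maxBytes = b.maxBytes;
        this.timeout = b.timeout;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Defaults are assumptions chosen for this sketch.
        private long offset;
        private int partition = PARTITION_NONE;
        private int maxMessages = 1;
        private int maxBytes = Integer.MAX_VALUE;
        private Duration timeout = Duration.ZERO;

        Builder offset(long value) { this.offset = value; return this; }
        Builder partition(int value) { this.partition = value; return this; }
        Builder maxMessages(int value) { this.maxMessages = value; return this; }
        Builder maxBytes(int value) { this.maxBytes = value; return this; }
        Builder timeout(Duration value) { this.timeout = value; return this; }

        /** Validates the request once, so that pull() can rely on sane values. */
        PullRequestSketch build() {
            if (offset < 0) throw new IllegalArgumentException("offset must be >= 0");
            if (partition < PARTITION_NONE) throw new IllegalArgumentException("invalid partition");
            if (maxMessages <= 0 || maxBytes <= 0) throw new IllegalArgumentException("limits must be > 0");
            if (timeout == null || timeout.isNegative()) throw new IllegalArgumentException("invalid timeout");
            return new PullRequestSketch(this);
        }
    }
}
```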

**Reader Management**:

```java
public class ReaderCache<T> {
    private final Map<String, LoadingCache<Long, Reader<T>>> readerCacheMap;

    public Reader<T> getReader(String topic, long offset) {
        // 1. Acquire partition lock
        // 2. Get/create LRU cache (default: max 100 readers/partition)
        // 3. Remove reader from cache for exclusive use
    }

    public void releaseReader(String topic, long nextOffset, Reader<T> reader) {
        // Return reader to cache if still connected
    }
}
```
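
The borrow and release steps outlined above can be modeled with a plain `LinkedHashMap` in access order. This is a sketch under stated assumptions: `FakeReader` is a hypothetical stand-in for a Pulsar `Reader`, the pool covers a single partition, and the standard library replaces the Caffeine cache that the proposal uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a Pulsar Reader. */
final class FakeReader {
    final long createdAtOffset;
    boolean closed;

    FakeReader(long createdAtOffset) {
        this.createdAtOffset = createdAtOffset;
    }

    void close() {
        closed = true;
    }
}

/** Hypothetical single-partition pool of idle readers, keyed by the next offset each will read. */
final class ReaderPoolSketch {
    private final int maxReaders;
    private final Map<Long, FakeReader> idle;

    ReaderPoolSketch(int maxReaders) {
        this.maxReaders = maxReaders;
        // Access-ordered map: the eldest entry is the least recently used idle reader.
        this.idle = new LinkedHashMap<Long, FakeReader>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, FakeReader> eldest) {
                boolean evict = size() > ReaderPoolSketch.this.maxReaders;
                if (evict) {
                    eldest.getValue().close(); // release the evicted reader's resources
                }
                return evict;
            }
        };
    }

    /** Removes a matching idle reader for exclusive use, or creates a new one. */
    synchronized FakeReader borrow(long offset) {
        FakeReader reader = idle.remove(offset);
        return reader != null ? reader : new FakeReader(offset);
    }

    /** Returns a reader to the pool under the offset it will read next. */
    synchronized void release(long nextOffset, FakeReader reader) {
        if (reader.closed) {
            return; // a disconnected reader is not worth caching
        }
        FakeReader previous = idle.put(nextOffset, reader);
        if (previous != null && previous != reader) {
            previous.close(); // only one idle reader is kept per offset
        }
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

Keying idle readers by their next offset means a sequential consumer finds its reader already positioned, so the next pull needs no seek.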

**Offset Mapping**:

```java
public class OffsetToMessageIdCache {
    private final Map<String, LoadingCache<Long, MessageId>> partitionCaches;

    public MessageId getMessageIdByOffset(String topic, long offset) {
        // 1. Check Caffeine cache (default: max 1000 entries/partition)
        // 2. On cache miss: pulsarAdmin.topics().getMessageIDByOffset()
        // 3. Populate cache
    }
}
```
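
The three steps in `getMessageIdByOffset` describe a bounded load-on-miss cache. The sketch below shows that pattern with the standard library only. `OffsetCacheSketch` is a hypothetical name, and the `loader` function stands in for the admin lookup; it is not a real Pulsar call.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;

/** Hypothetical bounded cache that loads a value on miss and evicts the least recently used entry. */
final class OffsetCacheSketch<V> {
    private final LongFunction<V> loader; // stand-in for the remote offset lookup
    private final Map<Long, V> cache;
    private int loads;

    OffsetCacheSketch(final int maxEntries, LongFunction<V> loader) {
        this.loader = loader;
        this.cache = new LinkedHashMap<Long, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    synchronized V get(long offset) {
        V value = cache.get(offset);       // 1. check the cache
        if (value == null) {
            value = loader.apply(offset);  // 2. on a miss, ask the loader
            loads++;
            cache.put(offset, value);      // 3. populate the cache
        }
        return value;
    }

    synchronized int loadCount() {
        return loads;
    }
}
```

In the usage below the loader is a made-up arithmetic mapping, used only to show that repeated lookups of the same offset hit the cache:

```java
OffsetCacheSketch<String> cache = new OffsetCacheSketch<>(1000, o -> (o / 100) + ":" + (o % 100));
cache.get(205L); // loads once
cache.get(205L); // served from the cache
```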

### Public-facing Changes

#### Public API

**New Interfaces**:

```java
// Entry point
PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
        nonPartitionedTopic, subscription,
        brokerCluster, Schema.BYTES,
        pulsarClient, pulsarAdmin);

// Usage
List<Message<byte[]>> messages = pullConsumer1.pull(
        PulsarPullConsumer.PullRequest.builder()
                .offset(offset)
                .partition(PulsarPullConsumer.PARTITION_NONE)
                .maxMessages(10)
                .maxBytes(1024 * 1024)
                .timeout(java.time.Duration.ofSeconds(10))
                .build());
```
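
The `maxMessages` and `maxBytes` settings in the request bound the size of one pull. The sketch below shows one plausible way to apply both limits to an in-memory log. `BatchLimitSketch` is a hypothetical name, and returning the first message even when it alone exceeds `maxBytes` is an assumption of this sketch, not documented behavior of the implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batch selection that honors a message-count limit and a byte limit. */
final class BatchLimitSketch {
    static List<byte[]> select(List<byte[]> log, int offset, int maxMessages, int maxBytes) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be >= 0");
        }
        List<byte[]> batch = new ArrayList<>();
        int bytes = 0;
        for (int i = offset; i < log.size() && batch.size() < maxMessages; i++) {
            byte[] payload = log.get(i);
            // Stop before exceeding the byte budget, but always allow the first message
            // so that one oversized message cannot stall the consumer (an assumption).
            if (!batch.isEmpty() && bytes + payload.length > maxBytes) {
                break;
            }
            batch.add(payload);
            bytes += payload.length;
        }
        return batch;
    }
}
```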

## Get Started

### Quick Start

```java
// 1. Create the pull consumer
PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
        nonPartitionedTopic, subscription,
        brokerCluster, Schema.BYTES,
        pulsarClient, pulsarAdmin);

pullConsumer1.start(); // Initialize connections

// 2. Pull up to 10 messages (at most 1 MiB) from the non-partitioned topic, starting at offset 200
long offset = 200L;
List<Message<byte[]>> batch = pullConsumer1.pull(
        PulsarPullConsumer.PullRequest.builder()
                .offset(offset)
                .partition(PulsarPullConsumer.PARTITION_NONE)
                .maxMessages(10)
                .maxBytes(1024 * 1024)
                .timeout(java.time.Duration.ofSeconds(10))
                .build());

// 3. Process messages
batch.forEach(msg -> {
    System.out.println("Received: " + new String(msg.getData()));
    // Store last offset
});

// 4. Acknowledge up to the last processed offset (250 is a placeholder value)
pullConsumer1.ack(250L, PulsarPullConsumer.PARTITION_NONE);

// 5. Close resources
pullConsumer1.close();
```
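
The quick start pulls a single batch. A consumer normally repeats the pull, process, and ack steps, advancing its offset each time. The sketch below runs that loop against an in-memory list. `PullLoopSketch` is a hypothetical name, and the loop assumes contiguous offsets that equal list indexes, which a real topic may not guarantee.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical pull loop over an in-memory log whose offsets equal list indexes. */
final class PullLoopSketch {
    /** Processes the log from startOffset in batches and returns the last acknowledged offset, or -1. */
    static long drain(List<String> log, long startOffset, int maxMessages, Consumer<String> handler) {
        if (startOffset < 0 || maxMessages <= 0) {
            throw new IllegalArgumentException("startOffset must be >= 0 and maxMessages > 0");
        }
        long nextOffset = startOffset;
        long lastAcked = -1L;
        while (nextOffset < log.size()) {
            // "Pull": take at most maxMessages entries starting at nextOffset.
            int end = (int) Math.min(log.size(), nextOffset + maxMessages);
            List<String> batch = log.subList((int) nextOffset, end);
            // "Process": hand each message to the application.
            batch.forEach(handler);
            // "Ack": remember the last processed offset, then continue after it.
            lastAcked = nextOffset + batch.size() - 1;
            nextOffset = lastAcked + 1;
        }
        return lastAcked;
    }
}
```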

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@
         <testcontainers.version>1.20.1</testcontainers.version>
         <junit.version>4.13.1</junit.version>
         <mockito.version>5.12.0</mockito.version>
+        <caffeine.version>3.2.0</caffeine.version>
     </properties>

     <modules>

pulsar-client-common-contrib/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -30,6 +30,12 @@
         <dependency>
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>${caffeine.version}</version>
         </dependency>
     </dependencies>

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pulsar.client.api;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.common.ConsumeStats;
import org.apache.pulsar.client.common.PullRequest;
import org.apache.pulsar.client.common.PullResponse;

/**
 * Pull-based consumer interface with enhanced offset management capabilities.
 *
 * <p>Features:</p>
 * <ul>
 *   <li>Precise offset control with partition-aware operations</li>
 *   <li>Thread-safe design for concurrent access</li>
 *   <li>Support for both partitioned and non-partitioned topics</li>
 *   <li>Built-in offset to message ID mapping</li>
 * </ul>
 *
 * @param <T> message payload type
 */
public interface PulsarPullConsumer<T> extends AutoCloseable {

    /**
     * Initializes consumer resources and establishes connections.
     *
     * @throws PulsarClientException if client initialization fails
     */
    void start() throws PulsarClientException;

    /**
     * Pulls messages from the specified partition starting from the given offset.
     *
     * @param request pull request configuration
     * @return pull response containing the messages read from the specified offset
     * @throws IllegalArgumentException for invalid request parameters
     */
    PullResponse<T> pull(PullRequest request);

    /**
     * Acknowledges all messages up to the specified offset (inclusive).
     *
     * @param offset    target offset to acknowledge
     * @param partition partition index (use -1 for non-partitioned topics)
     * @throws PulsarClientException    for acknowledgment failures
     * @throws IllegalArgumentException for invalid partition index
     */
    void ack(long offset, int partition) throws PulsarClientException;

    /**
     * Finds the latest message offset before or at the specified timestamp.
     *
     * @param partition partition index (use -1 for non-partitioned topics)
     * @param timestamp target timestamp in milliseconds
     * @return corresponding message offset
     * @throws PulsarAdminException     for admin operation failures
     * @throws IllegalArgumentException for invalid partition index
     */
    long searchOffset(int partition, long timestamp) throws PulsarAdminException;

    /**
     * Retrieves consumption statistics for the specified partition.
     *
     * @param partition partition index (use -1 for non-partitioned topics)
     * @return consumption statistics for the partition
     * @throws PulsarAdminException     for stats retrieval failures
     * @throws IllegalArgumentException for invalid partition index
     */
    ConsumeStats getConsumeStats(int partition) throws PulsarAdminException;

    /**
     * Releases all resources and closes connections gracefully.
     */
    @Override
    void close();
}
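
The contract of `searchOffset` ("latest message offset before or at the specified timestamp") is a floor lookup on publish time. The sketch below shows that lookup over an in-memory index. `TimestampIndexSketch` is a hypothetical name, and returning `-1` when no message is old enough is an assumption of this sketch; the interface does not say what happens in that case.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory index from publish time (milliseconds) to message offset. */
final class TimestampIndexSketch {
    private final TreeMap<Long, Long> offsetByPublishTime = new TreeMap<>();

    /** Records a message; when several share a timestamp, the largest offset is kept. */
    void record(long publishTimeMillis, long offset) {
        offsetByPublishTime.merge(publishTimeMillis, offset, Long::max);
    }

    /** Returns the latest offset published at or before the timestamp, or -1 if there is none. */
    long searchOffset(long timestampMillis) {
        Map.Entry<Long, Long> entry = offsetByPublishTime.floorEntry(timestampMillis);
        return entry == null ? -1L : entry.getValue();
    }
}
```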
