Commit 555ae3d

author
xiangying
committed
pcip-4
1 parent 484c192 commit 555ae3d

File tree

1 file changed

+179
-0
lines changed


pcip/pcip-4.md

# PCIP-4: Pull Consumer Implementation for Apache Pulsar

# Background knowledge

- **Pulsar Consumers**: Pulsar currently supports push-based consumption models (exclusive/shared/failover/key-shared).
  This proposal adds pull-based consumption.
- **Message Positioning**: Pulsar uses composite MessageIDs (ledgerId + entryId + partitionId), while systems like
  Kafka/RocketMQ use monotonic offsets.
- **Offset Mapping**: https://github.com/apache/pulsar/pull/24220 can be used to convert between offsets and Pulsar's
  MessageIDs.
# Motivation

System Migration Requirement: The organization plans to migrate from RocketMQ to Pulsar, requiring a unified MQ client
abstraction layer to conceal implementation details and support seamless engine replacement.

Interface Compatibility Issues:

- Pulsar lacks a native offset-based retrieval interface (pull/fetch model).
- RocketMQ/Kafka use monotonically increasing numeric offsets to locate messages, whereas Pulsar employs a composite
  MessageID (ledgerId + entryId + partitionId).

Objective: Implement a RocketMQ-like Pull Consumer to support precise offset control and reduce migration costs.
# Goals

## In Scope

| Goal                       | Description                                                               |
|----------------------------|---------------------------------------------------------------------------|
| **Precise Offset Control** | Supports specifying partition, offset, pull count, and byte size.         |
| **Resource Efficiency**    | Reuses Reader connections with LRU cache management.                      |
| **Easy Integration**       | Compatible with Pulsar's existing API, requiring no Broker modifications. |

## Out of Scope

NA
# High Level Design

```mermaid
graph TD
    A[PullConsumer] -->|pull| B[Offset Mapping]
    B -->|convert| C[MessageID]
    C -->|seek| D[Reader Pool]
    D -->|fetch| E[Broker]
    A -->|ack| F[Offset-Based Ack]
    F -->|convert| G[MessageID]
    G -->|cumulative ack| H[Consumer]
```
Key components:

1. **`PulsarPullConsumer` interface**: Standard pull consumer API
2. **Offset ↔ MessageID Cache**: Partition-scoped mapping layer
3. **Reader Pool**: Managed resource pool with LRU eviction
4. **Partition Locking**: Thread-safe access coordination
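Component 4 (partition locking) is not detailed elsewhere in this proposal. As a minimal sketch, assuming one lock per topic-partition key (the class name `PartitionLocks` and the key format are illustrative, not part of the proposal), it could look like:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper: serializes pull/seek/ack operations on the same
// partition while letting different partitions proceed concurrently.
public class PartitionLocks {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(String topic, int partition) {
        // One lock per "topic-partition" key, created lazily and atomically.
        return locks.computeIfAbsent(topic + "-" + partition, k -> new ReentrantLock());
    }

    public <T> T withLock(String topic, int partition, Supplier<T> action) {
        ReentrantLock lock = lockFor(topic, partition);
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }
}
```

A pull implementation could then wrap the seek-and-read sequence in `withLock(topic, partition, ...)` so two threads never reposition the same Reader concurrently.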
## Detailed Design

### Design & Implementation Details

**Core Interface** `PulsarPullConsumer`:

```java
public interface PulsarPullConsumer<T> extends AutoCloseable {
    void start() throws PulsarClientException;

    List<Message<T>> pull(PullRequest request);

    void ack(long offset, int partition) throws PulsarClientException;

    long searchOffset(int partition, long timestamp) throws PulsarAdminException;

    long getConsumeStats(int partition) throws PulsarAdminException;

    class PullRequest {
        private long offset;
        private int partition;
        private int maxMessages;
        private int maxBytes;
        private Duration timeout;
    }
}
```
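The interface above only declares the `PullRequest` fields, while the usage examples later call `PullRequest.builder()...build()`. A dependency-free sketch of the implied builder (the default values shown are assumptions, not specified by this proposal) could be:

```java
import java.time.Duration;

// Hypothetical stand-alone sketch of the builder implied by the usage
// examples; field names mirror the PullRequest definition above.
public class PullRequest {
    private final long offset;
    private final int partition;
    private final int maxMessages;
    private final int maxBytes;
    private final Duration timeout;

    private PullRequest(Builder b) {
        this.offset = b.offset;
        this.partition = b.partition;
        this.maxMessages = b.maxMessages;
        this.maxBytes = b.maxBytes;
        this.timeout = b.timeout;
    }

    public static Builder builder() { return new Builder(); }

    public long getOffset() { return offset; }
    public int getPartition() { return partition; }
    public int getMaxMessages() { return maxMessages; }

    public static class Builder {
        private long offset;
        private int partition;
        private int maxMessages = 1;                       // assumed defaults
        private int maxBytes = 1024 * 1024;
        private Duration timeout = Duration.ofSeconds(10);

        public Builder offset(long offset) { this.offset = offset; return this; }
        public Builder partition(int partition) { this.partition = partition; return this; }
        public Builder maxMessages(int maxMessages) { this.maxMessages = maxMessages; return this; }
        public Builder maxBytes(int maxBytes) { this.maxBytes = maxBytes; return this; }
        public Builder timeout(Duration timeout) { this.timeout = timeout; return this; }

        public PullRequest build() { return new PullRequest(this); }
    }
}
```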
**Reader Management**:

```java
public class ReaderCache<T> {
    private final Map<String, LoadingCache<Long, Reader<T>>> readerCacheMap;

    public Reader<T> getReader(String topic, long offset) {
        // 1. Acquire partition lock
        // 2. Get/create LRU cache (default: max 100 readers/partition)
        // 3. Remove reader from cache for exclusive use
    }

    public void releaseReader(String topic, long nextOffset, Reader<T> reader) {
        // Return reader to cache if still connected
    }
}
```
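The proposal uses a Caffeine `LoadingCache` for the per-partition reader pool. The check-out/check-in-with-LRU-eviction behavior can be sketched without Pulsar or Caffeine dependencies using an access-ordered `LinkedHashMap` (the class `LruPool` and its API are illustrative only; real code would also close evicted readers):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for one partition's reader cache: an access-ordered
// LinkedHashMap that evicts the least-recently-used entry once it holds more
// than maxReaders entries (the proposal defaults to 100 readers/partition).
public class LruPool<K, V> {
    private final Map<K, V> cache;

    public LruPool(int maxReaders) {
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // In real code, the evicted Reader would be closed here.
                return size() > maxReaders;
            }
        };
    }

    // "Remove reader from cache for exclusive use" (step 3 of getReader)
    public synchronized V checkOut(K key) {
        return cache.remove(key);
    }

    // "Return reader to cache if still connected" (releaseReader)
    public synchronized void checkIn(K key, V reader) {
        cache.put(key, reader);
    }

    public synchronized int size() {
        return cache.size();
    }
}
```

Keying the pool by the reader's next offset (as `releaseReader(topic, nextOffset, reader)` suggests) lets a subsequent `pull` at that offset reuse an already-positioned reader instead of seeking.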
**Offset Mapping**:

```java
public class OffsetToMessageIdCache {
    private final Map<String, LoadingCache<Long, MessageId>> partitionCaches;

    public MessageId getMessageIdByOffset(String topic, long offset) {
        // 1. Check Caffeine cache (default: max 1000 entries/partition)
        // 2. On cache miss: pulsarAdmin.topics().getMessageIDByOffset()
        // 3. Populate cache
    }
}
```
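The miss-then-populate path above can be sketched without Caffeine using `computeIfAbsent`, with the admin lookup injected as a loader function (the class `OffsetCache`, the `loads` counter, and the loader parameter are illustrative; the real design also bounds each partition's cache at 1000 entries, which is omitted here):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

// Hypothetical dependency-free version of the cache-miss path: check the
// per-partition map, and on a miss fall back to a loader standing in for
// pulsarAdmin.topics().getMessageIDByOffset(...), caching the result.
public class OffsetCache<M> {
    private final Map<String, Map<Long, M>> partitionCaches = new ConcurrentHashMap<>();
    private final LongFunction<M> loader; // injected admin lookup (assumption)
    public int loads = 0;                 // cache-miss count, for illustration

    public OffsetCache(LongFunction<M> loader) {
        this.loader = loader;
    }

    public M getMessageIdByOffset(String topicPartition, long offset) {
        Map<Long, M> cache =
                partitionCaches.computeIfAbsent(topicPartition, t -> new ConcurrentHashMap<>());
        // computeIfAbsent = steps 1-3: check, load on miss, populate.
        return cache.computeIfAbsent(offset, o -> {
            loads++;
            return loader.apply(o);
        });
    }
}
```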
### Public-facing Changes

#### Public API

**New Interfaces**:

```java
// Entry point
PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
        nonPartitionedTopic, subscription,
        brokerCluster, Schema.BYTES,
        pulsarClient, pulsarAdmin);

// Usage
List<Message<byte[]>> messages = pullConsumer1.pull(
        PulsarPullConsumer.PullRequest.builder()
                .offset(offset)
                .partition(PulsarPullConsumer.PARTITION_NONE)
                .maxMessages(10)
                .maxBytes(1024 * 1024)
                .timeout(java.time.Duration.ofSeconds(10))
                .build());
```
## Get Started

### Quick Start

```java
// 1. Create pull consumer
PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
        nonPartitionedTopic, subscription,
        brokerCluster, Schema.BYTES,
        pulsarClient, pulsarAdmin);

pullConsumer1.start(); // Initialize connections

// 2. Pull messages from partition 0 starting at offset 200
List<Message<byte[]>> batch = pullConsumer1.pull(
        PulsarPullConsumer.PullRequest.builder()
                .offset(200L)
                .partition(0)
                .maxMessages(10)
                .maxBytes(1024 * 1024)
                .timeout(java.time.Duration.ofSeconds(10))
                .build());

// 3. Process the message batch
batch.forEach(msg -> {
    System.out.println("Received: " + new String(msg.getData()));
    // Store last offset
});

// 4. Acknowledge up to the last offset
pullConsumer1.ack(250L, 0);

// 5. Close resources
pullConsumer1.close();
```
