Skip to content

Commit ce40d17

Browse files
author
xiangying
committed
[feat][client] Support Pull Consumer
1 parent 892df92 commit ce40d17

File tree

4 files changed

+351
-0
lines changed

4 files changed

+351
-0
lines changed

pulsar-client-common-contrib/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@
3030
<dependency>
3131
<groupId>org.apache.pulsar</groupId>
3232
<artifactId>pulsar-client-all</artifactId>
33+
<version>${pulsar.version}</version>
34+
</dependency>
35+
<dependency>
36+
<groupId>com.github.ben-manes.caffeine</groupId>
37+
<artifactId>caffeine</artifactId>
38+
<version>${caffeine.version}</version>
3339
</dependency>
3440
</dependencies>
3541

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.apache.pulsar.client.api;
2+
3+
import java.io.IOException;
4+
import java.util.List;
5+
import java.util.concurrent.TimeUnit;
6+
import org.apache.pulsar.client.admin.PulsarAdminException;
7+
8+
public interface PullConsumer<T> {
9+
/**
10+
* Initializes the consumer and establishes connection to brokers.
11+
*
12+
* @throws PulsarClientException if client setup fails
13+
* @throws PulsarAdminException if admin operations fail
14+
*/
15+
void start() throws PulsarClientException, PulsarAdminException;
16+
17+
/**
18+
* Pulls messages from specified partition starting from the given offset (inclusive).
19+
*
20+
* @param offset the starting offset to consume from (inclusive)
21+
* @param partition the target partition index
22+
* @param maxNum maximum number of messages to return
23+
* @param maxSize maximum total size (bytes) of messages to return
24+
* @param timeout maximum wait time per message
25+
* @param timeUnit timeout unit
26+
* @return list of messages starting from the specified offset
27+
* @throws PulsarClientException if read operations fail
28+
* @throws PulsarAdminException if offset mapping fails
29+
*/
30+
List<Message<?>> pull(long offset, int partition, int maxNum, int maxSize,
31+
int timeout, TimeUnit timeUnit)
32+
throws PulsarClientException, PulsarAdminException;
33+
34+
/**
35+
* Acknowledges consumption of all messages up to and including the specified offset.
36+
*
37+
* @param offset the offset to acknowledge (inclusive)
38+
* @param partition the target partition index
39+
* @throws PulsarClientException if acknowledgment fails
40+
*/
41+
void ack(long offset, int partition) throws PulsarClientException;
42+
43+
/**
44+
* Finds the latest message offset before or at the specified timestamp.
45+
*
46+
* @param topic target topic name
47+
* @param partition target partition identifier
48+
* @param timestamp target timestamp (milliseconds since epoch)
49+
* @return the closest offset where message timestamp ≤ specified timestamp
50+
* @throws PulsarAdminException if message lookup fails
51+
*/
52+
long searchOffset(String topic, String partition, long timestamp) throws PulsarAdminException;
53+
54+
/**
55+
* Retrieves consumption statistics for a consumer group.
56+
*
57+
* @param topic target topic name
58+
* @param partition target partition identifier
59+
* @param group consumer group name
60+
* @return the current consumed offset for the group
61+
* @throws PulsarAdminException if statistics retrieval fails
62+
*/
63+
long getConsumeStats(String topic, String partition, String group) throws PulsarAdminException;
64+
65+
/**
66+
* Releases all resources and closes connections.
67+
* Implements AutoCloseable for try-with-resources support.
68+
*
69+
* @throws IOException if graceful shutdown fails
70+
*/
71+
void close() throws IOException;
72+
}
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package org.apache.pulsar.client.api.impl;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.concurrent.TimeUnit;
9+
import org.apache.pulsar.client.admin.PulsarAdmin;
10+
import org.apache.pulsar.client.admin.PulsarAdminException;
11+
import org.apache.pulsar.client.api.Consumer;
12+
import org.apache.pulsar.client.api.Message;
13+
import org.apache.pulsar.client.api.MessageId;
14+
import org.apache.pulsar.client.api.MessageIdAdv;
15+
import org.apache.pulsar.client.api.PullConsumer;
16+
import org.apache.pulsar.client.api.PulsarClient;
17+
import org.apache.pulsar.client.api.PulsarClientException;
18+
import org.apache.pulsar.client.api.Reader;
19+
import org.apache.pulsar.client.api.Schema;
20+
import org.apache.pulsar.client.util.OffsetToMessageIdCache;
21+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
22+
23+
public class PullConsumerImpl<T> implements PullConsumer<T> {
24+
public String topic;
25+
public String subscription;
26+
public Schema<T> schema;
27+
private static final String PARTITION_SPLICER = "-partition-";
28+
29+
Map<Integer/*partition*/, Consumer<?>> consumerMap;
30+
Map<Integer/*partition*/, Reader<?>> readerMap;
31+
Map<Integer/*partition*/, Message<?>> lastConsumedOffsetMap;
32+
private final OffsetToMessageIdCache offsetToMessageIdCache;
33+
private final PulsarAdmin pulsarAdmin;
34+
private final PulsarClient pulsarClient;
35+
private int currentPartitionNum = 0;
36+
37+
public PullConsumerImpl(String topic, String subscription, Schema<T> schema, PulsarClient client,
38+
PulsarAdmin admin) {
39+
this.topic = topic;
40+
this.subscription = subscription;
41+
this.schema = schema;
42+
this.consumerMap = new ConcurrentHashMap<>();
43+
this.readerMap = new ConcurrentHashMap<>();
44+
this.lastConsumedOffsetMap = new ConcurrentHashMap<>();
45+
this.pulsarClient = client;
46+
this.pulsarAdmin = admin;
47+
this.offsetToMessageIdCache = new OffsetToMessageIdCache(admin);
48+
}
49+
50+
@Override
51+
public void start() throws PulsarClientException, PulsarAdminException {
52+
discoverPartition();
53+
}
54+
55+
// todo: handle no-partition topic
56+
private void discoverPartition() throws PulsarClientException, PulsarAdminException {
57+
PartitionedTopicMetadata partitionedTopicMetadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topic);
58+
int numPartitions = partitionedTopicMetadata.partitions;
59+
// Check if the number of partitions has changed
60+
for (int i = currentPartitionNum; i < numPartitions; i++) {
61+
String partitionTopic = topic + PARTITION_SPLICER + i;
62+
Consumer<?> consumer = pulsarClient.newConsumer(schema)
63+
.topic(partitionTopic)
64+
.subscriptionName(subscription)
65+
.subscribe();
66+
consumerMap.put(i, consumer);
67+
68+
Reader<?> reader = pulsarClient.newReader(schema)
69+
.topic(partitionTopic)
70+
.startMessageId(MessageId.earliest)
71+
.create();
72+
readerMap.put(i, reader);
73+
}
74+
this.currentPartitionNum = numPartitions;
75+
}
76+
77+
@Override
78+
public List<Message<?>> pull(long offset, int partition, int maxNum, int maxSize,
79+
int timeout, TimeUnit timeUnit)
80+
throws PulsarClientException, PulsarAdminException {
81+
List<Message<?>> messages = new ArrayList<>();
82+
int totalSize = 0;
83+
String partitionTopic = topic + PARTITION_SPLICER + partition;
84+
Reader<?> reader = readerMap.get(partition);
85+
// Null check for reader to handle partition initialization issues
86+
if (reader == null) {
87+
discoverPartition();
88+
reader = readerMap.get(partition);
89+
if (reader == null) {
90+
throw new PulsarClientException("Reader not found for partition: " + partition);
91+
}
92+
}
93+
94+
// Handle initial consumption or offset discontinuity
95+
if (lastConsumedOffsetMap.get(partition) == null
96+
|| lastConsumedOffsetMap.get(partition).getIndex().isEmpty()) {
97+
// When seeking to a specific offset, consumption starts FROM that offset.
98+
// Therefore, seek to previous offset to start at the desired position.
99+
MessageId messageId = offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset - 1);
100+
reader.seek(messageId);
101+
} else if (lastConsumedOffsetMap.get(partition).getIndex().get() != offset - 1) {
102+
// If current offset is not continuous with previous consumption,
103+
// seek to the exact offset
104+
MessageId messageId = offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset - 1);
105+
reader.seek(messageId);
106+
}
107+
108+
Message<?> lastMessage = null;
109+
long startTime = System.nanoTime();
110+
111+
while (messages.size() < maxNum && totalSize < maxSize) {
112+
long elapsed = System.nanoTime() - startTime;
113+
long remainingNanos = Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, timeUnit) - elapsed);
114+
115+
Message<?> message = reader.readNext(Math.toIntExact(TimeUnit.MILLISECONDS.convert(remainingNanos, TimeUnit.NANOSECONDS)),
116+
TimeUnit.MILLISECONDS);
117+
if (message == null) break;
118+
messages.add(message);
119+
lastMessage = message;
120+
totalSize += message.getData().length;
121+
}
122+
123+
if (lastMessage != null) {
124+
lastConsumedOffsetMap.put(partition, lastMessage);
125+
}
126+
return messages;
127+
}
128+
129+
@Override
130+
public void ack(long offset, int partition) throws PulsarClientException {
131+
String partitionTopic = topic + PARTITION_SPLICER + partition;
132+
Consumer<?> consumer = consumerMap.get(partition);
133+
MessageId messageId = offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset);
134+
consumer.acknowledgeCumulative(messageId);
135+
}
136+
137+
@Override
138+
public long searchOffset(String topic, String partition, long timestamp) throws PulsarAdminException {
139+
String partitionTopic = topic + PARTITION_SPLICER + partition;
140+
MessageIdAdv messageId = (MessageIdAdv) pulsarAdmin.topics().getMessageIdByTimestamp(partitionTopic, timestamp);
141+
Message<?> message = pulsarAdmin.topics()
142+
.getMessagesById(partitionTopic, messageId.getLedgerId(), messageId.getEntryId())
143+
.getLast();
144+
if (message == null || message.getIndex().isEmpty()) {
145+
throw new IllegalArgumentException("The message is not found");
146+
}
147+
offsetToMessageIdCache.putMessageIdByOffset(partitionTopic, message.getIndex().get(), message.getMessageId());
148+
return message.getIndex().get();
149+
}
150+
151+
@Override
152+
public long getConsumeStats(String topic, String partition, String group) throws PulsarAdminException {
153+
String partitionTopic = topic + PARTITION_SPLICER + partition;
154+
155+
// Get partition stats using proper partition identifier
156+
String messageIdString = pulsarAdmin.topics().getPartitionedInternalStats(topic)
157+
.partitions.get(partitionTopic)
158+
.cursors
159+
.get(subscription)
160+
.markDeletePosition;
161+
162+
// Handle potential format errors
163+
if (!messageIdString.contains(":")) {
164+
throw new PulsarAdminException("Invalid message ID format: " + messageIdString);
165+
}
166+
167+
String[] ids = messageIdString.split(":");
168+
try {
169+
long ledgerId = Long.parseLong(ids[0]);
170+
long entryId = Long.parseLong(ids[1]);
171+
172+
// Use partition-specific topic to retrieve message
173+
Message<?> message = pulsarAdmin.topics().getMessagesById(partitionTopic, ledgerId, entryId).getLast();
174+
if (message == null || message.getIndex().isEmpty()) {
175+
throw new PulsarAdminException("Message not found for offset: " + messageIdString);
176+
}
177+
178+
offsetToMessageIdCache.putMessageIdByOffset(partitionTopic, message.getIndex().get(), message.getMessageId());
179+
return message.getIndex().get();
180+
} catch (NumberFormatException e) {
181+
throw new PulsarAdminException("Invalid message ID components: " + messageIdString, e);
182+
}
183+
}
184+
185+
// Add resource cleanup method
186+
@Override
187+
public void close() throws IOException {
188+
// Close all consumers
189+
for (Consumer<?> consumer : consumerMap.values()) {
190+
consumer.close();
191+
}
192+
193+
// Close all readers
194+
for (Reader<?> reader : readerMap.values()) {
195+
reader.close();
196+
}
197+
offsetToMessageIdCache.cleanup();
198+
}
199+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package org.apache.pulsar.client.util;
2+
3+
import com.github.benmanes.caffeine.cache.Caffeine;
4+
import com.github.benmanes.caffeine.cache.LoadingCache;
5+
import com.github.benmanes.caffeine.cache.stats.CacheStats;
6+
import java.time.Duration;
7+
import java.util.Map;
8+
import java.util.Objects;
9+
import java.util.concurrent.CompletionException;
10+
import java.util.concurrent.ConcurrentHashMap;
11+
import org.apache.pulsar.client.admin.PulsarAdmin;
12+
import org.apache.pulsar.client.admin.PulsarAdminException;
13+
import org.apache.pulsar.client.api.MessageId;
14+
15+
public class OffsetToMessageIdCache {
16+
private static final int MAX_CACHE_SIZE = 1000;
17+
private static final Duration EXPIRE_AFTER_ACCESS = Duration.ofMinutes(5);
18+
19+
private final PulsarAdmin pulsarAdmin;
20+
private final Map<String, LoadingCache<Long, MessageId>> caffeineCacheMap = new ConcurrentHashMap<>();
21+
22+
public OffsetToMessageIdCache(PulsarAdmin pulsarAdmin) {
23+
this.pulsarAdmin = Objects.requireNonNull(pulsarAdmin, "PulsarAdmin must not be null");
24+
}
25+
26+
private LoadingCache<Long, MessageId> createCache(String partitionTopic) {
27+
return Caffeine.newBuilder()
28+
.maximumSize(MAX_CACHE_SIZE)
29+
.expireAfterAccess(EXPIRE_AFTER_ACCESS)
30+
.recordStats()
31+
.build(key -> loadMessageId(partitionTopic, key));
32+
}
33+
34+
private MessageId loadMessageId(String partitionTopic, Long offset) {
35+
try {
36+
// todo: waiting for https://github.com/apache/pulsar/pull/24220
37+
return pulsarAdmin.topics().getMessageIDByOffsetAndPartitionID(partitionTopic, offset);
38+
} catch (PulsarAdminException e) {
39+
throw new CompletionException("Failed to load message ID", e);
40+
}
41+
}
42+
43+
public MessageId getMessageIdByOffset(String partitionTopic, long offset) {
44+
LoadingCache<Long, MessageId> cache = caffeineCacheMap.computeIfAbsent(
45+
partitionTopic,
46+
this::createCache
47+
);
48+
return cache.get(offset);
49+
}
50+
51+
public void putMessageIdByOffset(String partitionTopic, long offset, MessageId messageId) {
52+
LoadingCache<Long, MessageId> caffeineCache = caffeineCacheMap.computeIfAbsent(partitionTopic,
53+
k -> createCache(partitionTopic));
54+
caffeineCache.put(offset, messageId);
55+
}
56+
57+
public double getGlobalHitRate() {
58+
long totalHits = 0;
59+
long totalRequests = 0;
60+
for (LoadingCache<Long, MessageId> cache : caffeineCacheMap.values()) {
61+
CacheStats stats = cache.stats();
62+
totalHits += stats.hitCount();
63+
totalRequests += stats.requestCount(); // 等同于 hitCount + missCount
64+
}
65+
66+
return (totalRequests == 0) ? 0.0 : (double) totalHits / totalRequests;
67+
}
68+
69+
public void cleanup() {
70+
for (LoadingCache<Long, MessageId> cache : caffeineCacheMap.values()) {
71+
cache.cleanUp();
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)