Skip to content

Commit 809f918

Browse files
author
xiangying
committed
[feat][client] Support Pull Consumer
1 parent 892df92 commit 809f918

File tree

7 files changed

+431
-0
lines changed

7 files changed

+431
-0
lines changed

pulsar-client-common-contrib/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@
3030
<dependency>
3131
<groupId>org.apache.pulsar</groupId>
3232
<artifactId>pulsar-client-all</artifactId>
33+
<version>${pulsar.version}}</version>
34+
</dependency>
35+
<dependency>
36+
<groupId>com.github.ben-manes.caffeine</groupId>
37+
<artifactId>caffeine</artifactId>
38+
<version>${caffeine.version}</version>
3339
</dependency>
3440
</dependencies>
3541

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.apache.pulsar.client.api;
2+
3+
import java.io.IOException;
4+
import java.util.List;
5+
import java.util.concurrent.TimeUnit;
6+
import org.apache.pulsar.client.admin.PulsarAdminException;
7+
8+
public interface PulsarPullConsumer<T> {
9+
/**
10+
* Initializes the consumer and establishes connection to brokers.
11+
*
12+
* @throws PulsarClientException if client setup fails
13+
* @throws PulsarAdminException if admin operations fail
14+
*/
15+
void start() throws PulsarClientException, PulsarAdminException;
16+
17+
/**
18+
* Pulls messages from specified partition starting from the given offset (inclusive).
19+
*
20+
* @param offset the starting offset to consume from (inclusive)
21+
* @param partition the target partition index, -1 means non-partitioned topic
22+
* @param maxNum maximum number of messages to return
23+
* @param maxSize maximum total size (bytes) of messages to return
24+
* @param timeout maximum wait time per message
25+
* @param timeUnit timeout unit
26+
* @return list of messages starting from the specified offset
27+
* @throws PulsarClientException if read operations fail
28+
* @throws PulsarAdminException if offset mapping fails
29+
*/
30+
List<Message<?>> pull(long offset, int partition, int maxNum, int maxSize,
31+
int timeout, TimeUnit timeUnit)
32+
throws PulsarClientException, PulsarAdminException;
33+
34+
/**
35+
* Acknowledges consumption of all messages up to and including the specified offset.
36+
*
37+
* @param offset the offset to acknowledge (inclusive)
38+
* @param partition the target partition index, -1 means non-partitioned topic
39+
* @throws PulsarClientException if acknowledgment fails
40+
*/
41+
void ack(long offset, int partition) throws PulsarClientException;
42+
43+
/**
44+
* Finds the latest message offset before or at the specified timestamp.
45+
*
46+
* @param topic target topic name
47+
* @param partition the target partition index, -1 means non-partitioned topic
48+
* @param timestamp target timestamp (milliseconds since epoch)
49+
* @return the closest offset where message timestamp ≤ specified timestamp
50+
* @throws PulsarAdminException if message lookup fails
51+
*/
52+
long searchOffset(String topic, int partition, long timestamp) throws PulsarAdminException;
53+
54+
/**
55+
* Retrieves consumption statistics for a consumer group.
56+
*
57+
* @param topic target topic name
58+
* @param partition the target partition index, -1 means non-partitioned topic
59+
* @param group consumer group name
60+
* @return the current consumed offset for the group
61+
* @throws PulsarAdminException if statistics retrieval fails
62+
*/
63+
long getConsumeStats(String topic, Integer partition, String group) throws PulsarAdminException;
64+
65+
/**
66+
* Releases all resources and closes connections.
67+
* Implements AutoCloseable for try-with-resources support.
68+
*
69+
* @throws IOException if graceful shutdown fails
70+
*/
71+
void close() throws IOException;
72+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package org.apache.pulsar.client.api.impl;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.concurrent.TimeUnit;
9+
import org.apache.pulsar.client.admin.PulsarAdmin;
10+
import org.apache.pulsar.client.admin.PulsarAdminException;
11+
import org.apache.pulsar.client.api.Consumer;
12+
import org.apache.pulsar.client.api.Message;
13+
import org.apache.pulsar.client.api.MessageId;
14+
import org.apache.pulsar.client.api.MessageIdAdv;
15+
import org.apache.pulsar.client.api.PulsarPullConsumer;
16+
import org.apache.pulsar.client.api.PulsarClient;
17+
import org.apache.pulsar.client.api.PulsarClientException;
18+
import org.apache.pulsar.client.api.Reader;
19+
import org.apache.pulsar.client.api.Schema;
20+
import org.apache.pulsar.client.util.OffsetToMessageIdCache;
21+
import org.apache.pulsar.client.util.OffsetToMessageIdCacheProvider;
22+
import org.apache.pulsar.client.util.ReaderCache;
23+
import org.apache.pulsar.client.util.ReaderCacheProvider;
24+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
25+
26+
public class PulsarPullConsumerImpl<T> implements PulsarPullConsumer<T> {
27+
private final String topic;
28+
private final String subscription;
29+
private final String brokerCluster;
30+
private final Schema<T> schema;
31+
private static final String PARTITION_SPLICER = "-partition-";
32+
private int partitionNum;
33+
34+
Map<String, Consumer<?>> consumerMap;
35+
private final OffsetToMessageIdCache offsetToMessageIdCache;
36+
private final ReaderCache<T> readerCache;
37+
38+
private final PulsarAdmin pulsarAdmin;
39+
private final PulsarClient pulsarClient;
40+
41+
public PulsarPullConsumerImpl(String topic, String subscription, Schema<T> schema, PulsarClient client,
42+
PulsarAdmin admin, String brokerCluster) {
43+
this.topic = topic;
44+
this.subscription = subscription;
45+
this.brokerCluster = brokerCluster;
46+
this.schema = schema;
47+
this.consumerMap = new ConcurrentHashMap<>();
48+
this.pulsarClient = client;
49+
this.pulsarAdmin = admin;
50+
this.offsetToMessageIdCache = OffsetToMessageIdCacheProvider.getOffsetToMessageIdCache(admin, brokerCluster);
51+
this.readerCache = ReaderCacheProvider.getReaderCache(brokerCluster, schema, client, offsetToMessageIdCache);
52+
}
53+
54+
@Override
55+
public void start() throws PulsarClientException, PulsarAdminException {
56+
discoverPartition();
57+
}
58+
59+
private void discoverPartition() throws PulsarClientException, PulsarAdminException {
60+
PartitionedTopicMetadata partitionedTopicMetadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topic);
61+
this.partitionNum = partitionedTopicMetadata.partitions;
62+
if (partitionNum == 0) {
63+
Consumer<?> consumer = pulsarClient.newConsumer(schema)
64+
.topic(topic)
65+
.subscriptionName(subscription)
66+
.subscribe();
67+
consumerMap.put(topic, consumer);
68+
return;
69+
}
70+
71+
for (int i = 0; i < partitionNum; i++) {
72+
String partitionTopic = getPartitionTopicName(topic, i);
73+
Consumer<?> consumer = pulsarClient.newConsumer(schema)
74+
.topic(partitionTopic)
75+
.subscriptionName(subscription)
76+
.subscribe();
77+
consumerMap.put(partitionTopic, consumer);
78+
}
79+
}
80+
81+
@Override
82+
public List<Message<?>> pull(long offset, int partition, int maxNum, int maxSize,
83+
int timeout, TimeUnit timeUnit)
84+
throws PulsarClientException {
85+
List<Message<?>> messages = new ArrayList<>();
86+
int totalSize = 0;
87+
String partitionTopic = getPartitionTopicName(topic, partition);
88+
Reader<T> reader = readerCache.getReaderByOffset(partitionTopic, offset);
89+
90+
Message<?> lastMessage = null;
91+
long startTime = System.nanoTime();
92+
93+
while (messages.size() < maxNum && totalSize < maxSize) {
94+
long elapsed = System.nanoTime() - startTime;
95+
long remainingNanos = Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, timeUnit) - elapsed);
96+
97+
Message<?> message = reader.readNext(Math.toIntExact(TimeUnit.MILLISECONDS.convert(remainingNanos, TimeUnit.NANOSECONDS)),
98+
TimeUnit.MILLISECONDS);
99+
if (message == null) break;
100+
messages.add(message);
101+
lastMessage = message;
102+
totalSize += message.getData().length;
103+
}
104+
if (lastMessage != null) {
105+
readerCache.putReaderByOffset(partitionTopic, lastMessage.getIndex().get() + 1, reader);
106+
}
107+
return messages;
108+
}
109+
110+
@Override
111+
public void ack(long offset, int partition) throws PulsarClientException {
112+
String partitionTopic = getPartitionTopicName(topic, partition);
113+
Consumer<?> consumer = consumerMap.get(partitionTopic);
114+
MessageId messageId = offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset);
115+
consumer.acknowledgeCumulative(messageId);
116+
}
117+
118+
@Override
119+
public long searchOffset(String topic, int partition, long timestamp) throws PulsarAdminException {
120+
String partitionTopic = getPartitionTopicName(topic, partition);
121+
MessageIdAdv messageId = (MessageIdAdv) pulsarAdmin.topics().getMessageIdByTimestamp(partitionTopic, timestamp);
122+
List<Message<byte[]>> messages = pulsarAdmin.topics()
123+
.getMessagesById(partitionTopic, messageId.getLedgerId(), messageId.getEntryId());
124+
if (messages == null || messages.isEmpty()) {
125+
throw new IllegalArgumentException("The message is not found");
126+
}
127+
Message<byte[]> message = messages.get(messages.size() - 1);
128+
if (message.getIndex().isEmpty()) {
129+
throw new IllegalArgumentException("The message index is empty");
130+
}
131+
offsetToMessageIdCache.putMessageIdByOffset(partitionTopic, message.getIndex().get(), message.getMessageId());
132+
return message.getIndex().get();
133+
}
134+
135+
@Override
136+
public long getConsumeStats(String topic, Integer partition, String group) throws PulsarAdminException {
137+
String partitionTopic = getPartitionTopicName(topic, partition);
138+
139+
// Get partition stats using proper partition identifier
140+
String messageIdString = pulsarAdmin.topics().getPartitionedInternalStats(topic)
141+
.partitions.get(partitionTopic)
142+
.cursors
143+
.get(subscription)
144+
.markDeletePosition;
145+
146+
// Handle potential format errors
147+
if (!messageIdString.contains(":")) {
148+
throw new PulsarAdminException("Invalid message ID format: " + messageIdString);
149+
}
150+
151+
String[] ids = messageIdString.split(":");
152+
try {
153+
long ledgerId = Long.parseLong(ids[0]);
154+
long entryId = Long.parseLong(ids[1]);
155+
156+
// Use partition-specific topic to retrieve message
157+
List<Message<byte[]>> messages = pulsarAdmin.topics().getMessagesById(partitionTopic, ledgerId, entryId);
158+
if (messages == null || messages.isEmpty()) {
159+
throw new PulsarAdminException("Message not found for offset: " + messageIdString);
160+
}
161+
Message<?> message = messages.get(messages.size() - 1);
162+
if (message.getIndex().isEmpty()) {
163+
throw new PulsarAdminException("Message index is empty for offset: " + messageIdString);
164+
}
165+
offsetToMessageIdCache.putMessageIdByOffset(partitionTopic, message.getIndex().get(), message.getMessageId());
166+
return message.getIndex().get();
167+
} catch (NumberFormatException e) {
168+
throw new PulsarAdminException("Invalid message ID components: " + messageIdString, e);
169+
}
170+
}
171+
172+
// Add resource cleanup method
173+
@Override
174+
public void close() throws IOException {
175+
// Close all consumers
176+
for (Consumer<?> consumer : consumerMap.values()) {
177+
consumer.close();
178+
}
179+
180+
offsetToMessageIdCache.cleanup();
181+
}
182+
183+
private String getPartitionTopicName(String topic, int partition) {
184+
if (partitionNum > 0 && partition < 0) {
185+
throw new IllegalArgumentException("Partition index must be non-negative for partitioned topics");
186+
} else if (partition >= partitionNum) {
187+
throw new IllegalArgumentException("Partition index out of bounds: " + partition +
188+
" for topic " + topic + " with " + partitionNum + " partitions");
189+
}
190+
return partition >= 0 ? topic + PARTITION_SPLICER + partition : topic;
191+
}
192+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package org.apache.pulsar.client.util;
2+
3+
import com.github.benmanes.caffeine.cache.Caffeine;
4+
import com.github.benmanes.caffeine.cache.LoadingCache;
5+
import java.time.Duration;
6+
import java.util.Map;
7+
import java.util.Objects;
8+
import java.util.concurrent.ConcurrentHashMap;
9+
import org.apache.pulsar.client.admin.PulsarAdmin;
10+
import org.apache.pulsar.client.api.MessageId;
11+
12+
public class OffsetToMessageIdCache {
13+
private static final int MAX_CACHE_SIZE = 1000;
14+
private static final Duration EXPIRE_AFTER_ACCESS = Duration.ofMinutes(5);
15+
16+
private final PulsarAdmin pulsarAdmin;
17+
private final Map<String, LoadingCache<Long, MessageId>> caffeineCacheMap = new ConcurrentHashMap<>();
18+
19+
public OffsetToMessageIdCache(PulsarAdmin pulsarAdmin) {
20+
this.pulsarAdmin = Objects.requireNonNull(pulsarAdmin, "PulsarAdmin must not be null");
21+
}
22+
23+
private LoadingCache<Long, MessageId> createCache(String partitionTopic) {
24+
return Caffeine.newBuilder()
25+
.maximumSize(MAX_CACHE_SIZE)
26+
.expireAfterAccess(EXPIRE_AFTER_ACCESS)
27+
.recordStats()
28+
.build(key -> loadMessageId(partitionTopic, key));
29+
}
30+
31+
private MessageId loadMessageId(String partitionTopic, Long offset) {
32+
// try {
33+
// // todo: waiting for https://github.com/apache/pulsar/pull/24220
34+
// return pulsarAdmin.topics().getMessageIDByOffsetAndPartitionID(partitionTopic, offset);
35+
// } catch (PulsarAdminException e) {
36+
// throw new CompletionException("Failed to load message ID", e);
37+
// }
38+
return null;
39+
}
40+
41+
public MessageId getMessageIdByOffset(String partitionTopic, long offset) {
42+
LoadingCache<Long, MessageId> cache = caffeineCacheMap.computeIfAbsent(
43+
partitionTopic,
44+
this::createCache
45+
);
46+
return cache.get(offset);
47+
}
48+
49+
public void putMessageIdByOffset(String partitionTopic, long offset, MessageId messageId) {
50+
LoadingCache<Long, MessageId> caffeineCache = caffeineCacheMap.computeIfAbsent(partitionTopic,
51+
k -> createCache(partitionTopic));
52+
caffeineCache.put(offset, messageId);
53+
}
54+
55+
public void cleanup() {
56+
for (LoadingCache<Long, MessageId> cache : caffeineCacheMap.values()) {
57+
cache.cleanUp();
58+
}
59+
}
60+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.apache.pulsar.client.util;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
import org.apache.pulsar.client.admin.PulsarAdmin;
6+
7+
public class OffsetToMessageIdCacheProvider {
8+
private static final Map<String, OffsetToMessageIdCache> CACHE_MAP = new ConcurrentHashMap<>();
9+
10+
public static OffsetToMessageIdCache getOffsetToMessageIdCache(PulsarAdmin pulsarAdmin, String brokerCluster) {
11+
return CACHE_MAP.computeIfAbsent(brokerCluster, key -> new OffsetToMessageIdCache(pulsarAdmin));
12+
}
13+
}

0 commit comments

Comments
 (0)