Skip to content

Commit 440bf01

Browse files
author
xiangying
committed
[feat][client] Support Pull Consumer
1 parent 892df92 commit 440bf01

File tree

10 files changed

+872
-24
lines changed

10 files changed

+872
-24
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<testcontainers.version>1.20.1</testcontainers.version>
5151
<junit.version>4.13.1</junit.version>
5252
<mockito.version>5.12.0</mockito.version>
53+
<caffeine.version>3.2.0</caffeine.version>
5354
</properties>
5455

5556
<modules>

pulsar-client-common-contrib/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@
3030
<dependency>
3131
<groupId>org.apache.pulsar</groupId>
3232
<artifactId>pulsar-client-all</artifactId>
33+
<version>${pulsar.version}</version>
34+
</dependency>
35+
<dependency>
36+
<groupId>com.github.ben-manes.caffeine</groupId>
37+
<artifactId>caffeine</artifactId>
38+
<version>${caffeine.version}</version>
3339
</dependency>
3440
</dependencies>
3541

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package org.apache.pulsar.client.api;
2+
3+
import java.time.Duration;
4+
import java.util.List;
5+
import lombok.Data;
6+
import org.apache.pulsar.client.admin.PulsarAdminException;
7+
8+
/**
9+
* Pull-based consumer interface with enhanced offset management capabilities.
10+
*
11+
* <p>Features:</p>
12+
* <ul>
13+
* <li>Precise offset control with partition-aware operations</li>
14+
* <li>Thread-safe design for concurrent access</li>
15+
* <li>Support for both partitioned and non-partitioned topics</li>
16+
* <li>Built-in offset to message ID mapping</li>
17+
* </ul>
18+
*
19+
* @param <T> message payload type
20+
*/
21+
public interface PulsarPullConsumer<T> extends AutoCloseable {
22+
int PARTITION_NONE = -1;
23+
Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofSeconds(30);
24+
25+
/**
26+
* Initializes consumer resources and establishes connections.
27+
*
28+
* @throws PulsarClientException if client initialization fails
29+
*/
30+
void start() throws PulsarClientException;
31+
32+
/**
33+
* Pulls messages from the specified partition starting from the given offset.
34+
*
35+
* @param request pull request configuration
36+
* @return immutable list of messages starting from the specified offset
37+
* @throws IllegalArgumentException for invalid request parameters
38+
*/
39+
List<Message<T>> pull(PullRequest request);
40+
41+
/**
42+
* Acknowledges all messages up to the specified offset (inclusive).
43+
*
44+
* @param offset target offset to acknowledge
45+
* @param partition partition index (use {@link #PARTITION_NONE} for non-partitioned topics)
46+
* @throws PulsarClientException for acknowledgment failures
47+
* @throws IllegalArgumentException for invalid partition index
48+
*/
49+
void ack(long offset, int partition) throws PulsarClientException;
50+
51+
/**
52+
* Finds the latest message offset before or at the specified timestamp.
53+
*
54+
* @param partition partition index (use {@link #PARTITION_NONE} for non-partitioned topics)
55+
* @param timestamp target timestamp in milliseconds
56+
* @return corresponding message offset
57+
* @throws PulsarAdminException for admin operation failures
58+
* @throws IllegalArgumentException for invalid partition index
59+
*/
60+
long searchOffset(int partition, long timestamp) throws PulsarAdminException;
61+
62+
/**
63+
* Retrieves consumption statistics for the specified partition.
64+
*
65+
* @param partition partition index (use {@link #PARTITION_NONE} for non-partitioned topics)
66+
* @return current consumption offset
67+
* @throws PulsarAdminException for stats retrieval failures
68+
* @throws IllegalArgumentException for invalid partition index
69+
*/
70+
long getConsumeStats(int partition) throws PulsarAdminException;
71+
72+
/**
73+
* Releases all resources and closes connections gracefully.
74+
*/
75+
@Override
76+
void close();
77+
78+
/**
79+
* Configuration object for pull requests.
80+
*/
81+
@Data
82+
class PullRequest {
83+
private final long offset;
84+
private final int partition;
85+
private final int maxMessages;
86+
private final int maxBytes;
87+
private final Duration timeout;
88+
89+
private PullRequest(Builder builder) {
90+
this.offset = builder.offset;
91+
this.partition = builder.partition;
92+
this.maxMessages = builder.maxMessages;
93+
this.maxBytes = builder.maxBytes;
94+
this.timeout = builder.timeout;
95+
}
96+
97+
public static Builder builder() {
98+
return new Builder();
99+
}
100+
101+
public static class Builder {
102+
private long offset = -1L;
103+
private int partition = PARTITION_NONE;
104+
private int maxMessages = 100;
105+
private int maxBytes = 10_485_760; // 10MB
106+
private Duration timeout = DEFAULT_OPERATION_TIMEOUT;
107+
108+
public Builder offset(long offset) {
109+
this.offset = offset;
110+
return this;
111+
}
112+
113+
public Builder partition(int partition) {
114+
this.partition = partition;
115+
return this;
116+
}
117+
118+
public Builder maxMessages(int maxMessages) {
119+
this.maxMessages = maxMessages;
120+
return this;
121+
}
122+
123+
public Builder maxBytes(int maxBytes) {
124+
this.maxBytes = maxBytes;
125+
return this;
126+
}
127+
128+
public Builder timeout(Duration timeout) {
129+
this.timeout = timeout;
130+
return this;
131+
}
132+
133+
public PullRequest build() {
134+
validate();
135+
return new PullRequest(this);
136+
}
137+
138+
private void validate() {
139+
if (maxMessages <= 0 || maxBytes <= 0) {
140+
throw new IllegalArgumentException("Max messages/bytes must be positive");
141+
}
142+
}
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)