Skip to content

Commit 484c192

Browse files
author
xiangying
committed
[feat][client] Support Pull Consumer
1 parent 809f918 commit 484c192

File tree

6 files changed

+513
-218
lines changed

6 files changed

+513
-218
lines changed
Lines changed: 118 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,148 @@
11
package org.apache.pulsar.client.api;
22

3-
import java.io.IOException;
3+
import java.time.Duration;
44
import java.util.List;
5-
import java.util.concurrent.TimeUnit;
5+
import lombok.Data;
66
import org.apache.pulsar.client.admin.PulsarAdminException;
77

8-
public interface PulsarPullConsumer<T> {
8+
/**
9+
* Pull-based consumer interface with enhanced offset management capabilities.
10+
*
11+
* <p>Features:</p>
12+
* <ul>
13+
* <li>Precise offset control with partition-aware operations</li>
14+
* <li>Thread-safe design for concurrent access</li>
15+
* <li>Support for both partitioned and non-partitioned topics</li>
16+
* <li>Built-in offset to message ID mapping</li>
17+
* </ul>
18+
*
19+
* @param <T> message payload type
20+
*/
21+
public interface PulsarPullConsumer<T> extends AutoCloseable {
22+
int PARTITION_NONE = -1;
23+
Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofSeconds(30);
24+
925
/**
10-
* Initializes the consumer and establishes connection to brokers.
26+
* Initializes consumer resources and establishes connections.
1127
*
12-
* @throws PulsarClientException if client setup fails
13-
* @throws PulsarAdminException if admin operations fail
28+
* @throws PulsarClientException if client initialization fails
1429
*/
15-
void start() throws PulsarClientException, PulsarAdminException;
30+
void start() throws PulsarClientException;
1631

1732
/**
18-
* Pulls messages from specified partition starting from the given offset (inclusive).
33+
* Pulls messages from the specified partition starting from the given offset.
1934
*
20-
* @param offset the starting offset to consume from (inclusive)
21-
* @param partition the target partition index, -1 means non-partitioned topic
22-
* @param maxNum maximum number of messages to return
23-
* @param maxSize maximum total size (bytes) of messages to return
24-
* @param timeout maximum wait time per message
25-
* @param timeUnit timeout unit
26-
* @return list of messages starting from the specified offset
27-
* @throws PulsarClientException if read operations fail
28-
* @throws PulsarAdminException if offset mapping fails
35+
* @param request pull request configuration
36+
* @return immutable list of messages starting from the specified offset
37+
* @throws IllegalArgumentException for invalid request parameters
2938
*/
30-
List<Message<?>> pull(long offset, int partition, int maxNum, int maxSize,
31-
int timeout, TimeUnit timeUnit)
32-
throws PulsarClientException, PulsarAdminException;
39+
List<Message<T>> pull(PullRequest request);
3340

3441
/**
35-
* Acknowledges consumption of all messages up to and including the specified offset.
42+
* Acknowledges all messages up to the specified offset (inclusive).
3643
*
37-
* @param offset the offset to acknowledge (inclusive)
38-
* @param partition the target partition index, -1 means non-partitioned topic
39-
* @throws PulsarClientException if acknowledgment fails
44+
* @param offset target offset to acknowledge
45+
* @param partition partition index (use {@link #PARTITION_NONE} for non-partitioned topics)
46+
* @throws PulsarClientException for acknowledgment failures
47+
* @throws IllegalArgumentException for invalid partition index
4048
*/
4149
void ack(long offset, int partition) throws PulsarClientException;
4250

4351
/**
4452
* Finds the latest message offset before or at the specified timestamp.
4553
*
46-
* @param topic target topic name
47-
* @param partition the target partition index, -1 means non-partitioned topic
48-
* @param timestamp target timestamp (milliseconds since epoch)
49-
* @return the closest offset where message timestamp ≤ specified timestamp
50-
* @throws PulsarAdminException if message lookup fails
54+
* @param partition partition index (use {@link #PARTITION_NONE} for non-partitioned topics)
55+
* @param timestamp target timestamp in milliseconds
56+
* @return corresponding message offset
57+
* @throws PulsarAdminException for admin operation failures
58+
* @throws IllegalArgumentException for invalid partition index
5159
*/
52-
long searchOffset(String topic, int partition, long timestamp) throws PulsarAdminException;
60+
long searchOffset(int partition, long timestamp) throws PulsarAdminException;
5361

5462
/**
55-
* Retrieves consumption statistics for a consumer group.
63+
* Retrieves consumption statistics for the specified partition.
5664
*
57-
* @param topic target topic name
58-
* @param partition the target partition index, -1 means non-partitioned topic
59-
* @param group consumer group name
60-
* @return the current consumed offset for the group
61-
* @throws PulsarAdminException if statistics retrieval fails
65+
* @param partition partition index (use {@link #PARTITION_NONE} for non-partitioned topics)
66+
* @return current consumption offset
67+
* @throws PulsarAdminException for stats retrieval failures
68+
* @throws IllegalArgumentException for invalid partition index
6269
*/
63-
long getConsumeStats(String topic, Integer partition, String group) throws PulsarAdminException;
70+
long getConsumeStats(int partition) throws PulsarAdminException;
6471

6572
/**
66-
* Releases all resources and closes connections.
67-
* Implements AutoCloseable for try-with-resources support.
68-
*
69-
* @throws IOException if graceful shutdown fails
73+
* Releases all resources and closes connections gracefully.
74+
*/
75+
@Override
76+
void close();
77+
78+
/**
79+
* Configuration object for pull requests.
7080
*/
71-
void close() throws IOException;
81+
@Data
82+
class PullRequest {
83+
private final long offset;
84+
private final int partition;
85+
private final int maxMessages;
86+
private final int maxBytes;
87+
private final Duration timeout;
88+
89+
private PullRequest(Builder builder) {
90+
this.offset = builder.offset;
91+
this.partition = builder.partition;
92+
this.maxMessages = builder.maxMessages;
93+
this.maxBytes = builder.maxBytes;
94+
this.timeout = builder.timeout;
95+
}
96+
97+
public static Builder builder() {
98+
return new Builder();
99+
}
100+
101+
public static class Builder {
102+
private long offset = -1L;
103+
private int partition = PARTITION_NONE;
104+
private int maxMessages = 100;
105+
private int maxBytes = 10_485_760; // 10MB
106+
private Duration timeout = DEFAULT_OPERATION_TIMEOUT;
107+
108+
public Builder offset(long offset) {
109+
this.offset = offset;
110+
return this;
111+
}
112+
113+
public Builder partition(int partition) {
114+
this.partition = partition;
115+
return this;
116+
}
117+
118+
public Builder maxMessages(int maxMessages) {
119+
this.maxMessages = maxMessages;
120+
return this;
121+
}
122+
123+
public Builder maxBytes(int maxBytes) {
124+
this.maxBytes = maxBytes;
125+
return this;
126+
}
127+
128+
public Builder timeout(Duration timeout) {
129+
this.timeout = timeout;
130+
return this;
131+
}
132+
133+
public PullRequest build() {
134+
validate();
135+
return new PullRequest(this);
136+
}
137+
138+
private void validate() {
139+
if (offset < 0) {
140+
throw new IllegalArgumentException("Offset must be non-negative");
141+
}
142+
if (maxMessages <= 0 || maxBytes <= 0) {
143+
throw new IllegalArgumentException("Max messages/bytes must be positive");
144+
}
145+
}
146+
}
147+
}
72148
}

0 commit comments

Comments
 (0)