Skip to content

Commit 1a0ce69

Browse files
authored
[feat] Support consumer seek by timestamp and reader seek for C Api (#118)
Fixes #82 ### Motivation #82 ### Modifications * Support seek by timestamp for the consumer * Support seek by messageid and timestamp for the reader
1 parent ac3033d commit 1a0ce69

File tree

5 files changed

+379
-0
lines changed

5 files changed

+379
-0
lines changed

include/pulsar/c/consumer.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,11 +241,51 @@ PULSAR_PUBLIC pulsar_result resume_message_listener(pulsar_consumer_t *consumer)
241241
*/
242242
PULSAR_PUBLIC void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer);
243243

244+
/**
245+
* Reset the subscription associated with this consumer to a specific message id.
246+
*
247+
* @param consumer The consumer
248+
* @param messageId The message id can either be a specific message or represent the first or last messages in
249+
* the topic.
250+
* @param callback The callback for this async operation
251+
* @param ctx The context for the callback
252+
*/
244253
PULSAR_PUBLIC void pulsar_consumer_seek_async(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
245254
pulsar_result_callback callback, void *ctx);
246255

256+
/**
257+
* Reset the subscription asynchronously associated with this consumer to a specific message id.
258+
*
259+
* @param consumer The consumer
260+
* @param messageId The message id can either be a specific message or represent the first or last messages in
261+
* the topic.
262+
* @return Operation result
263+
*/
247264
PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId);
248265

266+
/**
267+
* Reset the subscription associated with this consumer to a specific message publish time.
268+
*
269+
* @param consumer The consumer
270+
* @param timestamp The message publish time where to reposition the subscription. The timestamp format should
271+
* be Unix time in milliseconds.
272+
* @param callback The callback for this async operation
273+
* @param ctx The context for the callback
274+
*/
275+
PULSAR_PUBLIC void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp,
276+
pulsar_result_callback callback, void *ctx);
277+
278+
/**
279+
* Reset the subscription asynchronously associated with this consumer to a specific message publish time.
280+
*
281+
* @param consumer The consumer
282+
* @param timestamp The message publish time where to reposition the subscription. The timestamp format should
283+
* be Unix time in milliseconds.
284+
* @return Operation result
285+
*/
286+
PULSAR_PUBLIC pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer,
287+
uint64_t timestamp);
288+
249289
PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer);
250290

251291
PULSAR_PUBLIC pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,

include/pulsar/c/reader.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,50 @@ PULSAR_PUBLIC pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, pul
5959
PULSAR_PUBLIC pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader,
6060
pulsar_message_t **msg, int timeoutMs);
6161

62+
/**
63+
* Reset the subscription associated with this reader to a specific message id.
64+
*
65+
* @param reader The reader
66+
* @param messageId The message id can either be a specific message or represent the first or last messages in
67+
* the topic.
68+
* @param callback The callback for this async operation
69+
* @param ctx The context for the callback
70+
*/
71+
PULSAR_PUBLIC void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId,
72+
pulsar_result_callback callback, void *ctx);
73+
74+
/**
75+
* Reset the subscription asynchronously associated with this reader to a specific message id.
76+
*
77+
* @param reader The reader
78+
* @param messageId The message id can either be a specific message or represent the first or last messages in
79+
* the topic.
80+
* @return Operation result
81+
*/
82+
PULSAR_PUBLIC pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId);
83+
84+
/**
85+
* Reset the subscription associated with this reader to a specific message publish time.
86+
*
87+
* @param reader The reader
88+
* @param timestamp The message publish time where to reposition the subscription. The timestamp format should
89+
* be Unix time in milliseconds.
90+
* @param callback The callback for this async operation
91+
* @param ctx The context for the callback
92+
*/
93+
PULSAR_PUBLIC void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp,
94+
pulsar_result_callback callback, void *ctx);
95+
96+
/**
97+
* Reset the subscription asynchronously associated with this reader to a specific message publish time.
98+
*
99+
* @param reader The reader
100+
* @param timestamp The message publish time where to reposition the subscription. The timestamp format should
101+
* be Unix time in milliseconds.
102+
* @return Operation result
103+
*/
104+
PULSAR_PUBLIC pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp);
105+
62106
PULSAR_PUBLIC pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
63107

64108
PULSAR_PUBLIC void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback,

lib/c/c_Consumer.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,16 @@ pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_i
156156
return (pulsar_result)consumer->consumer.seek(messageId->messageId);
157157
}
158158

159+
void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp,
160+
pulsar_result_callback callback, void *ctx) {
161+
consumer->consumer.seekAsync(timestamp,
162+
std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
163+
}
164+
165+
pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, uint64_t timestamp) {
166+
return (pulsar_result)consumer->consumer.seek(timestamp);
167+
}
168+
159169
int pulsar_consumer_is_connected(pulsar_consumer_t *consumer) { return consumer->consumer.isConnected(); }
160170

161171
pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,

lib/c/c_Reader.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,26 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
4545
return (pulsar_result)res;
4646
}
4747

48+
void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId,
49+
pulsar_result_callback callback, void *ctx) {
50+
reader->reader.seekAsync(messageId->messageId,
51+
std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
52+
}
53+
54+
pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId) {
55+
return (pulsar_result)reader->reader.seek(messageId->messageId);
56+
}
57+
58+
void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp,
59+
pulsar_result_callback callback, void *ctx) {
60+
reader->reader.seekAsync(timestamp,
61+
std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
62+
}
63+
64+
pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp) {
65+
return (pulsar_result)reader->reader.seek(timestamp);
66+
}
67+
4868
pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); }
4969

5070
void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) {

0 commit comments

Comments
 (0)