Skip to content

Commit 37bdf5b

Browse files
Support seek operation on a multi-topics consumer (apache#426)
### Motivation See apache/pulsar-client-python#213 ### Modifications Add a new `forEachValue` overload that allows users to count the number of rest running tasks through `SharedFuture` to `SynchronizedHashMap`. Leverage this overload in seek operations when the argument is a timestamp, or a MessageId that represents earliest or latest. When the argument is a MessageId whose `getTopicName()` method returns a correct topic name, seek on the internal consumer of that topic. Add `testMultiTopicsSeekAll` and `testMultiTopicsSeekSingle` to `ConsumerSeekTest` to cover these cases.
1 parent 3f0b33b commit 37bdf5b

File tree

6 files changed

+385
-136
lines changed

6 files changed

+385
-136
lines changed

lib/MultiTopicsConsumerImpl.cc

Lines changed: 47 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -338,41 +338,23 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback)
338338
}
339339
state_ = Closing;
340340

341-
std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
342341
auto self = get_shared_this_ptr();
343-
int numConsumers = 0;
344342
consumers_.forEachValue(
345-
[&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) {
346-
numConsumers++;
347-
consumer->unsubscribeAsync([self, consumerUnsubed, callback](Result result) {
348-
self->handleUnsubscribedAsync(result, consumerUnsubed, callback);
343+
[this, self, callback](const ConsumerImplPtr& consumer, SharedFuture future) {
344+
consumer->unsubscribeAsync([this, self, callback, future](Result result) {
345+
if (result != ResultOk) {
346+
state_ = Failed;
347+
LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
348+
<< result << " subscription - " << subscriptionName_);
349+
}
350+
if (future.tryComplete()) {
351+
LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - "
352+
<< consumerStr_);
353+
callback((state_ != Failed) ? ResultOk : ResultUnknownError);
354+
}
349355
});
350-
});
351-
if (numConsumers == 0) {
352-
// No need to unsubscribe, since the list matching the regex was empty
353-
callback(ResultOk);
354-
}
355-
}
356-
357-
void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
358-
std::shared_ptr<std::atomic<int>> consumerUnsubed,
359-
ResultCallback callback) {
360-
(*consumerUnsubed)++;
361-
362-
if (result != ResultOk) {
363-
state_ = Failed;
364-
LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
365-
<< result << " subscription - " << subscriptionName_);
366-
}
367-
368-
if (consumerUnsubed->load() == numberTopicPartitions_->load()) {
369-
LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_);
370-
Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
371-
// The `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()` if
372-
// unsubscribe succeeds.
373-
callback(result1);
374-
return;
375-
}
356+
},
357+
[callback] { callback(ResultOk); });
376358
}
377359

378360
void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
@@ -899,50 +881,52 @@ std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::v
899881
return topicNamePtr;
900882
}
901883

902-
void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
903-
callback(ResultOperationNotSupported);
904-
}
905-
906-
void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
907-
if (state_ != Ready) {
908-
callback(ResultAlreadyClosed);
909-
return;
910-
}
911-
884+
void MultiTopicsConsumerImpl::beforeSeek() {
912885
duringSeek_.store(true, std::memory_order_release);
913886
consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); });
914887
unAckedMessageTrackerPtr_->clear();
915888
incomingMessages_.clear();
916889
incomingMessagesSize_ = 0L;
890+
}
891+
892+
void MultiTopicsConsumerImpl::afterSeek() {
893+
duringSeek_.store(false, std::memory_order_release);
894+
auto self = get_shared_this_ptr();
895+
listenerExecutor_->postWork([this, self] {
896+
consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
897+
});
898+
}
899+
900+
void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
901+
if (msgId == MessageId::earliest() || msgId == MessageId::latest()) {
902+
return seekAllAsync(msgId, callback);
903+
}
917904

905+
auto optConsumer = consumers_.find(msgId.getTopicName());
906+
if (!optConsumer) {
907+
LOG_ERROR(getName() << "cannot seek a message id whose topic \"" + msgId.getTopicName() +
908+
"\" is not subscribed");
909+
callback(ResultOperationNotSupported);
910+
return;
911+
}
912+
913+
beforeSeek();
918914
auto weakSelf = weak_from_this();
919-
auto numConsumersLeft = std::make_shared<std::atomic<int64_t>>(consumers_.size());
920-
auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result result) {
915+
optConsumer.get()->seekAsync(msgId, [this, weakSelf, callback](Result result) {
921916
auto self = weakSelf.lock();
922-
if (PULSAR_UNLIKELY(!self)) {
923-
callback(result);
924-
return;
925-
}
926-
if (result != ResultOk) {
927-
*numConsumersLeft = 0; // skip the following callbacks
917+
if (self) {
918+
afterSeek();
928919
callback(result);
929-
return;
930-
}
931-
if (--*numConsumersLeft > 0) {
932-
return;
920+
} else {
921+
callback(ResultAlreadyClosed);
933922
}
934-
duringSeek_.store(false, std::memory_order_release);
935-
listenerExecutor_->postWork([this, self] {
936-
consumers_.forEachValue(
937-
[](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
938-
});
939-
callback(ResultOk);
940-
};
941-
consumers_.forEachValue([timestamp, &wrappedCallback](const ConsumerImplPtr& consumer) {
942-
consumer->seekAsync(timestamp, wrappedCallback);
943923
});
944924
}
945925

926+
void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
927+
seekAllAsync(timestamp, callback);
928+
}
929+
946930
void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
947931
consumers_.forEachValue([enabled](const ConsumerImplPtr& consumer) {
948932
consumer->setNegativeAcknowledgeEnabledForTesting(enabled);

lib/MultiTopicsConsumerImpl.h

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#include <vector>
2626

2727
#include "Commands.h"
28-
#include "ConsumerImplBase.h"
28+
#include "ConsumerImpl.h"
2929
#include "ConsumerInterceptors.h"
3030
#include "Future.h"
3131
#include "Latch.h"
@@ -38,7 +38,6 @@
3838
namespace pulsar {
3939
typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
4040

41-
class ConsumerImpl;
4241
using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
4342
class ClientImpl;
4443
using ClientImplPtr = std::shared_ptr<ClientImpl>;
@@ -152,8 +151,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
152151
void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
153152
std::shared_ptr<std::atomic<int>> partitionsNeedCreate,
154153
ConsumerSubResultPromisePtr topicSubResultPromise);
155-
void handleUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
156-
ResultCallback callback);
157154
void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
158155
int numberPartitions, TopicNamePtr topicNamePtr,
159156
std::string& topicPartitionName, ResultCallback callback);
@@ -179,6 +176,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
179176
return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
180177
}
181178

179+
template <typename SeekArg>
180+
#if __cplusplus >= 202002L
181+
requires std::convertible_to<SeekArg, uint64_t> ||
182+
std::same_as<std::remove_cv_t<std::remove_reference_t<SeekArg>>, MessageId>
183+
#endif
184+
void seekAllAsync(const SeekArg& seekArg, ResultCallback callback);
185+
186+
void beforeSeek();
187+
void afterSeek();
188+
182189
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
183190
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
184191
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
@@ -187,5 +194,42 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
187194
};
188195

189196
typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
197+
198+
template <typename SeekArg>
199+
#if __cplusplus >= 202002L
200+
requires std::convertible_to<SeekArg, uint64_t> ||
201+
std::same_as<std::remove_cv_t<std::remove_reference_t<SeekArg>>, MessageId>
202+
#endif
203+
inline void MultiTopicsConsumerImpl::seekAllAsync(const SeekArg& seekArg, ResultCallback callback) {
204+
if (state_ != Ready) {
205+
callback(ResultAlreadyClosed);
206+
return;
207+
}
208+
beforeSeek();
209+
auto weakSelf = weak_from_this();
210+
auto failed = std::make_shared<std::atomic_bool>(false);
211+
consumers_.forEachValue(
212+
[this, weakSelf, &seekArg, callback, failed](const ConsumerImplPtr& consumer, SharedFuture future) {
213+
consumer->seekAsync(seekArg, [this, weakSelf, callback, failed, future](Result result) {
214+
auto self = weakSelf.lock();
215+
if (!self || failed->load(std::memory_order_acquire)) {
216+
callback(result);
217+
return;
218+
}
219+
if (result != ResultOk) {
220+
failed->store(true, std::memory_order_release); // skip the following callbacks
221+
afterSeek();
222+
callback(result);
223+
return;
224+
}
225+
if (future.tryComplete()) {
226+
afterSeek();
227+
callback(ResultOk);
228+
}
229+
});
230+
},
231+
[callback] { callback(ResultOk); });
232+
}
233+
190234
} // namespace pulsar
191235
#endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER

lib/SynchronizedHashMap.h

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,27 @@
1818
*/
1919
#pragma once
2020

21+
#include <atomic>
2122
#include <boost/optional.hpp>
2223
#include <functional>
24+
#include <memory>
2325
#include <mutex>
2426
#include <unordered_map>
2527
#include <utility>
2628
#include <vector>
2729

2830
namespace pulsar {
2931

32+
class SharedFuture {
33+
public:
34+
SharedFuture(size_t size) : count_(std::make_shared<std::atomic_size_t>(size)) {}
35+
36+
bool tryComplete() const { return --*count_ == 0; }
37+
38+
private:
39+
std::shared_ptr<std::atomic_size_t> count_;
40+
};
41+
3042
// V must be default constructible and copyable
3143
template <typename K, typename V>
3244
class SynchronizedHashMap {
@@ -60,10 +72,57 @@ class SynchronizedHashMap {
6072
}
6173
}
6274

63-
void forEachValue(std::function<void(const V&)> f) const {
64-
Lock lock(mutex_);
65-
for (const auto& kv : data_) {
66-
f(kv.second);
75+
template <typename ValueFunc>
76+
#if __cplusplus >= 202002L
77+
requires requires(ValueFunc&& each, const V& value) {
78+
each(value);
79+
}
80+
#endif
81+
void forEachValue(ValueFunc&& each) {
82+
Lock lock{mutex_};
83+
for (auto&& kv : data_) {
84+
each(kv.second);
85+
}
86+
}
87+
88+
// This override provides a convenient approach to execute tasks on each consumer concurrently and
89+
// supports checking if all tasks are done in the `each` callback.
90+
//
91+
// All map values will be passed as the 1st argument to the `each` function. The 2nd argument is a shared
92+
// future whose `tryComplete` method marks this task as completed. If users want to check if all task are
93+
// completed in the `each` function, this method must be called.
94+
//
95+
// For example, given a `SynchronizedHashMap<int, std::string>` object `m` and the following call:
96+
//
97+
// ```c++
98+
// m.forEachValue([](const std::string& s, SharedFuture future) {
99+
// std::cout << s << std::endl;
100+
// if (future.tryComplete()) {
101+
// std::cout << "done" << std::endl;
102+
// }
103+
// }, [] { std::cout << "empty map" << std::endl; });
104+
// ```
105+
//
106+
// If the map is empty, only "empty map" will be printed. Otherwise, all values will be printed
107+
// and "done" will be printed after that.
108+
template <typename ValueFunc, typename EmptyFunc>
109+
#if __cplusplus >= 202002L
110+
requires requires(ValueFunc&& each, const V& value, SharedFuture count, EmptyFunc emptyFunc) {
111+
each(value, count);
112+
emptyFunc();
113+
}
114+
#endif
115+
void forEachValue(ValueFunc&& each, EmptyFunc&& emptyFunc) {
116+
std::unique_lock<MutexType> lock{mutex_};
117+
if (data_.empty()) {
118+
lock.unlock();
119+
emptyFunc();
120+
return;
121+
}
122+
SharedFuture future{data_.size()};
123+
for (auto&& kv : data_) {
124+
const auto& value = kv.second;
125+
each(value, future);
67126
}
68127
}
69128

0 commit comments

Comments
 (0)