Skip to content

Commit 2ce0ae4

Browse files
authored
Merge pull request #238 from accelerated/poll_strategy
Added member functions for static consumers
2 parents 7d097df + a4532ed commit 2ce0ae4

File tree

2 files changed

+52
-14
lines changed

2 files changed

+52
-14
lines changed

include/cppkafka/utils/poll_strategy_base.h

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,36 @@ class CPPKAFKA_API PollStrategyBase : public PollInterface {
8484
*/
8585
Consumer& get_consumer() final;
8686

87+
/**
88+
* \brief Creates partitions queues associated with the supplied partitions.
89+
*
90+
* This method contains a default implementation. It adds all the new queues belonging
91+
* to the provided partition list and calls reset_state().
92+
* To be used with static consumers.
93+
*
94+
* \param partitions Assigned topic partitions.
95+
*/
96+
virtual void assign(TopicPartitionList& partitions);
97+
98+
/**
99+
* \brief Removes partitions queues associated with the supplied partitions.
100+
*
101+
* This method contains a default implementation. It removes all the queues
102+
* belonging to the provided partition list and calls reset_state().
103+
* To be used with static consumers.
104+
*
105+
* \param partitions Revoked topic partitions.
106+
*/
107+
virtual void revoke(const TopicPartitionList& partitions);
108+
109+
/**
110+
* \brief Removes all partitions queues associated with the supplied partitions.
111+
*
112+
* This method contains a default implementation. It removes all the queues
113+
* currently assigned and calls reset_state(). To be used with static consumers.
114+
*/
115+
virtual void revoke();
116+
87117
protected:
88118
/**
89119
* \brief Get the queues from all assigned partitions
@@ -111,8 +141,8 @@ class CPPKAFKA_API PollStrategyBase : public PollInterface {
111141
/**
112142
* \brief Function to be called when a new partition assignment takes place
113143
*
114-
* This method contains a default implementation. It adds all the new queues belonging
115-
* to the provided partition list and calls reset_state().
144+
* This method contains a default implementation. It calls assign()
145+
* and invokes the user assignment callback.
116146
*
117147
* \param partitions Assigned topic partitions
118148
*/
@@ -121,8 +151,8 @@ class CPPKAFKA_API PollStrategyBase : public PollInterface {
121151
/**
122152
* \brief Function to be called when an old partition assignment gets revoked
123153
*
124-
* This method contains a default implementation. It removes all the queues
125-
* belonging to the provided partition list and calls reset_state().
154+
* This method contains a default implementation. It calls revoke()
155+
* and invokes the user revocation callback.
126156
*
127157
* \param partitions Revoked topic partitions
128158
*/

src/utils/poll_strategy_base.cpp

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,29 +89,37 @@ void PollStrategyBase::reset_state() {
8989

9090
}
9191

92-
void PollStrategyBase::on_assignment(TopicPartitionList& partitions) {
92+
void PollStrategyBase::assign(TopicPartitionList& partitions) {
9393
// populate partition queues
9494
for (const auto& partition : partitions) {
9595
// get the queue associated with this partition
9696
partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()});
9797
}
9898
reset_state();
99+
}
100+
101+
void PollStrategyBase::revoke(const TopicPartitionList& partitions) {
102+
for (const auto &partition : partitions) {
103+
partition_queues_.erase(partition);
104+
}
105+
reset_state();
106+
}
107+
108+
void PollStrategyBase::revoke() {
109+
partition_queues_.clear();
110+
reset_state();
111+
}
112+
113+
void PollStrategyBase::on_assignment(TopicPartitionList& partitions) {
114+
assign(partitions);
99115
// call original consumer callback if any
100116
if (assignment_callback_) {
101117
assignment_callback_(partitions);
102118
}
103119
}
104120

105121
void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) {
106-
for (const auto& partition : partitions) {
107-
// get the queue associated with this partition
108-
auto toppar_it = partition_queues_.find(partition);
109-
if (toppar_it != partition_queues_.end()) {
110-
// remove this queue from the list
111-
partition_queues_.erase(toppar_it);
112-
}
113-
}
114-
reset_state();
122+
revoke(partitions);
115123
// call original consumer callback if any
116124
if (revocation_callback_) {
117125
revocation_callback_(partitions);

0 commit comments

Comments
 (0)