Skip to content

Commit 5c72f3f

Browse files
acceleratedmfontanini
authored andcommitted
Added pause/resume for producers (#87)
* Added pause/resume for producers * Moved pause/resume functions to KafkaHandleBase
1 parent 069ea3d commit 5c72f3f

File tree

6 files changed

+39
-1
lines changed

6 files changed

+39
-1
lines changed

include/cppkafka/consumer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
101101
using AssignmentCallback = std::function<void(TopicPartitionList&)>;
102102
using RevocationCallback = std::function<void(const TopicPartitionList&)>;
103103
using RebalanceErrorCallback = std::function<void(Error)>;
104+
using KafkaHandleBase::pause;
104105

105106
/**
106107
* \brief Creates an instance of a consumer.
@@ -202,7 +203,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
202203
/**
203204
* \brief Resumes all consumption
204205
*/
205-
void resume();
206+
void resume();
206207

207208
/**
208209
* \brief Commits the current partition assignment

include/cppkafka/kafka_handle_base.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ class CPPKAFKA_API KafkaHandleBase {
7676
*/
7777
void pause_partitions(const TopicPartitionList& topic_partitions);
7878

79+
/**
80+
* \brief Pauses consumption/production for this topic
81+
*
82+
* \param topic The topic name
83+
*/
84+
void pause(const std::string& topic);
85+
7986
/**
8087
* \brief Resumes consumption/production from the given topic/partition list
8188
*
@@ -84,6 +91,13 @@ class CPPKAFKA_API KafkaHandleBase {
8491
* \param topic_partitions The topic/partition list to resume consuming/producing from/to
8592
*/
8693
void resume_partitions(const TopicPartitionList& topic_partitions);
94+
95+
/**
96+
* \brief Resumes consumption/production for this topic
97+
*
98+
* \param topic The topic name
99+
*/
100+
void resume(const std::string& topic);
87101

88102
/**
89103
* \brief Sets the timeout for operations that require a timeout

include/cppkafka/producer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class Message;
7878
*/
7979
class CPPKAFKA_API Producer : public KafkaHandleBase {
8080
public:
81+
using KafkaHandleBase::pause;
8182
/**
8283
* The policy to use for the payload. The default policy is COPY_PAYLOAD
8384
*/

include/cppkafka/topic_partition_list.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
namespace cppkafka {
4242

4343
class TopicPartition;
44+
class PartitionMetadata;
4445

4546
using TopicPartitionsListPtr = std::unique_ptr<rd_kafka_topic_partition_list_t,
4647
decltype(&rd_kafka_topic_partition_list_destroy)>;
@@ -53,6 +54,8 @@ using TopicPartitionList = std::vector<TopicPartition>;
5354
CPPKAFKA_API TopicPartitionsListPtr convert(const TopicPartitionList& topic_partitions);
5455
CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_partitions);
5556
CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions);
57+
CPPKAFKA_API TopicPartitionList convert(const std::string& topic,
58+
const std::vector<PartitionMetadata>& partition_metadata);
5659
CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);
5760

5861
// Extracts a partition list subset belonging to the provided topics (case-insensitive)

src/kafka_handle_base.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,21 @@ void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partition
6464
check_error(error);
6565
}
6666

67+
void KafkaHandleBase::pause(const std::string& topic) {
68+
pause_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions()));
69+
}
70+
6771
void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) {
6872
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
6973
rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(),
7074
topic_list_handle.get());
7175
check_error(error);
7276
}
7377

78+
void KafkaHandleBase::resume(const std::string& topic) {
79+
resume_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions()));
80+
}
81+
7482
void KafkaHandleBase::set_timeout(milliseconds timeout) {
7583
timeout_ms_ = timeout;
7684
}

src/topic_partition_list.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "topic_partition_list.h"
3333
#include "topic_partition.h"
3434
#include "exceptions.h"
35+
#include "metadata.h"
3536

3637
using std::vector;
3738
using std::set;
@@ -66,6 +67,16 @@ TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions) {
6667
return output;
6768
}
6869

70+
TopicPartitionList convert(const std::string& topic,
71+
const std::vector<PartitionMetadata>& partition_metadata)
72+
{
73+
TopicPartitionList output;
74+
for (const auto& meta : partition_metadata) {
75+
output.emplace_back(topic, meta.get_id());
76+
}
77+
return output;
78+
}
79+
6980
TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) {
7081
return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy);
7182
}

0 commit comments

Comments
 (0)