Skip to content

Commit 679f58d

Browse files
authored
Merge pull request #241 from accelerated/timeout-overloads
Added timeout overloads for consumer and handle classes
2 parents 2ce0ae4 + b2b0d16 commit 679f58d

File tree

4 files changed

+152
-17
lines changed

4 files changed

+152
-17
lines changed

include/cppkafka/consumer.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,20 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
280280
* \return The topic partition list
281281
*/
282282
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;
283+
284+
/**
285+
* \brief Gets the offsets committed for the given topic/partition list with a timeout
286+
*
287+
* This translates into a call to rd_kafka_committed
288+
*
289+
* \param topic_partitions The topic/partition list to be queried
290+
*
291+
* \param timeout The timeout for this operation. Supersedes the default consumer timeout.
292+
*
293+
* \return The topic partition list
294+
*/
295+
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions,
296+
std::chrono::milliseconds timeout) const;
283297

284298
/**
285299
* \brief Gets the offset positions for the given topic/partition list

include/cppkafka/kafka_handle_base.h

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,20 @@ class CPPKAFKA_API KafkaHandleBase {
134134
* \return A pair of watermark offsets {low, high}
135135
*/
136136
OffsetTuple query_offsets(const TopicPartition& topic_partition) const;
137+
138+
/**
139+
* \brief Queries the offset for the given topic/partition with a given timeout
140+
*
141+
* This translates into a call to rd_kafka_query_watermark_offsets
142+
*
143+
* \param topic_partition The topic/partition to be queried
144+
*
145+
* \timeout The timeout for this operation. This supersedes the default handle timeout.
146+
*
147+
* \return A pair of watermark offsets {low, high}
148+
*/
149+
OffsetTuple query_offsets(const TopicPartition& topic_partition,
150+
std::chrono::milliseconds timeout) const;
137151

138152
/**
139153
* \brief Gets the rdkafka handle
@@ -177,6 +191,20 @@ class CPPKAFKA_API KafkaHandleBase {
177191
* \return The metadata
178192
*/
179193
Metadata get_metadata(bool all_topics = true) const;
194+
195+
/**
196+
* \brief Gets metadata for brokers, topics, partitions, etc with a timeout
197+
*
198+
* This translates into a call to rd_kafka_metadata
199+
*
200+
* \param all_topics Whether to fetch metadata about all topics or only locally known ones
201+
*
202+
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
203+
*
204+
* \return The metadata
205+
*/
206+
Metadata get_metadata(bool all_topics,
207+
std::chrono::milliseconds timeout) const;
180208

181209
/**
182210
* \brief Gets general metadata but only fetches metadata for the given topic rather than
@@ -189,6 +217,21 @@ class CPPKAFKA_API KafkaHandleBase {
189217
* \return The topic metadata
190218
*/
191219
TopicMetadata get_metadata(const Topic& topic) const;
220+
221+
/**
222+
* \brief Gets general metadata but only fetches metadata for the given topic rather than
223+
* all of them. Uses a timeout to limit the operation execution time.
224+
*
225+
* This translates into a call to rd_kafka_metadata
226+
*
227+
* \param topic The topic to fetch information for
228+
*
229+
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
230+
*
231+
* \return The topic metadata
232+
*/
233+
TopicMetadata get_metadata(const Topic& topic,
234+
std::chrono::milliseconds timeout) const;
192235

193236
/**
194237
* \brief Gets the consumer group information
@@ -198,13 +241,34 @@ class CPPKAFKA_API KafkaHandleBase {
198241
* \return The group information
199242
*/
200243
GroupInformation get_consumer_group(const std::string& name);
244+
245+
/**
246+
* \brief Gets the consumer group information with a timeout
247+
*
248+
* \param name The name of the consumer group to look up
249+
*
250+
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
251+
*
252+
* \return The group information
253+
*/
254+
GroupInformation get_consumer_group(const std::string& name,
255+
std::chrono::milliseconds timeout);
201256

202257
/**
203258
* \brief Gets all consumer groups
204259
*
205260
* \return A list of consumer groups
206261
*/
207262
GroupInformationList get_consumer_groups();
263+
264+
/**
265+
* \brief Gets all consumer groups with a timeout
266+
*
267+
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
268+
*
269+
* \return A list of consumer groups
270+
*/
271+
GroupInformationList get_consumer_groups(std::chrono::milliseconds timeout);
208272

209273
/**
210274
* \brief Gets topic/partition offsets based on timestamps
@@ -216,6 +280,20 @@ class CPPKAFKA_API KafkaHandleBase {
216280
* \return A topic partition list
217281
*/
218282
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;
283+
284+
/**
285+
* \brief Gets topic/partition offsets based on timestamps with a timeout
286+
*
287+
* This translates into a call to rd_kafka_offsets_for_times
288+
*
289+
* \param queries A map from topic/partition to the timestamp to be used
290+
*
291+
* \param timeout The timeout for this operation. This supersedes the default handle timeout.
292+
*
293+
* \return A topic partition list
294+
*/
295+
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
296+
std::chrono::milliseconds timeout) const;
219297

220298
/**
221299
* \brief Get the kafka handle name
@@ -283,8 +361,11 @@ class CPPKAFKA_API KafkaHandleBase {
283361
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
284362

285363
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
286-
Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
287-
GroupInformationList fetch_consumer_groups(const char* name);
364+
Metadata get_metadata(bool all_topics,
365+
rd_kafka_topic_t* topic_ptr,
366+
std::chrono::milliseconds timeout) const;
367+
GroupInformationList fetch_consumer_groups(const char* name,
368+
std::chrono::milliseconds timeout);
288369
void save_topic_config(const std::string& topic_name, TopicConfiguration config);
289370

290371
std::chrono::milliseconds timeout_ms_;

src/consumer.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,15 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p
185185

186186
TopicPartitionList
187187
Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const {
188+
return get_offsets_committed(topic_partitions, get_timeout());
189+
}
190+
191+
TopicPartitionList
192+
Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions,
193+
milliseconds timeout) const {
188194
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
189195
rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
190-
static_cast<int>(get_timeout().count()));
196+
static_cast<int>(timeout.count()));
191197
check_error(error, topic_list_handle.get());
192198
return convert(topic_list_handle);
193199
}

src/kafka_handle_base.cpp

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,24 +108,40 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config)
108108

109109
KafkaHandleBase::OffsetTuple
110110
KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const {
111+
return query_offsets(topic_partition, timeout_ms_);
112+
}
113+
114+
KafkaHandleBase::OffsetTuple
115+
KafkaHandleBase::query_offsets(const TopicPartition& topic_partition,
116+
milliseconds timeout) const {
111117
int64_t low;
112118
int64_t high;
113119
const string& topic = topic_partition.get_topic();
114120
const int partition = topic_partition.get_partition();
115-
const int timeout = static_cast<int>(timeout_ms_.count());
121+
const int timeout_ms = static_cast<int>(timeout.count());
116122
rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(),
117123
partition, &low, &high,
118-
timeout);
124+
timeout_ms);
119125
check_error(result);
120126
return make_tuple(low, high);
121127
}
122128

123129
Metadata KafkaHandleBase::get_metadata(bool all_topics) const {
124-
return get_metadata(all_topics, nullptr);
130+
return get_metadata(all_topics, nullptr, timeout_ms_);
131+
}
132+
133+
Metadata KafkaHandleBase::get_metadata(bool all_topics,
134+
milliseconds timeout) const {
135+
return get_metadata(all_topics, nullptr, timeout);
125136
}
126137

127138
TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
128-
Metadata md = get_metadata(false, topic.get_handle());
139+
return get_metadata(topic, timeout_ms_);
140+
}
141+
142+
TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic,
143+
milliseconds timeout) const {
144+
Metadata md = get_metadata(false, topic.get_handle(), timeout);
129145
auto topics = md.get_topics();
130146
if (topics.empty()) {
131147
throw ElementNotFound("topic metadata", topic.get_name());
@@ -134,29 +150,44 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
134150
}
135151

136152
GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
137-
auto result = fetch_consumer_groups(name.c_str());
153+
return get_consumer_group(name, timeout_ms_);
154+
}
155+
156+
GroupInformation KafkaHandleBase::get_consumer_group(const string& name,
157+
milliseconds timeout) {
158+
auto result = fetch_consumer_groups(name.c_str(), timeout);
138159
if (result.empty()) {
139160
throw ElementNotFound("consumer group information", name);
140161
}
141162
return move(result[0]);
142163
}
143164

144165
vector<GroupInformation> KafkaHandleBase::get_consumer_groups() {
145-
return fetch_consumer_groups(nullptr);
166+
return get_consumer_groups(timeout_ms_);
167+
}
168+
169+
vector<GroupInformation> KafkaHandleBase::get_consumer_groups(milliseconds timeout) {
170+
return fetch_consumer_groups(nullptr, timeout);
146171
}
147172

148173
TopicPartitionList
149174
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
175+
return get_offsets_for_times(queries, timeout_ms_);
176+
}
177+
178+
TopicPartitionList
179+
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
180+
milliseconds timeout) const {
150181
TopicPartitionList topic_partitions;
151182
for (const auto& query : queries) {
152183
const TopicPartition& topic_partition = query.first;
153184
topic_partitions.emplace_back(topic_partition.get_topic(), topic_partition.get_partition(),
154185
query.second.count());
155186
}
156187
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
157-
const int timeout = static_cast<int>(timeout_ms_.count());
188+
const int timeout_ms = static_cast<int>(timeout.count());
158189
rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
159-
timeout);
190+
timeout_ms);
160191
check_error(result, topic_list_handle.get());
161192
return convert(topic_list_handle);
162193
}
@@ -193,19 +224,22 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf
193224
return Topic(topic);
194225
}
195226

196-
Metadata KafkaHandleBase::get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const {
227+
Metadata KafkaHandleBase::get_metadata(bool all_topics,
228+
rd_kafka_topic_t* topic_ptr,
229+
milliseconds timeout) const {
197230
const rd_kafka_metadata_t* metadata;
198-
const int timeout = static_cast<int>(timeout_ms_.count());
231+
const int timeout_ms = static_cast<int>(timeout.count());
199232
rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), !!all_topics,
200-
topic_ptr, &metadata, timeout);
233+
topic_ptr, &metadata, timeout_ms);
201234
check_error(error);
202235
return Metadata(metadata);
203236
}
204237

205-
vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name) {
238+
vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name,
239+
milliseconds timeout) {
206240
const rd_kafka_group_list* list = nullptr;
207-
const int timeout = static_cast<int>(timeout_ms_.count());
208-
auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout);
241+
const int timeout_ms = static_cast<int>(timeout.count());
242+
auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout_ms);
209243
check_error(result);
210244

211245
// Wrap this in a unique_ptr so it gets auto deleted

0 commit comments

Comments
 (0)