@@ -205,6 +205,8 @@ class KafkaConsumer: public KafkaClient
205205 * Returns the polled records.
206206 * Note: 1) The result could be fetched through ConsumerRecord (with member function `error`).
207207 * 2) Make sure the `ConsumerRecord` be destructed before the `KafkaConsumer.close()`.
208+ * Throws KafkaException with errors:
209+ * - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Unknown partition
208210 */
209211 std::vector<consumer::ConsumerRecord> poll (std::chrono::milliseconds timeout);
210212
@@ -213,6 +215,8 @@ class KafkaConsumer: public KafkaClient
213215 * Returns the number of polled records (which have been saved into parameter `output`).
214216 * Note: 1) The result could be fetched through ConsumerRecord (with member function `error`).
215217 * 2) Make sure the `ConsumerRecord` be destructed before the `KafkaConsumer.close()`.
218+ * Throws KafkaException with errors:
219+ * - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Unknown partition
216220 */
217221 std::size_t poll (std::chrono::milliseconds timeout, std::vector<consumer::ConsumerRecord>& output);
218222
@@ -286,8 +290,8 @@ class KafkaConsumer: public KafkaClient
286290
287291 std::string _groupId;
288292
289- unsigned int _maxPollRecords = 500 ; // From "max.poll.records" property, and here is the default for batch-poll
290- bool _enableAutoCommit = false ; // From "enable.auto.commit" property
293+ std::size_t _maxPollRecords = 500; // From "max.poll.records" property, and here is the default for batch-poll
294+ bool _enableAutoCommit = false ; // From "enable.auto.commit" property
291295
292296 rd_kafka_queue_unique_ptr _rk_queue;
293297
@@ -379,7 +383,7 @@ KafkaConsumer::KafkaConsumer(const Properties &properties, EventsPollingOption e
379383 if (auto maxPollRecordsProperty = properties.getProperty (consumer::Config::MAX_POLL_RECORDS))
380384 {
381385 const std::string maxPollRecords = *maxPollRecordsProperty;
382- _maxPollRecords = std::stoi (maxPollRecords);
386+ _maxPollRecords = static_cast<std::size_t>(std::stoi(maxPollRecords));
383387 }
384388 _properties.put (consumer::Config::MAX_POLL_RECORDS, std::to_string (_maxPollRecords));
385389
@@ -812,11 +816,15 @@ KafkaConsumer::pollMessages(int timeoutMs, std::vector<consumer::ConsumerRecord>
812816
813817 // Poll messages with librdkafka's API
814818 std::vector<rd_kafka_message_t *> msgPtrArray (_maxPollRecords);
815- std::size_t msgReceived = rd_kafka_consume_batch_queue (_rk_queue.get (), timeoutMs, msgPtrArray.data (), _maxPollRecords);
819+ auto msgReceived = rd_kafka_consume_batch_queue (_rk_queue.get (), timeoutMs, msgPtrArray.data (), _maxPollRecords);
820+ if (msgReceived < 0 )
821+ {
822+ KAFKA_THROW_ERROR (Error (rd_kafka_last_error ()));
823+ }
816824
817825 // Wrap messages with ConsumerRecord
818826 output.clear ();
819- output.reserve (msgReceived);
827+ output.reserve(static_cast<std::size_t>(msgReceived));
820828 std::for_each (msgPtrArray.begin (), msgPtrArray.begin () + msgReceived, [&output](rd_kafka_message_t * rkMsg) { output.emplace_back (rkMsg); });
821829
822830 // Store the offsets for all these polled messages (for "enable.auto.commit=true" case)
0 commit comments