File tree Expand file tree Collapse file tree 1 file changed +11
-5
lines changed Expand file tree Collapse file tree 1 file changed +11
-5
lines changed Original file line number Diff line number Diff line change @@ -132,16 +132,22 @@ ConsumeMessagesUntilTimeout(Kafka::KafkaConsumer& consumer,
132132 do
133133 {
134134 auto polled = consumer.poll (POLL_INTERVAL);
135+
136+ for (const auto & record: polled)
137+ {
138+ auto errorCode = record.error ();
139+ if (errorCode && errorCode.value () != RD_KAFKA_RESP_ERR__PARTITION_EOF)
140+ {
141+ std::cerr << " [" << Kafka::Utility::getCurrentTime () << " ] met error[" << errorCode.value () << " |" << errorCode.message () << " ] while polling messages!" << std::endl;
142+ EXPECT_FALSE (errorCode);
143+ }
144+ }
145+
135146 records.insert (records.end (), std::make_move_iterator (polled.begin ()), std::make_move_iterator (polled.end ()));
136147 } while (std::chrono::steady_clock::now () < end);
137148
138149 std::cout << " [" << Kafka::Utility::getCurrentTime () << " ] " << consumer.name () << " polled " << records.size () << " messages" << std::endl;
139150
140- EXPECT_TRUE (std::none_of (records.cbegin (), records.cend (),
141- [](const auto & record) {
142- return record.error () && record.error ().value () != RD_KAFKA_RESP_ERR__PARTITION_EOF;
143- }));
144-
145151 return records;
146152}
147153
You can’t perform that action at this time.
0 commit comments