Skip to content

Commit 0b15a1d

Browse files
author
jdillinger
committed
feat kafka: adding a log for problematic messages
commit_hash:830e6e8e7f11e05fe8f967c0174ebac30184a039
1 parent c7b7dd4 commit 0b15a1d

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

kafka/src/kafka/impl/consumer_impl.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,20 @@ void ConsumerImpl::AccountPolledMessageStat(const Message& polled_message) {
666666
const auto take_time = std::chrono::system_clock::now().time_since_epoch();
667667
const auto ms_duration =
668668
std::chrono::duration_cast<std::chrono::milliseconds>(take_time - message_timestamp.value()).count();
669+
// 1 day to ms
670+
if (ms_duration > 60 * 60 * 24 * 1000 || ms_duration < 0) {
671+
LOG_WARNING(
672+
"Erroneous wait time for a message in a topic `{}` partition `{}` offset `{}` key `{}` "
673+
"ms_duration `{}`ms take_time `{}`ns message_time `{}`ms",
674+
polled_message.GetTopic(),
675+
polled_message.GetPartition(),
676+
polled_message.GetOffset(),
677+
polled_message.GetKey(),
678+
ms_duration,
679+
take_time.count(),
680+
message_timestamp.value().count()
681+
);
682+
}
669683
topic_stats->avg_ms_spent_time.GetCurrentCounter().Account(ms_duration);
670684
} else {
671685
LOG_WARNING(

0 commit comments

Comments
 (0)