Skip to content

Commit ab98674

Browse files
author
jdillinger
committed
feat kafka: fix negative ms_duration values
commit_hash:6f5d6eeabbfce6d91bb5632dd7dc675cf1140ad1
1 parent 5315214 commit ab98674

File tree

1 file changed

+3
-14
lines changed

1 file changed

+3
-14
lines changed

kafka/src/kafka/impl/consumer_impl.cpp

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -681,21 +681,10 @@ void ConsumerImpl::AccountPolledMessageStat(const Message& polled_message) {
681681
const auto message_timestamp = polled_message.GetTimestamp();
682682
if (message_timestamp) {
683683
const auto take_time = std::chrono::system_clock::now().time_since_epoch();
684-
const auto ms_duration =
684+
auto ms_duration =
685685
std::chrono::duration_cast<std::chrono::milliseconds>(take_time - message_timestamp.value()).count();
686-
// 1 day to ms
687-
if (ms_duration > 60 * 60 * 24 * 1000 || ms_duration < 0) {
688-
LOG_WARNING(
689-
"Erroneous wait time for a message in a topic `{}` partition `{}` offset `{}` key `{}` "
690-
"ms_duration `{}`ms take_time `{}`ns message_time `{}`ms",
691-
polled_message.GetTopic(),
692-
polled_message.GetPartition(),
693-
polled_message.GetOffset(),
694-
polled_message.GetKey(),
695-
ms_duration,
696-
take_time.count(),
697-
message_timestamp.value().count()
698-
);
686+
if (ms_duration < 0) {
687+
ms_duration = 0;
699688
}
700689
topic_stats->avg_ms_spent_time.GetCurrentCounter().Account(ms_duration);
701690
} else {

0 commit comments

Comments
 (0)