I've been trying to find out what is happening with two pieces of code which consume from the same topic. Looking through previous conversations, I know I need to add 1 to the Offset if I'm passing a list of commits to the client. That helps, but a few partitions still report a lag with Code 1 below compared to Code 2.
Code 1 - Always has a lag, sometimes more than 1
foreach (var batch in messages)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        var batchMessage = System.Text.Json.JsonSerializer.Deserialize<KafkaLogWrapper<List<TaskModel>>>(batch.Message.Value);
        await ProcessBatchMessage(new List<KafkaLogWrapper<List<TaskModel>>>() { batchMessage }, cancellationToken);
    }
}
// commit across all partitions, add +1 to mirror single commit from kafka client
_consumer.Commit(messages.Select(x => new TopicPartitionOffset(x.TopicPartition, x.Offset + 1, x.LeaderEpoch)));
Code 2 - Doesn't have a lag.
foreach (var batch in messages)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        var batchMessage = System.Text.Json.JsonSerializer.Deserialize<KafkaLogWrapper<List<TaskModel>>>(batch.Message.Value);
        await ProcessBatchMessage(new List<KafkaLogWrapper<List<TaskModel>>>() { batchMessage }, cancellationToken);
        _consumer.Commit(batch); // commit this message
    }
}
I don't know what could cause the lag, or what logging I should add to track it down. I feel like I'm missing some understanding of how the commits work, and that this is what causes the lag and makes our metrics look off: the lag is reported even when the consumer's consume calls return null, showing it has nothing to process.
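One thing that might be worth ruling out (an assumption on my part, not something confirmed here): Code 1 passes one offset per message, so a partition with several messages in the batch appears several times in the list given to Commit, and I am not sure which entry ends up being used. A minimal sketch that collapses the batch to a single entry per partition, the highest offset + 1, is below. `OffsetHelpers` and `NextOffsetsPerPartition` are hypothetical names made up for illustration; the Confluent.Kafka types are the same ones used in Code 1.

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class OffsetHelpers
{
    // Hypothetical helper: returns one commit entry per partition,
    // pointing at the next offset to read (highest processed offset + 1).
    public static List<TopicPartitionOffset> NextOffsetsPerPartition<TKey, TValue>(
        IEnumerable<ConsumeResult<TKey, TValue>> processed)
    {
        return processed
            .GroupBy(r => r.TopicPartition)
            .Select(g =>
            {
                // Pick the message with the highest offset in this partition.
                var last = g.OrderByDescending(r => r.Offset.Value).First();
                return new TopicPartitionOffset(
                    last.TopicPartition, last.Offset + 1, last.LeaderEpoch);
            })
            .ToList();
    }
}
```

With this, the commit in Code 1 would become `_consumer.Commit(OffsetHelpers.NextOffsetsPerPartition(messages));`. If the loop can exit early on cancellation, it is probably safer to pass only the messages that were actually processed.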