Skip to content

Commit 2288025

Browse files
Fixes kafka consume excepiton for GroupLoadInProress errcode (#1085)
* Fixes and add options of kafka consume excepiton for GroupLoadInProress errcode. #1084 * clean code * Add kafka consume exception logs.
1 parent 0b81a18 commit 2288025

File tree

4 files changed

+32
-3
lines changed

4 files changed

+32
-3
lines changed

src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ public class KafkaOptions
2424
public KafkaOptions()
2525
{
2626
MainConfig = new Dictionary<string, string>();
27+
RetriableErrorCodes = new List<ErrorCode>
28+
{
29+
ErrorCode.GroupLoadInProress
30+
};
2731
}
2832

2933
/// <summary>
@@ -43,5 +47,10 @@ public KafkaOptions()
4347
/// If you need to get offset and partition and so on.., you can use this function to write additional header into <see cref="CapHeader"/>
4448
/// </summary>
4549
public Func<ConsumeResult<string, byte[]>, List<KeyValuePair<string, string>>>? CustomHeaders { get; set; }
50+
51+
/// <summary>
52+
/// New retriable error code (refer to https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafkacpp_8h.html#a4c6b7af48c215724c323c60ea4080dbf)
53+
/// </summary>
54+
public IList<ErrorCode> RetriableErrorCodes { get; set; }
4655
}
4756
}

src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.Kafka
1717
{
1818
public class KafkaConsumerClient : IConsumerClient
1919
{
20-
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
20+
private static readonly SemaphoreSlim ConnectionLock = new(initialCount: 1, maxCount: 1);
2121

2222
private readonly string _groupId;
2323
private readonly KafkaOptions _kafkaOptions;
@@ -33,7 +33,7 @@ public KafkaConsumerClient(string groupId, IOptions<KafkaOptions> options)
3333

3434
public event EventHandler<LogMessageEventArgs>? OnLog;
3535

36-
public BrokerAddress BrokerAddress => new ("Kafka", _kafkaOptions.Servers);
36+
public BrokerAddress BrokerAddress => new("Kafka", _kafkaOptions.Servers);
3737

3838
public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
3939
{
@@ -89,7 +89,23 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
8989

9090
while (true)
9191
{
92-
var consumerResult = _consumerClient!.Consume(cancellationToken);
92+
ConsumeResult<string, byte[]> consumerResult;
93+
94+
try
95+
{
96+
consumerResult = _consumerClient!.Consume(cancellationToken);
97+
}
98+
catch (ConsumeException e) when (_kafkaOptions.RetriableErrorCodes.Contains(e.Error.Code))
99+
{
100+
var logArgs = new LogMessageEventArgs
101+
{
102+
LogType = MqLogType.ConsumeRetries,
103+
Reason = e.Error.ToString()
104+
};
105+
OnLog?.Invoke(null, logArgs);
106+
107+
continue;
108+
}
93109

94110
if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue;
95111

src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,9 @@ private void WriteLog(object sender, LogMessageEventArgs logmsg)
304304
case MqLogType.ConsumeError:
305305
_logger.LogError("Kafka client consume error. --> " + logmsg.Reason);
306306
break;
307+
case MqLogType.ConsumeRetries:
308+
_logger.LogWarning("Kafka client consume exception, retying... --> " + logmsg.Reason);
309+
break;
307310
case MqLogType.ServerConnError:
308311
_isHealthy = false;
309312
_logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason);

src/DotNetCore.CAP/Transport/MqLogType.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public enum MqLogType
1515

1616
//Kafka
1717
ConsumeError,
18+
ConsumeRetries,
1819
ServerConnError,
1920

2021
//AzureServiceBus

0 commit comments

Comments
 (0)