@@ -24,7 +24,7 @@ public KafkaConsumerClient(string groupId, KafkaOptions options)
2424
2525 public event EventHandler < MessageContext > OnMessageReceived ;
2626
27- public event EventHandler < string > OnError ;
27+ public event EventHandler < LogMessageEventArgs > OnLog ;
2828
2929 public void Subscribe ( IEnumerable < string > topics )
3030 {
@@ -34,7 +34,6 @@ public void Subscribe(IEnumerable<string> topics)
3434 if ( _consumerClient == null )
3535 InitKafkaClient ( ) ;
3636
37- //_consumerClient.Assign(topics.Select(x=> new TopicPartition(x, 0)));
3837 _consumerClient . Subscribe ( topics ) ;
3938 }
4039
@@ -76,12 +75,17 @@ private void InitKafkaClient()
7675 _consumerClient . OnError += ConsumerClient_OnError ;
7776 }
7877
78+
7979 private void ConsumerClient_OnConsumeError ( object sender , Message e )
8080 {
8181 var message = e . Deserialize < Null , string > ( null , StringDeserializer ) ;
82-
83- OnError ? . Invoke ( sender , $ "An error occurred during consume the message; Topic:'{ e . Topic } '," +
84- $ "Message:'{ message . Value } ', Reason:'{ e . Error } '.") ;
82+ var logArgs = new LogMessageEventArgs
83+ {
84+ LogType = MqLogType . ConsumeError ,
85+ Reason = $ "An error occurred during consume the message; Topic:'{ e . Topic } '," +
86+ $ "Message:'{ message . Value } ', Reason:'{ e . Error } '."
87+ } ;
88+ OnLog ? . Invoke ( sender , logArgs ) ;
8589 }
8690
8791 private void ConsumerClient_OnMessage ( object sender , Message < Null , string > e )
@@ -98,7 +102,12 @@ private void ConsumerClient_OnMessage(object sender, Message<Null, string> e)
98102
99103 private void ConsumerClient_OnError ( object sender , Error e )
100104 {
101- OnError ? . Invoke ( sender , e . ToString ( ) ) ;
105+ var logArgs = new LogMessageEventArgs
106+ {
107+ LogType = MqLogType . ServerConnError ,
108+ Reason = e . ToString ( )
109+ } ;
110+ OnLog ? . Invoke ( sender , logArgs ) ;
102111 }
103112
104113 #endregion private methods
0 commit comments