Skip to content

Commit 365fbe6

Browse files
committed
refactor mq log
1 parent ac21368 commit 365fbe6

File tree

2 files changed

+41
-15
lines changed

2 files changed

+41
-15
lines changed

src/DotNetCore.CAP/IConsumerClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ public interface IConsumerClient : IDisposable
3333

3434
event EventHandler<MessageContext> OnMessageReceived;
3535

36-
event EventHandler<string> OnError;
36+
event EventHandler<LogMessageEventArgs> OnLog;
3737
}
3838
}

src/DotNetCore.CAP/IConsumerHandler.Default.cs

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ internal class ConsumerHandler : IConsumerHandler
2323
private readonly IServiceProvider _serviceProvider;
2424

2525
private Task _compositeTask;
26+
2627
private bool _disposed;
2728

2829
public ConsumerHandler(
@@ -44,31 +45,29 @@ public void Start()
4445

4546
foreach (var matchGroup in groupingMatches)
4647
Task.Factory.StartNew(() =>
47-
{
48-
using (var client = _consumerClientFactory.Create(matchGroup.Key))
49-
{
50-
RegisterMessageProcessor(client);
48+
{
49+
using (var client = _consumerClientFactory.Create(matchGroup.Key))
50+
{
51+
RegisterMessageProcessor(client);
5152

52-
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
53+
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
5354

54-
client.Listening(_pollingDelay, _cts.Token);
55-
}
56-
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
57-
_compositeTask = Task.CompletedTask;
55+
client.Listening(_pollingDelay, _cts.Token);
56+
}
57+
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
58+
59+
_compositeTask = Task.CompletedTask;
5860
}
5961

6062
public void Dispose()
6163
{
6264
if (_disposed)
6365
return;
6466
_disposed = true;
65-
66-
_logger.ServerShuttingDown();
6767
_cts.Cancel();
68-
6968
try
7069
{
71-
_compositeTask.Wait(TimeSpan.FromSeconds(10));
70+
_compositeTask.Wait(TimeSpan.FromSeconds(2));
7271
}
7372
catch (AggregateException ex)
7473
{
@@ -105,7 +104,34 @@ private void RegisterMessageProcessor(IConsumerClient client)
105104
Pulse();
106105
};
107106

108-
client.OnError += (sender, reason) => { _logger.MessageQueueError(reason); };
107+
client.OnLog += WriteLog;
108+
}
109+
110+
private void WriteLog(object sender, LogMessageEventArgs logmsg)
111+
{
112+
switch (logmsg.LogType)
113+
{
114+
case MqLogType.ConsumerCancelled:
115+
_logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason);
116+
break;
117+
case MqLogType.ConsumerRegistered:
118+
_logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason);
119+
break;
120+
case MqLogType.ConsumerUnregistered:
121+
_logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason);
122+
break;
123+
case MqLogType.ConsumerShutdown:
124+
_logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason);
125+
break;
126+
case MqLogType.ConsumeError:
127+
_logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason);
128+
break;
129+
case MqLogType.ServerConnError:
130+
_logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason);
131+
break;
132+
default:
133+
throw new ArgumentOutOfRangeException();
134+
}
109135
}
110136

111137
private static void StoreMessage(IServiceScope serviceScope, MessageContext messageContext)

0 commit comments

Comments
 (0)