Skip to content

Commit 0c73f8d

Browse files
committed
refactor MQ log message
1 parent 365fbe6 commit 0c73f8d

File tree

1 file changed

+52
-6
lines changed

1 file changed

+52
-6
lines changed

src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
using System.Collections.Generic;
33
using System.Text;
44
using System.Threading;
5-
using System.Threading.Tasks;
65
using RabbitMQ.Client;
76
using RabbitMQ.Client.Events;
87

@@ -32,7 +31,7 @@ public RabbitMQConsumerClient(string queueName,
3231

3332
public event EventHandler<MessageContext> OnMessageReceived;
3433

35-
public event EventHandler<string> OnError;
34+
public event EventHandler<LogMessageEventArgs> OnLog;
3635

3736
public void Subscribe(IEnumerable<string> topics)
3837
{
@@ -47,9 +46,17 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
4746
var consumer = new EventingBasicConsumer(_channel);
4847
consumer.Received += OnConsumerReceived;
4948
consumer.Shutdown += OnConsumerShutdown;
49+
consumer.Registered += OnConsumerRegistered;
50+
consumer.Unregistered += OnConsumerUnregistered;
51+
consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
52+
5053
_channel.BasicConsume(_queueName, false, consumer);
54+
5155
while (true)
52-
Task.Delay(timeout, cancellationToken).GetAwaiter().GetResult();
56+
{
57+
cancellationToken.ThrowIfCancellationRequested();
58+
cancellationToken.WaitHandle.WaitOne(timeout);
59+
}
5360
}
5461

5562
public void Commit()
@@ -69,9 +76,9 @@ public void Dispose()
6976

7077
private void InitClient()
7178
{
72-
var connection = _connectionChannelPool.GetConnection();
79+
var _connection = _connectionChannelPool.GetConnection();
7380

74-
_channel = connection.CreateModel();
81+
_channel = _connection.CreateModel();
7582

7683
_channel.ExchangeDeclare(
7784
_exchageName,
@@ -84,6 +91,38 @@ private void InitClient()
8491
_channel.QueueDeclare(_queueName, true, false, false, arguments);
8592
}
8693

94+
#region events
95+
96+
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
97+
{
98+
var args = new LogMessageEventArgs
99+
{
100+
LogType = MqLogType.ConsumerCancelled,
101+
Reason = e.ConsumerTag
102+
};
103+
OnLog?.Invoke(sender, args);
104+
}
105+
106+
private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
107+
{
108+
var args = new LogMessageEventArgs
109+
{
110+
LogType = MqLogType.ConsumerUnregistered,
111+
Reason = e.ConsumerTag
112+
};
113+
OnLog?.Invoke(sender, args);
114+
}
115+
116+
private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
117+
{
118+
var args = new LogMessageEventArgs
119+
{
120+
LogType = MqLogType.ConsumerRegistered,
121+
Reason = e.ConsumerTag
122+
};
123+
OnLog?.Invoke(sender, args);
124+
}
125+
87126
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
88127
{
89128
_deliveryTag = e.DeliveryTag;
@@ -98,7 +137,14 @@ private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
98137

99138
private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
100139
{
101-
OnError?.Invoke(sender, e.Cause?.ToString());
140+
var args = new LogMessageEventArgs
141+
{
142+
LogType = MqLogType.ConsumerShutdown,
143+
Reason = e.ToString()
144+
};
145+
OnLog?.Invoke(sender, args);
102146
}
147+
148+
#endregion
103149
}
104150
}

0 commit comments

Comments
 (0)