Skip to content

Commit d30c051

Browse files
rtaylor01yupliner
authored andcommitted
Created ConsumerCancelled event on ConsumerCancellation class to be fired when a consumer has been canced by the broker
1 parent 9035a9c commit d30c051

File tree

5 files changed

+40
-8
lines changed

5 files changed

+40
-8
lines changed

Source/EasyNetQ/Consumer/ConsumerCancellation.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1-
using System;
1+

2+
using System;
23

34
namespace EasyNetQ.Consumer
45
{
56
public class ConsumerCancellation : IDisposable
67
{
8+
9+
public delegate void ConsumerCancel(object queue);
10+
// This event will fire whenever this class is disposed. It is necessary in order to catch
11+
// when the consumer is canceled by the broker as well as the user.
12+
public event ConsumerCancel ConsumerCancelled;
13+
714
private readonly Action onCancellation;
815

916
public ConsumerCancellation(Action onCancellation)
@@ -17,5 +24,10 @@ public void Dispose()
1724
{
1825
onCancellation();
1926
}
27+
28+
public void OnCancel(object queue)
29+
{
30+
ConsumerCancelled?.Invoke(queue);
31+
}
2032
}
2133
}

Source/EasyNetQ/Consumer/ExclusiveConsumer.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public class ExclusiveConsumer : IConsumer
2727

2828
private readonly IList<IDisposable> disposables = new List<IDisposable>();
2929

30+
private ConsumerCancellation consumerCancellation;
31+
3032
public ExclusiveConsumer(
3133
IQueue queue,
3234
Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage,
@@ -58,8 +60,9 @@ public IDisposable StartConsuming()
5860
disposables.Add(Timers.Start(s => StartConsumingInternal(), RestartConsumingPeriod, RestartConsumingPeriod));
5961

6062
StartConsumingInternal();
61-
62-
return new ConsumerCancellation(Dispose);
63+
64+
consumerCancellation = new ConsumerCancellation(Dispose);
65+
return consumerCancellation;
6366
}
6467

6568
private void StartConsumingInternal()
@@ -115,7 +118,9 @@ public void Dispose()
115118
return;
116119

117120
disposed = true;
118-
121+
122+
consumerCancellation.OnCancel(queue);
123+
119124
eventBus.Publish(new StoppedConsumingEvent(this));
120125

121126
foreach (var disposal in disposables)

Source/EasyNetQ/Consumer/PersistentConsumer.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public class PersistentConsumer : IConsumer
2222

2323
private readonly IList<IDisposable> subscriptions = new List<IDisposable>();
2424

25+
private ConsumerCancellation consumerCancellation;
26+
2527
public PersistentConsumer(
2628
IQueue queue,
2729
Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage,
@@ -52,7 +54,8 @@ public IDisposable StartConsuming()
5254

5355
StartConsumingInternal();
5456

55-
return new ConsumerCancellation(Dispose);
57+
consumerCancellation = new ConsumerCancellation(Dispose);
58+
return consumerCancellation;
5659
}
5760

5861
private void StartConsumingInternal()
@@ -101,6 +104,8 @@ public void Dispose()
101104

102105
disposed = true;
103106

107+
consumerCancellation.OnCancel(queue);
108+
104109
eventBus.Publish(new StoppedConsumingEvent(this));
105110

106111
foreach (var subscription in subscriptions)

Source/EasyNetQ/Consumer/PersistentMultipleConsumer.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public class PersistentMultipleConsumer : IConsumer
2323

2424
private readonly IList<IDisposable> subscriptions = new List<IDisposable>();
2525

26+
private ConsumerCancellation consumerCancellation;
27+
2628
public PersistentMultipleConsumer(
2729
ICollection<Tuple<IQueue, Func<byte[], MessageProperties, MessageReceivedInfo, Task>>> queueConsumerPairs,
2830
IPersistentConnection connection,
@@ -50,7 +52,8 @@ public IDisposable StartConsuming()
5052

5153
StartConsumingInternal();
5254

53-
return new ConsumerCancellation(Dispose);
55+
consumerCancellation = new ConsumerCancellation(Dispose);
56+
return consumerCancellation;
5457
}
5558

5659
private void StartConsumingInternal()
@@ -92,7 +95,9 @@ public void Dispose()
9295
if (disposed) return;
9396

9497
disposed = true;
95-
98+
99+
consumerCancellation.OnCancel(queueConsumerPairs);
100+
96101
eventBus.Publish(new StoppedConsumingEvent(this));
97102

98103
foreach (var subscription in subscriptions)

Source/EasyNetQ/Consumer/TransientConsumer.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public class TransientConsumer : IConsumer
1616

1717
private IInternalConsumer internalConsumer;
1818

19+
private ConsumerCancellation consumerCancellation;
20+
1921
public TransientConsumer(
2022
IQueue queue,
2123
Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage,
@@ -58,7 +60,8 @@ public IDisposable StartConsuming()
5860
else
5961
eventBus.Publish(new StartConsumingFailedEvent(this, queue));
6062

61-
return new ConsumerCancellation(Dispose);
63+
consumerCancellation = new ConsumerCancellation(Dispose);
64+
return consumerCancellation;
6265
}
6366

6467
private bool disposed;
@@ -68,6 +71,8 @@ public void Dispose()
6871
if (disposed) return;
6972
disposed = true;
7073

74+
consumerCancellation.OnCancel(queue);
75+
7176
eventBus.Publish(new StoppedConsumingEvent(this));
7277

7378
internalConsumer?.Dispose();

0 commit comments

Comments
 (0)