77using RabbitMQ . Client . Events ;
88using System ;
99using System . Collections . Generic ;
10+ using System . Linq ;
1011using System . Threading ;
1112using System . Threading . Tasks ;
1213
1314namespace EventMesh . Runtime . AMQP
1415{
1516 public class AMQPConsumer : IMessageConsumer
1617 {
17- private readonly List < string > _subscribedTopics = new List < string > ( ) ;
18+ private readonly List < AMQPSubscriptionRecord > _subscriptions = new List < AMQPSubscriptionRecord > ( ) ;
1819 private readonly IBrokerConfigurationStore _brokerConfigurationStore ;
1920 private readonly AMQPOptions _opts ;
2021 private readonly IClientStore _clientStore ;
2122 private readonly RuntimeOptions _runtimeOpts ;
22- private static object _obj = new object ( ) ;
2323 private IConnection _connection ;
2424
2525 public event EventHandler < CloudEventArgs > CloudEventReceived ;
@@ -38,6 +38,14 @@ public AMQPConsumer(
3838
3939 #region Actions
4040
41+ public string BrokerName
42+ {
43+ get
44+ {
45+ return _opts . BrokerName ;
46+ }
47+ }
48+
4149 public Task Start ( CancellationToken cancellationToken )
4250 {
4351 var options = GetOptions ( ) ;
@@ -75,7 +83,7 @@ public Task Subscribe(string topicName, Client client, string sessionId, Cancell
7583 Task . Run ( ( ) =>
7684 {
7785 Thread . Sleep ( _runtimeOpts . WaitLocalSubscriptionIntervalMS ) ;
78- ListenTopic ( options , topicName , topic ) ;
86+ ListenTopic ( options , topicName , topic , client . ClientId , activeSession . Id ) ;
7987 } ) ;
8088 activeSession . SubscribeTopic ( topicName , options . BrokerName ) ;
8189 return Task . CompletedTask ;
@@ -90,7 +98,9 @@ public Task Unsubscribe(string topicName, Client client, string sessionId, Cance
9098 return Task . CompletedTask ;
9199 }
92100
93- activeSession . UnsubscribeTopic ( topicName , options . BrokerName ) ;
101+ var subscription = _subscriptions . First ( s => s . ClientSessionId == sessionId && s . ClientId == client . ClientId && s . TopicName == topicName ) ;
102+ subscription . Channel . BasicCancel ( subscription . ConsumerTag ) ;
103+ _subscriptions . Remove ( subscription ) ;
94104 return Task . CompletedTask ;
95105 }
96106
@@ -101,11 +111,11 @@ public void Dispose()
101111
102112 #endregion
103113
104- private IModel ListenTopic ( AMQPOptions options , string topicName , ClientTopic topic )
114+ private void ListenTopic ( AMQPOptions options , string topicName , ClientTopic topic , string clientId , string clientSessionId )
105115 {
106- if ( _subscribedTopics . Contains ( topicName ) )
116+ if ( _subscriptions . Any ( s => s . BrokerName == options . BrokerName && s . TopicName == topicName && s . ClientSessionId == clientSessionId && s . ClientId == clientId ) )
107117 {
108- return null ;
118+ return ;
109119 }
110120
111121 var channel = _connection . CreateModel ( ) ;
@@ -120,28 +130,26 @@ private IModel ListenTopic(AMQPOptions options, string topicName, ClientTopic to
120130 } ) ;
121131 channel . QueueBind ( queue , options . TopicName , topicName ) ;
122132 var consumer = new EventingBasicConsumer ( channel ) ;
123- consumer . Received += ( sender , e ) => ReceiveMessage ( sender , topicName , options . Source , options . BrokerName , e ) ;
133+ consumer . Received += ( sender , e ) => ReceiveMessage ( sender , clientId , clientSessionId , topicName , options . Source , options . BrokerName , e ) ;
124134 // TODO : Update BasicQos.
125135 channel . BasicQos ( 0 , 100 , false ) ;
126- channel . BasicConsume ( queue , false , string . Empty , new Dictionary < string , object > { { "x-stream-offset" , topic . Offset } } , consumer ) ;
127- _subscribedTopics . Add ( topicName ) ;
128- return channel ;
136+ var tag = channel . BasicConsume ( queue , false , string . Empty , new Dictionary < string , object > { { "x-stream-offset" , topic . Offset } } , consumer ) ;
137+ _subscriptions . Add ( new AMQPSubscriptionRecord ( topicName , options . BrokerName , clientId , clientSessionId , channel , tag ) ) ;
129138 }
130139
131- private void ReceiveMessage ( object sender , string topicName , string source , string brokerName , BasicDeliverEventArgs e )
140+ private void ReceiveMessage ( object sender , string clientId , string clientSessionId , string topicName , string source , string brokerName , BasicDeliverEventArgs e )
132141 {
133- lock ( _obj )
142+ var jsonEventFormatter = new JsonEventFormatter ( ) ;
143+ var model = ( sender as EventingBasicConsumer ) . Model ;
144+ var cloudEvent = e . ToCloudEvent ( jsonEventFormatter , source , topicName ) ;
145+ var client = _clientStore . GetByActiveSession ( clientId , clientSessionId ) ;
146+ if ( client == null )
134147 {
135- var jsonEventFormatter = new JsonEventFormatter ( ) ;
136- var model = ( sender as EventingBasicConsumer ) . Model ;
137- var cloudEvent = e . ToCloudEvent ( jsonEventFormatter , source , topicName ) ;
138- var activeClients = _clientStore . GetAllBySubscribedTopics ( brokerName , topicName ) ;
139- foreach ( var client in activeClients )
140- {
141- var clientSession = client . GetActiveSessionByTopic ( brokerName , topicName ) ;
142- CloudEventReceived ( this , new CloudEventArgs ( topicName , brokerName , cloudEvent , client . ClientId , clientSession ) ) ;
143- }
148+ return ;
144149 }
150+
151+ var clientSession = client . GetActiveSessionByTopic ( brokerName , topicName ) ;
152+ CloudEventReceived ( this , new CloudEventArgs ( topicName , brokerName , cloudEvent , client . ClientId , clientSession ) ) ;
145153 }
146154
147155 private AMQPOptions GetOptions ( )
0 commit comments