Skip to content

Commit f9d2466

Browse files
Ticket #57 : Add new UI + fix some issues
1 parent 1d0f198 commit f9d2466

39 files changed

+481
-253
lines changed

src/EventMesh.Runtime.AMQP/AMQPConsumer.cs

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,33 @@
77
using RabbitMQ.Client.Events;
88
using System;
99
using System.Collections.Generic;
10-
using System.Linq;
11-
using System.Net;
1210
using System.Threading;
1311
using System.Threading.Tasks;
1412

1513
namespace EventMesh.Runtime.AMQP
1614
{
1715
public class AMQPConsumer : IMessageConsumer
1816
{
19-
private readonly List<AMQPSubscriptionRecord> _records;
17+
private readonly List<string> _subscribedTopics = new List<string>();
2018
private readonly IBrokerConfigurationStore _brokerConfigurationStore;
2119
private readonly AMQPOptions _opts;
20+
private readonly IClientStore _clientStore;
21+
private readonly RuntimeOptions _runtimeOpts;
22+
private static object _obj = new object();
2223
private IConnection _connection;
2324

2425
public event EventHandler<CloudEventArgs> CloudEventReceived;
2526

2627
public AMQPConsumer(
2728
IBrokerConfigurationStore brokerConfigurationStore,
28-
IOptions<AMQPOptions> opts)
29+
IClientStore clientStore,
30+
IOptions<AMQPOptions> opts,
31+
IOptions<RuntimeOptions> runtimeOpts)
2932
{
30-
_records = new List<AMQPSubscriptionRecord>();
3133
_opts = opts.Value;
3234
_brokerConfigurationStore = brokerConfigurationStore;
35+
_clientStore = clientStore;
36+
_runtimeOpts = runtimeOpts.Value;
3337
}
3438

3539
#region Actions
@@ -62,9 +66,18 @@ public Task Subscribe(string topicName, Client client, string sessionId, Cancell
6266
return Task.CompletedTask;
6367
}
6468

65-
var channel = ListenTopic(options, topicName, client);
69+
var topic = client.GetTopic(topicName, options.BrokerName);
70+
if (topic == null)
71+
{
72+
topic = client.AddTopic(topicName, options.BrokerName);
73+
}
74+
75+
Task.Run(() =>
76+
{
77+
Thread.Sleep(_runtimeOpts.WaitLocalSubscriptionIntervalMS);
78+
ListenTopic(options, topicName, topic);
79+
});
6680
activeSession.SubscribeTopic(topicName, options.BrokerName);
67-
_records.Add(new AMQPSubscriptionRecord(channel, client.ClientId, activeSession, topicName));
6881
return Task.CompletedTask;
6982
}
7083

@@ -77,14 +90,6 @@ public Task Unsubscribe(string topicName, Client client, string sessionId, Cance
7790
return Task.CompletedTask;
7891
}
7992

80-
var subscription = _records.FirstOrDefault(r => r.ClientSession.Equals(activeSession) && r.TopicName == topicName);
81-
if (subscription == null)
82-
{
83-
return Task.CompletedTask;
84-
}
85-
86-
subscription.Model.Dispose();
87-
_records.Remove(subscription);
8893
activeSession.UnsubscribeTopic(topicName, options.BrokerName);
8994
return Task.CompletedTask;
9095
}
@@ -96,12 +101,11 @@ public void Dispose()
96101

97102
#endregion
98103

99-
private IModel ListenTopic(AMQPOptions options, string topicName, Client client)
104+
private IModel ListenTopic(AMQPOptions options, string topicName, ClientTopic topic)
100105
{
101-
var topic = client.GetTopic(topicName, options.BrokerName);
102-
if (topic == null)
106+
if(_subscribedTopics.Contains(topicName))
103107
{
104-
topic = client.AddTopic(topicName, options.BrokerName);
108+
return null;
105109
}
106110

107111
var channel = _connection.CreateModel();
@@ -120,18 +124,23 @@ private IModel ListenTopic(AMQPOptions options, string topicName, Client client)
120124
// TODO : Update BasicQos.
121125
channel.BasicQos(0, 100, false);
122126
channel.BasicConsume(queue, false, string.Empty, new Dictionary<string, object> { { "x-stream-offset", topic.Offset } }, consumer);
127+
_subscribedTopics.Add(topicName);
123128
return channel;
124129
}
125130

126131
private void ReceiveMessage(object sender, string topicName, string source, string brokerName, BasicDeliverEventArgs e)
127132
{
128-
var jsonEventFormatter = new JsonEventFormatter();
129-
var model = (sender as EventingBasicConsumer).Model;
130-
var cloudEvent = e.ToCloudEvent(jsonEventFormatter, source, topicName);
131-
var record = _records.FirstOrDefault(r => r.Model.Equals(model));
132-
if (CloudEventReceived != null)
133+
lock(_obj)
133134
{
134-
CloudEventReceived(this, new CloudEventArgs(e.RoutingKey, brokerName, cloudEvent, record.ClientId, record.ClientSession));
135+
var jsonEventFormatter = new JsonEventFormatter();
136+
var model = (sender as EventingBasicConsumer).Model;
137+
var cloudEvent = e.ToCloudEvent(jsonEventFormatter, source, topicName);
138+
var activeClients = _clientStore.GetAllBySubscribedTopics(brokerName, topicName);
139+
foreach (var client in activeClients)
140+
{
141+
var clientSession = client.GetActiveSessionByTopic(brokerName, topicName);
142+
CloudEventReceived(this, new CloudEventArgs(topicName, brokerName, cloudEvent, client.ClientId, clientSession));
143+
}
135144
}
136145
}
137146

src/EventMesh.Runtime.AMQP/AMQPSubscriptionRecord.cs

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/EventMesh.Runtime.Client/Program.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
using System;
2-
using System.Threading.Tasks;
1+
using System.Threading.Tasks;
32

43
namespace EventMesh.Runtime.Client
54
{
65
class Program
76
{
87
static async Task<int> Main(string[] args)
98
{
10-
await Scenario1SubscribeBridge.Launch();
11-
Console.WriteLine("Please press enter to quit the application ...");
12-
Console.ReadLine();
9+
// await Scenario1CreateSubSession.Launch();
10+
await Scenario2SubscribeToOneTopic.Launch();
1311
return 1;
1412
}
1513
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using EventMesh.Runtime.Messages;
2+
using System;
3+
using System.Threading.Tasks;
4+
5+
namespace EventMesh.Runtime.Client
6+
{
7+
public static class Scenario1CreateSubSession
8+
{
9+
private static RuntimeClient _runtimeClient;
10+
11+
public static async Task Launch(string hostName = "localhost", int port = 4000, string clientId = "clientId")
12+
{
13+
_runtimeClient = new RuntimeClient(hostName, port);
14+
var sessionId = await CreateSession(clientId);
15+
Console.WriteLine($"session is created with id '{sessionId}'");
16+
Console.WriteLine("Please press enter to quit the application ...");
17+
Console.ReadLine();
18+
await _runtimeClient.Disconnect(clientId, sessionId);
19+
}
20+
21+
private static async Task<string> CreateSession(string clientId)
22+
{
23+
var helloResponse = await _runtimeClient.Hello(new UserAgent
24+
{
25+
ClientId = clientId,
26+
Environment = "TST",
27+
Password = "password",
28+
Pid = 2000,
29+
BufferCloudEvents = 2,
30+
Version = "0",
31+
Purpose = UserAgentPurpose.SUB
32+
});
33+
return helloResponse.SessionId;
34+
}
35+
}
36+
}

src/EventMesh.Runtime.Client/Scenario1SubscribeBridge.cs

Lines changed: 0 additions & 54 deletions
This file was deleted.
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using EventMesh.Runtime.Messages;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
7+
namespace EventMesh.Runtime.Client
8+
{
9+
public static class Scenario2SubscribeToOneTopic
10+
{
11+
private static RuntimeClient _runtimeClient;
12+
13+
public static async Task Launch(string hostName = "localhost", int port = 4000, string clientId = "clientId", string topicName = "Person.*")
14+
{
15+
_runtimeClient = new RuntimeClient(hostName, port);
16+
var sessionId = await CreateSession(clientId);
17+
Console.WriteLine($"session is created with id '{sessionId}'");
18+
var subscriptionResult = await SubscribeTopic(clientId, sessionId, topicName);
19+
Console.WriteLine("Please press enter to quit the application ...");
20+
Console.ReadLine();
21+
subscriptionResult.Stop();
22+
await _runtimeClient.Disconnect(clientId, sessionId);
23+
}
24+
25+
private static async Task<string> CreateSession(string clientId)
26+
{
27+
var helloResponse = await _runtimeClient.Hello(new UserAgent
28+
{
29+
ClientId = clientId,
30+
Environment = "TST",
31+
Password = "password",
32+
Pid = 2000,
33+
BufferCloudEvents = 1,
34+
Version = "0",
35+
Purpose = UserAgentPurpose.SUB
36+
});
37+
return helloResponse.SessionId;
38+
}
39+
40+
private static async Task<SubscriptionResult> SubscribeTopic(string clientId, string sessionId, string topicName)
41+
{
42+
return await _runtimeClient.Subscribe(clientId, sessionId, new List<SubscriptionItem>
43+
{
44+
new SubscriptionItem
45+
{
46+
Topic = topicName,
47+
}
48+
}, (msg) =>
49+
{
50+
var cloudEvts = string.Join(",", msg.CloudEvents.Select(c => c.Data));
51+
Console.WriteLine($"Receive '{msg.CloudEvents.Count()}' messages: {cloudEvts}, urn : {string.Join(',', msg.BridgeServers.Select(b => b.Urn))}");
52+
});
53+
}
54+
}
55+
}

src/EventMesh.Runtime.EF/Stores/EFClientStore.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using Microsoft.EntityFrameworkCore;
44
using System.Collections.Generic;
55
using System.Linq;
6-
using System.Net;
76

87
namespace EventMesh.Runtime.EF.Stores
98
{
@@ -47,7 +46,20 @@ public IEnumerable<Client> GetAll()
4746
{
4847
return _dbContext.Clients
4948
.Include(c => c.Sessions)
50-
.Include(c => c.Topics).ToList();
49+
.Include(c => c.Topics)
50+
.ToList();
51+
}
52+
53+
public IEnumerable<Client> GetAllBySubscribedTopics(string brokerName, string topicName)
54+
{
55+
return _dbContext.Clients
56+
.Include(c => c.Sessions).ThenInclude(c => c.Histories)
57+
.Include(c => c.Sessions).ThenInclude(c => c.Topics)
58+
.Include(c => c.Sessions).ThenInclude(c => c.PendingCloudEvents)
59+
.Include(c => c.Sessions).ThenInclude(c => c.Bridges)
60+
.Include(c => c.Topics)
61+
.Where(c => c.Sessions.Any(s => s.State == ClientSessionState.ACTIVE && s.Topics.Any(t => t.BrokerName == brokerName && t.Name == topicName)))
62+
.ToList();
5163
}
5264

5365
public Client GetByActiveSession(string clientId, string sessionId)

src/EventMesh.Runtime.Website/Data/WeatherForecast.cs

Lines changed: 0 additions & 15 deletions
This file was deleted.

src/EventMesh.Runtime.Website/Data/WeatherForecastService.cs

Lines changed: 0 additions & 25 deletions
This file was deleted.

0 commit comments

Comments
 (0)