Skip to content

Commit 1d0f198

Browse files
Ticket #57 : Develop standard UI
1 parent e05a135 commit 1d0f198

File tree

66 files changed

+1910
-265
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1910
-265
lines changed
Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,44 @@
11
using CloudNative.CloudEvents.SystemTextJson;
22
using EventMesh.Runtime.Events;
33
using EventMesh.Runtime.Models;
4+
using EventMesh.Runtime.Stores;
45
using Microsoft.Extensions.Options;
56
using RabbitMQ.Client;
67
using RabbitMQ.Client.Events;
78
using System;
89
using System.Collections.Generic;
910
using System.Linq;
11+
using System.Net;
1012
using System.Threading;
1113
using System.Threading.Tasks;
1214

1315
namespace EventMesh.Runtime.AMQP
1416
{
1517
public class AMQPConsumer : IMessageConsumer
1618
{
17-
private readonly AMQPOptions _options;
1819
private readonly List<AMQPSubscriptionRecord> _records;
20+
private readonly IBrokerConfigurationStore _brokerConfigurationStore;
21+
private readonly AMQPOptions _opts;
1922
private IConnection _connection;
2023

2124
public event EventHandler<CloudEventArgs> CloudEventReceived;
2225

23-
public AMQPConsumer(IOptions<AMQPOptions> options)
26+
public AMQPConsumer(
27+
IBrokerConfigurationStore brokerConfigurationStore,
28+
IOptions<AMQPOptions> opts)
2429
{
25-
_options = options.Value;
2630
_records = new List<AMQPSubscriptionRecord>();
31+
_opts = opts.Value;
32+
_brokerConfigurationStore = brokerConfigurationStore;
2733
}
2834

2935
#region Actions
3036

3137
public Task Start(CancellationToken cancellationToken)
3238
{
39+
var options = GetOptions();
3340
var connectionFactory = new ConnectionFactory();
34-
_options.ConnectionFactory(connectionFactory);
41+
options.ConnectionFactory(connectionFactory);
3542
_connection = connectionFactory.CreateConnection();
3643
return Task.CompletedTask;
3744
}
@@ -46,35 +53,39 @@ public Task Stop(CancellationToken cancellationToken)
4653
return Task.CompletedTask;
4754
}
4855

49-
public Task Subscribe(string topicName, Client client, CancellationToken cancellationToken)
56+
public Task Subscribe(string topicName, Client client, string sessionId, CancellationToken cancellationToken)
5057
{
51-
if (client.ActiveSession.HasTopic(topicName, _options.BrokerName))
58+
var options = GetOptions();
59+
var activeSession = client.GetActiveSession(sessionId);
60+
if (activeSession.HasTopic(topicName, options.BrokerName))
5261
{
5362
return Task.CompletedTask;
5463
}
5564

56-
var channel = ListenTopic(topicName, client);
57-
client.ActiveSession.SubscribeTopic(topicName, _options.BrokerName);
58-
_records.Add(new AMQPSubscriptionRecord(channel, client.ClientId, client.ActiveSession, topicName));
65+
var channel = ListenTopic(options, topicName, client);
66+
activeSession.SubscribeTopic(topicName, options.BrokerName);
67+
_records.Add(new AMQPSubscriptionRecord(channel, client.ClientId, activeSession, topicName));
5968
return Task.CompletedTask;
6069
}
6170

62-
public Task Unsubscribe(string topicName, Client client, CancellationToken cancellationToken)
71+
public Task Unsubscribe(string topicName, Client client, string sessionId, CancellationToken cancellationToken)
6372
{
64-
if (!client.ActiveSession.HasTopic(topicName, _options.BrokerName))
73+
var options = GetOptions();
74+
var activeSession = client.GetActiveSession(sessionId);
75+
if (!activeSession.HasTopic(topicName, options.BrokerName))
6576
{
6677
return Task.CompletedTask;
6778
}
6879

69-
var subscription = _records.FirstOrDefault(r => r.ClientSession.Equals(client.ActiveSession) && r.TopicName == topicName);
80+
var subscription = _records.FirstOrDefault(r => r.ClientSession.Equals(activeSession) && r.TopicName == topicName);
7081
if (subscription == null)
7182
{
7283
return Task.CompletedTask;
7384
}
7485

7586
subscription.Model.Dispose();
7687
_records.Remove(subscription);
77-
client.ActiveSession.UnsubscribeTopic(topicName, _options.BrokerName);
88+
activeSession.UnsubscribeTopic(topicName, options.BrokerName);
7889
return Task.CompletedTask;
7990
}
8091

@@ -85,43 +96,48 @@ public void Dispose()
8596

8697
#endregion
8798

88-
private IModel ListenTopic(string topicName, Client client)
99+
private IModel ListenTopic(AMQPOptions options, string topicName, Client client)
89100
{
90-
var topic = client.GetTopic(topicName, _options.BrokerName);
101+
var topic = client.GetTopic(topicName, options.BrokerName);
91102
if (topic == null)
92103
{
93-
topic = client.AddTopic(topicName, _options.BrokerName);
104+
topic = client.AddTopic(topicName, options.BrokerName);
94105
}
95106

96107
var channel = _connection.CreateModel();
97108
var queue = channel.QueueDeclare(
98-
$"{_options.QueueName}-{topicName}",
109+
$"{options.QueueName}-{topicName}",
99110
true,
100111
false,
101112
false,
102113
new Dictionary<string, object>
103114
{
104115
{ "x-queue-type", "stream" }
105116
});
106-
channel.QueueBind(queue, _options.TopicName, topicName);
117+
channel.QueueBind(queue, options.TopicName, topicName);
107118
var consumer = new EventingBasicConsumer(channel);
108-
consumer.Received += (sender, e) => ReceiveMessage(sender, topicName, e);
119+
consumer.Received += (sender, e) => ReceiveMessage(sender, topicName, options.Source, options.BrokerName, e);
109120
// TODO : Update BasicQos.
110121
channel.BasicQos(0, 100, false);
111122
channel.BasicConsume(queue, false, string.Empty, new Dictionary<string, object> { { "x-stream-offset", topic.Offset } }, consumer);
112123
return channel;
113124
}
114125

115-
private void ReceiveMessage(object sender, string topicName, BasicDeliverEventArgs e)
126+
private void ReceiveMessage(object sender, string topicName, string source, string brokerName, BasicDeliverEventArgs e)
116127
{
117128
var jsonEventFormatter = new JsonEventFormatter();
118129
var model = (sender as EventingBasicConsumer).Model;
119-
var cloudEvent = e.ToCloudEvent(jsonEventFormatter, _options.Source, topicName);
130+
var cloudEvent = e.ToCloudEvent(jsonEventFormatter, source, topicName);
120131
var record = _records.FirstOrDefault(r => r.Model.Equals(model));
121132
if (CloudEventReceived != null)
122133
{
123-
CloudEventReceived(this, new CloudEventArgs(e.RoutingKey, _options.BrokerName, cloudEvent, record.ClientId, record.ClientSession));
134+
CloudEventReceived(this, new CloudEventArgs(e.RoutingKey, brokerName, cloudEvent, record.ClientId, record.ClientSession));
124135
}
125136
}
137+
138+
private AMQPOptions GetOptions()
139+
{
140+
return _brokerConfigurationStore.Get(_opts.BrokerName).ToAMQPOptions();
141+
}
126142
}
127143
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
using EventMesh.Runtime.Models;
2+
using RabbitMQ.Client;
3+
using System.Collections.Generic;
4+
5+
namespace EventMesh.Runtime.AMQP
6+
{
7+
public static class BrokerConfigurationExtensions
8+
{
9+
public static AMQPOptions ToAMQPOptions(this BrokerConfiguration conf)
10+
{
11+
return new AMQPOptions
12+
{
13+
BrokerName = conf.Name,
14+
QueueName = conf.GetValue("queueName"),
15+
Source = conf.GetValue("source"),
16+
TopicName = conf.GetValue("topicName"),
17+
ConnectionFactory = (opt) =>
18+
{
19+
opt.HostName = conf.GetValue("connectionFactory.hostName");
20+
opt.Port = int.Parse(conf.GetValue("connectionFactory.port"));
21+
opt.UserName = conf.GetValue("connectionFactory.username");
22+
opt.Password = conf.GetValue("connectionFactory.password");
23+
}
24+
};
25+
}
26+
27+
public static BrokerConfiguration ToConfiguration(this AMQPOptions opts)
28+
{
29+
var connectionFactory = new ConnectionFactory();
30+
opts.ConnectionFactory(connectionFactory);
31+
return new BrokerConfiguration
32+
{
33+
Name = opts.BrokerName,
34+
Protocol = Constants.Protocol,
35+
Records = new List<BrokerConfigurationRecord>
36+
{
37+
new BrokerConfigurationRecord
38+
{
39+
Key = "queueName",
40+
Value = opts.QueueName
41+
},
42+
new BrokerConfigurationRecord
43+
{
44+
Key = "source",
45+
Value = opts.Source
46+
},
47+
new BrokerConfigurationRecord
48+
{
49+
Key = "topicName",
50+
Value = opts.TopicName
51+
},
52+
new BrokerConfigurationRecord
53+
{
54+
Key = "connectionFactory.hostName",
55+
Value = connectionFactory.HostName
56+
},
57+
new BrokerConfigurationRecord
58+
{
59+
Key = "connectionFactory.port",
60+
Value = connectionFactory.Port.ToString()
61+
},
62+
new BrokerConfigurationRecord
63+
{
64+
Key = "connectionFactory.username",
65+
Value = connectionFactory.UserName
66+
},
67+
new BrokerConfigurationRecord
68+
{
69+
Key = "connectionFactory.password",
70+
Value = connectionFactory.Password
71+
}
72+
}
73+
};
74+
}
75+
}
76+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace EventMesh.Runtime.AMQP
2+
{
3+
public static class Constants
4+
{
5+
public const string Protocol = "amqp";
6+
}
7+
}

src/EventMesh.Runtime.AMQP/EventMeshRuntimeHostBuilderExtensions.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,36 @@
11
using EventMesh.Runtime.AMQP;
2+
using EventMesh.Runtime.Stores;
23
using Microsoft.Extensions.DependencyInjection;
4+
using Microsoft.Extensions.Options;
35
using System;
46

57
namespace EventMesh.Runtime
68
{
79
public static class EventMeshRuntimeHostBuilderExtensions
810
{
9-
public static RuntimeHostBuilder AddAMQP(this RuntimeHostBuilder eventMeshRuntime, Action<AMQPOptions> amqpOptions = null)
11+
public static RuntimeHostBuilder AddAMQP(this RuntimeHostBuilder eventMeshRuntime, Action<AMQPOptions> callback = null)
1012
{
11-
if (amqpOptions != null)
13+
if (callback != null)
1214
{
13-
eventMeshRuntime.ServiceCollection.Configure(amqpOptions);
15+
eventMeshRuntime.ServiceCollection.Configure(callback);
1416
}
1517
else
1618
{
1719
eventMeshRuntime.ServiceCollection.Configure<AMQPOptions>(opt => { });
1820
}
1921

22+
eventMeshRuntime.AddInitScript((s) =>
23+
{
24+
var amqpOptions = s.GetRequiredService<IOptions<AMQPOptions>>().Value;
25+
var brokerConfigurationStore = s.GetRequiredService<IBrokerConfigurationStore>();
26+
var brokerConfiguration = brokerConfigurationStore.Get(amqpOptions.BrokerName);
27+
if(brokerConfiguration == null)
28+
{
29+
brokerConfiguration = amqpOptions.ToConfiguration();
30+
brokerConfigurationStore.Add(brokerConfiguration);
31+
brokerConfigurationStore.SaveChanges();
32+
}
33+
});
2034
eventMeshRuntime.ServiceCollection.AddSingleton<IMessageConsumer, AMQPConsumer>();
2135
return eventMeshRuntime;
2236
}

src/EventMesh.Runtime.Client/Scenario1SubscribeBridge.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@ private static async Task AddBridge()
2525
private static async Task Subscribe()
2626
{
2727
Console.WriteLine("Subscribe to topic 'Test.Coucou'");
28+
const string clientId = "7127b7d9-a4b3-4728-b8d6-7c573503be98";
2829
// Create a session.
29-
await _runtimeClient.Hello(new UserAgent
30+
var helloResponse = await _runtimeClient.Hello(new UserAgent
3031
{
31-
ClientId = "7127b7d9-a4b3-4728-b8d6-7c573503be98",
32+
ClientId = clientId,
3233
Environment = "TST",
3334
Password = "password",
3435
Pid = 2000,
@@ -37,7 +38,7 @@ await _runtimeClient.Hello(new UserAgent
3738
BufferCloudEvents = 2
3839
});
3940
// Subscribe to a topic.
40-
await _runtimeClient.Subscribe(new List<SubscriptionItem>
41+
await _runtimeClient.Subscribe(clientId, helloResponse.SessionId, new List<SubscriptionItem>
4142
{
4243
new SubscriptionItem
4344
{
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using EventMesh.Runtime.Models;
2+
using Microsoft.EntityFrameworkCore;
3+
using Microsoft.EntityFrameworkCore.Metadata.Builders;
4+
5+
namespace EventMesh.Runtime.EF.Configurations
6+
{
7+
public class BrokerConfConfiguration : IEntityTypeConfiguration<BrokerConfiguration>
8+
{
9+
public void Configure(EntityTypeBuilder<BrokerConfiguration> builder)
10+
{
11+
builder.HasKey(b => b.Name);
12+
builder.HasMany(b => b.Records).WithOne().OnDelete(DeleteBehavior.Cascade);
13+
}
14+
}
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using EventMesh.Runtime.Models;
2+
using Microsoft.EntityFrameworkCore;
3+
using Microsoft.EntityFrameworkCore.Metadata.Builders;
4+
5+
namespace EventMesh.Runtime.EF.Configurations
6+
{
7+
public class BrokerConfigurationRecordConfiguration : IEntityTypeConfiguration<BrokerConfigurationRecord>
8+
{
9+
public void Configure(EntityTypeBuilder<BrokerConfigurationRecord> builder)
10+
{
11+
builder.Property<int>("Id").ValueGeneratedOnAdd();
12+
builder.HasKey("Id");
13+
}
14+
}
15+
}

src/EventMesh.Runtime.EF/Configurations/ClientConfiguration.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public void Configure(EntityTypeBuilder<Client> builder)
1111
builder.HasKey(c => c.ClientId);
1212
builder.HasMany(c => c.Sessions).WithOne().OnDelete(DeleteBehavior.Cascade);
1313
builder.HasMany(c => c.Topics).WithOne().OnDelete(DeleteBehavior.Cascade);
14+
builder.Ignore(c => c.ActiveSessions);
1415
}
1516
}
1617
}

src/EventMesh.Runtime.EF/Configurations/ClientSessionConfiguration.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ public class ClientSessionConfiguration : IEntityTypeConfiguration<ClientSession
88
{
99
public void Configure(EntityTypeBuilder<ClientSession> builder)
1010
{
11-
builder.Property<int>("Id").ValueGeneratedOnAdd();
12-
builder.HasKey("Id");
11+
builder.HasKey(cs => cs.Id);
1312
builder.Ignore(cs => cs.Endpoint);
1413
builder.Ignore(cs => cs.Purpose);
1514
builder.HasMany(cs => cs.Histories).WithOne().OnDelete(DeleteBehavior.Cascade);

src/EventMesh.Runtime.EF/EventMeshDBContext.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public EventMeshDBContext(DbContextOptions<EventMeshDBContext> dbContextOptions)
99

1010
public virtual DbSet<Client> Clients { get; set; }
1111
public virtual DbSet<BridgeServer> BridgeServers { get; set; }
12+
public virtual DbSet<BrokerConfiguration> BrokerConfigurations { get; set; }
1213

1314
protected override void OnModelCreating(ModelBuilder modelBuilder)
1415
{

0 commit comments

Comments
 (0)