Skip to content

Commit fcc02c9

Browse files
Ticket #58 : Support apache kafka
1 parent 8d84de1 commit fcc02c9

26 files changed

+540
-71
lines changed

EventMesh.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventMesh.Runtime.EF", "src
2323
EndProject
2424
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventMesh.Runtime.Website", "src\EventMesh.Runtime.Website\EventMesh.Runtime.Website.csproj", "{99809234-C3D5-45ED-8699-E1DD1337682B}"
2525
EndProject
26+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventMesh.Runtime.Kafka", "src\EventMesh.Runtime.Kafka\EventMesh.Runtime.Kafka.csproj", "{0918AA9B-1D81-4110-BB76-AAB9269F9D10}"
27+
EndProject
2628
Global
2729
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2830
Debug|Any CPU = Debug|Any CPU
@@ -57,6 +59,10 @@ Global
5759
{99809234-C3D5-45ED-8699-E1DD1337682B}.Debug|Any CPU.Build.0 = Debug|Any CPU
5860
{99809234-C3D5-45ED-8699-E1DD1337682B}.Release|Any CPU.ActiveCfg = Release|Any CPU
5961
{99809234-C3D5-45ED-8699-E1DD1337682B}.Release|Any CPU.Build.0 = Release|Any CPU
62+
{0918AA9B-1D81-4110-BB76-AAB9269F9D10}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
63+
{0918AA9B-1D81-4110-BB76-AAB9269F9D10}.Debug|Any CPU.Build.0 = Debug|Any CPU
64+
{0918AA9B-1D81-4110-BB76-AAB9269F9D10}.Release|Any CPU.ActiveCfg = Release|Any CPU
65+
{0918AA9B-1D81-4110-BB76-AAB9269F9D10}.Release|Any CPU.Build.0 = Release|Any CPU
6066
EndGlobalSection
6167
GlobalSection(SolutionProperties) = preSolution
6268
HideSolutionNode = FALSE
@@ -69,6 +75,7 @@ Global
6975
{3F30FA61-B630-4EB3-959C-9AD5FEEDB2E4} = {20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}
7076
{5A4AE409-D86A-435A-B6DA-2FA57896067C} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
7177
{99809234-C3D5-45ED-8699-E1DD1337682B} = {20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}
78+
{0918AA9B-1D81-4110-BB76-AAB9269F9D10} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
7279
EndGlobalSection
7380
GlobalSection(ExtensibilityGlobals) = postSolution
7481
SolutionGuid = {B9BD3B8C-B2C9-468F-BF54-66BFE9B565EC}

NoteEventMESH.txt

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/EventMesh.Runtime.AMQP/AMQPConsumer.cs

Lines changed: 26 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -13,40 +13,38 @@
1313

1414
namespace EventMesh.Runtime.AMQP
1515
{
16-
public class AMQPConsumer : IMessageConsumer
16+
public class AMQPConsumer : BaseMessageConsumer<AMQPOptions>
1717
{
1818
private readonly List<AMQPSubscriptionRecord> _subscriptions = new List<AMQPSubscriptionRecord>();
1919
private readonly IBrokerConfigurationStore _brokerConfigurationStore;
2020
private readonly AMQPOptions _opts;
2121
private readonly IClientStore _clientStore;
22-
private readonly RuntimeOptions _runtimeOpts;
2322
private IConnection _connection;
2423

25-
public event EventHandler<CloudEventArgs> CloudEventReceived;
26-
2724
public AMQPConsumer(
2825
IBrokerConfigurationStore brokerConfigurationStore,
2926
IClientStore clientStore,
3027
IOptions<AMQPOptions> opts,
31-
IOptions<RuntimeOptions> runtimeOpts)
28+
IOptions<RuntimeOptions> runtimeOpts) : base(runtimeOpts)
3229
{
3330
_opts = opts.Value;
3431
_brokerConfigurationStore = brokerConfigurationStore;
3532
_clientStore = clientStore;
36-
_runtimeOpts = runtimeOpts.Value;
3733
}
3834

35+
public override event EventHandler<CloudEventArgs> CloudEventReceived;
36+
3937
#region Actions
4038

41-
public string BrokerName
39+
public override string BrokerName
4240
{
4341
get
4442
{
4543
return _opts.BrokerName;
4644
}
4745
}
4846

49-
public Task Start(CancellationToken cancellationToken)
47+
public override Task Start(CancellationToken cancellationToken)
5048
{
5149
var options = GetOptions();
5250
var connectionFactory = new ConnectionFactory();
@@ -55,7 +53,7 @@ public Task Start(CancellationToken cancellationToken)
5553
return Task.CompletedTask;
5654
}
5755

58-
public Task Stop(CancellationToken cancellationToken)
56+
public override Task Stop(CancellationToken cancellationToken)
5957
{
6058
if (_connection != null)
6159
{
@@ -65,53 +63,16 @@ public Task Stop(CancellationToken cancellationToken)
6563
return Task.CompletedTask;
6664
}
6765

68-
public Task Subscribe(string topicName, Client client, string sessionId, CancellationToken cancellationToken)
66+
public override void Dispose()
6967
{
70-
var options = GetOptions();
71-
var activeSession = client.GetActiveSession(sessionId);
72-
if (activeSession.HasTopic(topicName, options.BrokerName))
73-
{
74-
return Task.CompletedTask;
75-
}
76-
77-
var topic = client.GetTopic(topicName, options.BrokerName);
78-
if (topic == null)
79-
{
80-
topic = client.AddTopic(topicName, options.BrokerName);
81-
}
82-
83-
Task.Run(() =>
84-
{
85-
Thread.Sleep(_runtimeOpts.WaitLocalSubscriptionIntervalMS);
86-
ListenTopic(options, topicName, topic, client.ClientId, activeSession.Id);
87-
});
88-
activeSession.SubscribeTopic(topicName, options.BrokerName);
89-
return Task.CompletedTask;
90-
}
91-
92-
public Task Unsubscribe(string topicName, Client client, string sessionId, CancellationToken cancellationToken)
93-
{
94-
var options = GetOptions();
95-
var activeSession = client.GetActiveSession(sessionId);
96-
if (!activeSession.HasTopic(topicName, options.BrokerName))
97-
{
98-
return Task.CompletedTask;
99-
}
100-
101-
var subscription = _subscriptions.First(s => s.ClientSessionId == sessionId && s.ClientId == client.ClientId && s.TopicName == topicName);
102-
subscription.Channel.BasicCancel(subscription.ConsumerTag);
103-
_subscriptions.Remove(subscription);
104-
return Task.CompletedTask;
68+
Stop(CancellationToken.None).Wait();
10569
}
10670

107-
public void Dispose()
71+
public override void Commit(string topicName, Client client, string sessionId, int nbEvts)
10872
{
109-
Stop(CancellationToken.None).Wait();
11073
}
11174

112-
#endregion
113-
114-
private void ListenTopic(AMQPOptions options, string topicName, ClientTopic topic, string clientId, string clientSessionId)
75+
protected override void ListenTopic(AMQPOptions options, string topicName, ClientTopic topic, string clientId, string clientSessionId)
11576
{
11677
if (_subscriptions.Any(s => s.BrokerName == options.BrokerName && s.TopicName == topicName && s.ClientSessionId == clientSessionId && s.ClientId == clientId))
11778
{
@@ -137,6 +98,21 @@ private void ListenTopic(AMQPOptions options, string topicName, ClientTopic topi
13798
_subscriptions.Add(new AMQPSubscriptionRecord(topicName, options.BrokerName, clientId, clientSessionId, channel, tag));
13899
}
139100

101+
protected override void UnsubscribeTopic(string topicName, Client client, string sessionId)
102+
{
103+
var subscription = _subscriptions.First(s => s.ClientSessionId == sessionId && s.ClientId == client.ClientId && s.TopicName == topicName);
104+
subscription.Channel.BasicCancel(subscription.ConsumerTag);
105+
_subscriptions.Remove(subscription);
106+
}
107+
108+
protected override AMQPOptions GetOptions()
109+
{
110+
return _brokerConfigurationStore.Get(_opts.BrokerName).ToAMQPOptions();
111+
}
112+
113+
#endregion
114+
115+
140116
private void ReceiveMessage(object sender, string clientId, string clientSessionId, string topicName, string source, string brokerName, BasicDeliverEventArgs e)
141117
{
142118
var jsonEventFormatter = new JsonEventFormatter();
@@ -151,10 +127,5 @@ private void ReceiveMessage(object sender, string clientId, string clientSession
151127
var clientSession = client.GetActiveSessionByTopic(brokerName, topicName);
152128
CloudEventReceived(this, new CloudEventArgs(topicName, brokerName, cloudEvent, client.ClientId, clientSession));
153129
}
154-
155-
private AMQPOptions GetOptions()
156-
{
157-
return _brokerConfigurationStore.Get(_opts.BrokerName).ToAMQPOptions();
158-
}
159130
}
160131
}

src/EventMesh.Runtime.AMQP/AMQPOptions.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace EventMesh.Runtime.AMQP
55
{
6-
public class AMQPOptions
6+
public class AMQPOptions : BaseBrokerOptions
77
{
88
public AMQPOptions()
99
{
@@ -23,7 +23,6 @@ public AMQPOptions()
2323
public Action<ConnectionFactory> ConnectionFactory { get; set; }
2424
public string TopicName { get; set; }
2525
public string QueueName { get; set; }
26-
public string BrokerName { get; set; }
2726
public string Source { get; set; }
2827
}
2928
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
docker run -d --name some-rabbit -p 5672:5672 -p 5673:5673 -p 15672:15672 -p 1883:1883 rabbitmq:3-management

src/EventMesh.Runtime.Client/Scenario2SubscribeToOneTopic.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ private static async Task<SubscriptionResult> SubscribeTopic(string clientId, st
4848
}, (msg) =>
4949
{
5050
var cloudEvts = string.Join(",", msg.CloudEvents.Select(c => c.Data));
51-
Console.WriteLine($"Receive '{msg.CloudEvents.Count()}' messages: {cloudEvts}, urn : {string.Join(',', msg.BridgeServers.Select(b => b.Urn))}");
51+
Console.WriteLine($"Receive '{msg.CloudEvents.Count()}' messages: {cloudEvts}, BrokerName : {msg.BrokerName}, urn : {string.Join(',', msg.BridgeServers.Select(b => b.Urn))}");
5252
});
5353
}
5454
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using EventMesh.Runtime.Models;
2+
using System.Collections.Generic;
3+
4+
namespace EventMesh.Runtime.Kafka
5+
{
6+
public static class BrokerConfigurationExtensions
7+
{
8+
public static KafkaOptions ToKafkaOptions(this BrokerConfiguration conf)
9+
{
10+
return new KafkaOptions
11+
{
12+
BrokerName = conf.Name,
13+
BootstrapServers = conf.GetValue("BootstrapServers")
14+
};
15+
}
16+
17+
public static BrokerConfiguration ToConfiguration(this KafkaOptions opts)
18+
{
19+
return new BrokerConfiguration
20+
{
21+
Name = opts.BrokerName,
22+
Protocol = Constants.Protocol,
23+
Records = new List<BrokerConfigurationRecord>
24+
{
25+
new BrokerConfigurationRecord
26+
{
27+
Key = "BootstrapServers",
28+
Value = opts.BootstrapServers
29+
}
30+
}
31+
};
32+
}
33+
}
34+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace EventMesh.Runtime.Kafka
2+
{
3+
public class Constants
4+
{
5+
public static string Protocol = "kafka";
6+
}
7+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using CloudNative.CloudEvents;
2+
using Confluent.Kafka;
3+
using System;
4+
5+
namespace EventMesh.Runtime.Kafka
6+
{
7+
public static class ConsumeResultExtensions
8+
{
9+
public static CloudEvent ToCloudEvent(this ConsumeResult<Ignore, string> message,
10+
CloudEventFormatter cloudEventFormatter,
11+
string source,
12+
string topicName,
13+
params CloudEvent[]? extensionAttributes)
14+
{
15+
var cloudEvent = new CloudEvent();
16+
cloudEvent.Id = Guid.NewGuid().ToString();
17+
cloudEvent.Source = new Uri($"{source}:{topicName}");
18+
cloudEvent.Type = message.Topic;
19+
cloudEvent.DataContentType = "application/json";
20+
cloudEvent.Data = message.Message.Value;
21+
return cloudEvent;
22+
}
23+
}
24+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net5.0</TargetFramework>
4+
</PropertyGroup>
5+
<ItemGroup>
6+
<ProjectReference Include="..\EventMesh.Runtime\EventMesh.Runtime.csproj" />
7+
</ItemGroup>
8+
<ItemGroup>
9+
<PackageReference Include="Confluent.Kafka" Version="1.8.2" />
10+
</ItemGroup>
11+
</Project>

0 commit comments

Comments
 (0)