Skip to content

Commit f7e5bd9

Browse files
Ticket #59 : Publish message to a topic
1 parent fcc02c9 commit f7e5bd9

File tree

52 files changed

+1067
-1114
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1067
-1114
lines changed

EventMesh.sln

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "02. Startup Layer", "02. St
1515
EndProject
1616
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventMesh.Runtime.AMQP", "src\EventMesh.Runtime.AMQP\EventMesh.Runtime.AMQP.csproj", "{0478428F-225D-4541-BC67-805A2041C12C}"
1717
EndProject
18-
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventMesh.Runtime.Server", "src\EventMesh.Runtime.Server\EventMesh.Runtime.Server.csproj", "{C4C745A5-DFFF-4803-883A-B7F4F8888338}"
19-
EndProject
2018
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventMesh.Runtime.Client", "src\EventMesh.Runtime.Client\EventMesh.Runtime.Client.csproj", "{3F30FA61-B630-4EB3-959C-9AD5FEEDB2E4}"
2119
EndProject
2220
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventMesh.Runtime.EF", "src\EventMesh.Runtime.EF\EventMesh.Runtime.EF.csproj", "{5A4AE409-D86A-435A-B6DA-2FA57896067C}"
@@ -43,10 +41,6 @@ Global
4341
{0478428F-225D-4541-BC67-805A2041C12C}.Debug|Any CPU.Build.0 = Debug|Any CPU
4442
{0478428F-225D-4541-BC67-805A2041C12C}.Release|Any CPU.ActiveCfg = Release|Any CPU
4543
{0478428F-225D-4541-BC67-805A2041C12C}.Release|Any CPU.Build.0 = Release|Any CPU
46-
{C4C745A5-DFFF-4803-883A-B7F4F8888338}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
47-
{C4C745A5-DFFF-4803-883A-B7F4F8888338}.Debug|Any CPU.Build.0 = Debug|Any CPU
48-
{C4C745A5-DFFF-4803-883A-B7F4F8888338}.Release|Any CPU.ActiveCfg = Release|Any CPU
49-
{C4C745A5-DFFF-4803-883A-B7F4F8888338}.Release|Any CPU.Build.0 = Release|Any CPU
5044
{3F30FA61-B630-4EB3-959C-9AD5FEEDB2E4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
5145
{3F30FA61-B630-4EB3-959C-9AD5FEEDB2E4}.Debug|Any CPU.Build.0 = Debug|Any CPU
5246
{3F30FA61-B630-4EB3-959C-9AD5FEEDB2E4}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -71,7 +65,6 @@ Global
7165
{D2CC2EC6-5499-4D4E-88DC-ECFCCBEC0278} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
7266
{BC28994B-7B8A-4C6B-99E5-9167130ADC04} = {2A14F697-FFC8-402F-82B4-8128FE897DC9}
7367
{0478428F-225D-4541-BC67-805A2041C12C} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
74-
{C4C745A5-DFFF-4803-883A-B7F4F8888338} = {20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}
7568
{3F30FA61-B630-4EB3-959C-9AD5FEEDB2E4} = {20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}
7669
{5A4AE409-D86A-435A-B6DA-2FA57896067C} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
7770
{99809234-C3D5-45ED-8699-E1DD1337682B} = {20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}

docs/documentation/eventmesh/protocol.md

Lines changed: 103 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,46 +2,65 @@
22

33
## Concepts
44

5-
Session
6-
* A session is needed to send commands to the server like: Subscribe to a topic.
5+
### EventMesh Server
76

8-
Bridge
9-
* A bridge can be created between two servers. Subscription to one or more topics happened to all the servers present in the configured network.
7+
Acts as a proxy between message brokers and clients.
108

11-
EventMesh server
12-
* An EventMesh server is linked to one or more message brokers (RabbitMQ / Apache Kafka etc...)
13-
* All the messages are translated into CloudEvents.
9+
### Client
10+
11+
Client can be any types of application which implements the EventMesh protocol.
12+
13+
### Session
14+
15+
Client can have one or more sessions.
16+
There are two types of session :
17+
* SUB : subscribe to one or more topics.
18+
* PUB : publish messages.
19+
20+
### Bridge
21+
22+
Link between two servers.
23+
When a bridge is established, messages can transit from one server to an another.
24+
25+
### CloudEvent
26+
27+
All messages coming from Brokers are translated into CloudEvent and vice versa.
1428

1529
## Common Structure
1630

1731
### Message structure
1832

19-
| Field size | Description | Data Type | Comments |
20-
| ---------- | ------------------ | --------- | ----------------------------------------------------------------------------- |
21-
| 10 | Magic Value | string | Magic value, must be equal to "EventMesh" |
22-
| 5 | Protocol Version | string | Protocol version number, must be equal to 0000 |
23-
| 4 | Command | int32 | Identification of the command |
24-
| 4 | Status Code | int32 | Identification of the status |
25-
| ANY | Status Description | string | Short description of the status |
26-
| 11 | Seq | string | Correlation number of the request. Response must have the same seq number |
27-
| any | body | object | Content of the command |
33+
| Description | Data Type | Comment | Default Value |
34+
| ------------------ | --------- | ----------------------------------------------------------------------------- | ------------- |
35+
| Magic Value | string | Magic value. | EventMesh |
36+
| Protocol Version | string | Protocol version. | 0000 |
37+
| Command | int32 | Identifier of the command. | |
38+
| Status | int32 | Identifier of the status. | |
39+
| Status description | string | Short description of the status. | |
40+
| Error | string | Short description of the error . | |
41+
| Seq | string | Correlation number. Request and response must have the same seq number. | |
42+
| Body | ANY | Contains the properties of the command. | |
2843

2944
Known commands:
3045

31-
| Code | Name |
32-
| ---- | --------------------------- |
33-
| 0 | HEARBEAT_REQUEST |
34-
| 1 | HEARTBEAT_RESPONSE |
35-
| 2 | HELLO_REQUEST |
36-
| 3 | HELLO_RESPONSE |
37-
| 4 | SUBSCRIBE_REQUEST |
38-
| 5 | SUBSCRIBE_RESPONSE |
39-
| 6 | ASYNC_MESSAGE_TO_CLIENT |
40-
| 7 | ASYNC_MESSAGE_TO_CLIENT_ACK |
41-
| 8 | ADD_BRIDGE_REQUEST |
42-
| 9 | ADD_BRIDGE_RESPONSE |
43-
| 11 | DISCONNECT_REQUEST |
44-
| 12 | DISCONNECT_RESPONSE |
46+
| Code | Name |
47+
| ---- | ------------------------------------ |
48+
| 0 | HEARBEAT_REQUEST |
49+
| 1 | HEARTBEAT_RESPONSE |
50+
| 2 | HELLO_REQUEST |
51+
| 3 | HELLO_RESPONSE |
52+
| 4 | SUBSCRIBE_REQUEST |
53+
| 5 | SUBSCRIBE_RESPONSE |
54+
| 6 | ASYNC_MESSAGE_TO_CLIENT |
55+
| 7 | ASYNC_MESSAGE_TO_CLIENT_ACK |
56+
| 8 | ADD_BRIDGE_REQUEST |
57+
| 9 | ADD_BRIDGE_RESPONSE |
58+
| 10 | ASYNC_MESSAGE_TO_SERVER |
59+
| 11 | DISCONNECT_REQUEST |
60+
| 12 | DISCONNECT_RESPONSE |
61+
| 13 | ASYNC_MESSAGE_TO_CLIENT_ACK_RESPONSE |
62+
| 14 | PUBLISH_MESSAGE_REQUEST |
63+
| 15 | PUBLISH_MESSAGE_RESPONSE |
4564

4665
Known status:
4766

@@ -54,26 +73,68 @@ Known status:
5473

5574
## Commands
5675

57-
### Heartbeat
76+
### Heartbeat request
77+
78+
*Request* : HEARBEAT_REQUEST.
79+
*Response* : HEARTBEAT_RESPONSE.
80+
81+
Heartbeat request are sent by client to check the availablity of an EventMesh Server.
82+
83+
### Hello request
84+
85+
*Request* : HELLO_REQUEST.
86+
*Response* : HELLO_RESPONSE.
87+
88+
Client send hello request to EventMesh server to create a session.
89+
The following informations are passed in the request. They are used by the EventMesh server to check if the client is authorized to subscribe or publish.
90+
91+
| Description | Data Type | Comment |
92+
| ----------------- | --------- | ----------------------------------------------------------------------------------------------------------------------------- |
93+
| ClientId | string | Identifier of the client |
94+
| Environment | string | Environment used by the client. Possible values can be TST,VAL or PRD |
95+
| Urn | string | URN of the EventMesh server calling an another EventMesh server. |
96+
| Port | int32 | Port of the EventMesh server calling an another EventMesh server. |
97+
| Password | string | Password of the client. Used during the authentication phase. |
98+
| BufferCloudEvents | int32 | Number of messages sent to the client. Used to send a batch of messages. |
99+
| Purpose | int32 | Type of session. Possible values are : SUBSCRIBE = 0, PUBLISH = 1 |
100+
| IsServer | boolean | Indicate if the request is coming from a client or a server. |
101+
| Pid | int32 | Identifier of the process. |
102+
103+
> [!WARNING]
104+
> When a bridge is created between two EventMesh servers. The parameters `Urn` and `Port` will be used to transmit messages from one server to the second and to the client.
105+
106+
When a session is created, an hello response is sent by the EventMesh server to the client.
107+
It contains a unique session identifier. This value will be used by the client to perform future operations.
108+
109+
| Description | Data Type | |
110+
| ----------- | --------- | -------------------------------- |
111+
| SessionId | string | Unique identifier of the session |
58112

59-
Hearbeat requests are sent to check the availablity of the EventMesh Runtime server.
113+
### Subscribe request
60114

61-
### Hello
115+
*Request* : SUBSCRIBE_REQUEST.
116+
*Response* : SUBSCRIBE_RESPONSE.
62117

63-
Hello requests are sent to create and start to communicate with the remote server.
118+
> [!WARNING]
119+
> A session is required to perform a subscription.
64120
65-
### Subscribe
121+
Client send subscribe request to EventMesh sever to subscribe to one or more topics.
122+
The following informations are passed in the request.
66123

67-
This command is used by the client to subscribe to one or more topics.
124+
| Description | Data Type | Comment |
125+
| ----------- | --------- | ---------------------------------- |
126+
| ClientId | string | Identifier of the client. |
127+
| SessionId | string | Unique identifier of the session. |
128+
| Topics | string[] | List of topics. |
68129

69-
### Async Message To Client
130+
### Add Bridge request
70131

71-
When a message is received from a message broker then the EventMesh Runtime server sent an AsyncMessageToClient to the client.
132+
TODO
72133

73-
### AddBridgeRequest
134+
### Disconnect request
74135

75-
This command is used to create a bridge between two servers.
136+
TODO
76137

77-
### Disconnect
138+
### Publish Message request
78139

79-
Close the active session.
140+
TODO

src/EventMesh.Runtime.AMQP/AMQPConsumer.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ protected override AMQPOptions GetOptions()
112112

113113
#endregion
114114

115-
116115
private void ReceiveMessage(object sender, string clientId, string clientSessionId, string topicName, string source, string brokerName, BasicDeliverEventArgs e)
117116
{
118117
var jsonEventFormatter = new JsonEventFormatter();
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using CloudNative.CloudEvents;
2+
using EventMesh.Runtime.Models;
3+
using EventMesh.Runtime.Stores;
4+
using Microsoft.Extensions.Options;
5+
using RabbitMQ.Client;
6+
using System.Threading.Tasks;
7+
8+
namespace EventMesh.Runtime.AMQP
9+
{
10+
public class AMQPPublisher : IMessagePublisher
11+
{
12+
private readonly AMQPOptions _options;
13+
private readonly IBrokerConfigurationStore _brokerConfigurationStore;
14+
15+
public AMQPPublisher(
16+
IOptions<AMQPOptions> options,
17+
IBrokerConfigurationStore brokerConfigurationStore)
18+
{
19+
_options = options.Value;
20+
_brokerConfigurationStore = brokerConfigurationStore;
21+
}
22+
23+
public string BrokerName
24+
{
25+
get
26+
{
27+
return _options.BrokerName;
28+
}
29+
}
30+
31+
public Task Publish(CloudEvent cloudEvent, string topicName, Client client)
32+
{
33+
var options = GetOptions();
34+
var connectionFactory = new ConnectionFactory();
35+
options.ConnectionFactory(connectionFactory);
36+
using (var connection = connectionFactory.CreateConnection())
37+
{
38+
var channel = connection.CreateModel();
39+
var props = channel.CreateBasicProperties();
40+
cloudEvent.EnrichBasicProperties(props);
41+
channel.BasicPublish(exchange: options.TopicName,
42+
routingKey: topicName,
43+
basicProperties: props,
44+
body: cloudEvent.SerializeBody());
45+
}
46+
47+
return Task.CompletedTask;
48+
}
49+
50+
private AMQPOptions GetOptions()
51+
{
52+
return _brokerConfigurationStore.Get(_options.BrokerName).ToAMQPOptions();
53+
}
54+
}
55+
}

src/EventMesh.Runtime.AMQP/BasicDeliverEventArgsExtensions.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
using CloudNative.CloudEvents;
22
using CloudNative.CloudEvents.Core;
3+
using CloudNative.CloudEvents.SystemTextJson;
4+
using RabbitMQ.Client;
35
using RabbitMQ.Client.Events;
46
using System;
7+
using System.Collections.Generic;
58
using System.IO;
69
using System.Net.Mime;
710
using System.Text;
@@ -13,16 +16,12 @@ public static class BasicDeliverEventArgsExtensions
1316
internal const string AmqpHeaderPrefix = "cloudEvents:";
1417
internal const string SpecVersionAmqpHeader = AmqpHeaderPrefix + "specversion";
1518

16-
public static CloudEvent ToCloudEvent(this BasicDeliverEventArgs message,
17-
CloudEventFormatter cloudEventFormatter,
18-
string source,
19-
string topicName,
20-
params CloudEventAttribute[]? extensionAttributes)
19+
public static CloudEvent ToCloudEvent(this BasicDeliverEventArgs message, CloudEventFormatter cloudEventFormatter, string source, string topicName)
2120
{
2221
var properties = message.BasicProperties;
2322
if (HasCloudEventsContentType(message, out var contentType))
2423
{
25-
return cloudEventFormatter.DecodeStructuredModeMessage(new MemoryStream(message.Body.ToArray()), new ContentType(contentType), extensionAttributes);
24+
return cloudEventFormatter.DecodeStructuredModeMessage(new MemoryStream(message.Body.ToArray()), new ContentType(contentType), null);
2625
}
2726

2827
var cloudEvent = new CloudEvent();
@@ -41,6 +40,21 @@ public static CloudEvent ToCloudEvent(this BasicDeliverEventArgs message,
4140
return cloudEvent;
4241
}
4342

43+
public static byte[] SerializeBody(this CloudEvent cloudEvt)
44+
{
45+
var formatter = new JsonEventFormatter();
46+
return formatter.EncodeBinaryModeEventData(cloudEvt).ToArray();
47+
}
48+
49+
public static void EnrichBasicProperties(this CloudEvent cloudEvt, IBasicProperties basicProperties)
50+
{
51+
basicProperties.ContentType = "application/json";
52+
basicProperties.Headers = new Dictionary<string, object>
53+
{
54+
{ SpecVersionAmqpHeader, CloudEventsSpecVersion.Default.VersionId }
55+
};
56+
}
57+
4458
private static bool HasCloudEventsContentType(BasicDeliverEventArgs message, out string? contentType)
4559
{
4660
contentType = message.BasicProperties.ContentType?.ToString();

src/EventMesh.Runtime.AMQP/EventMeshRuntimeHostBuilderExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public static RuntimeHostBuilder AddAMQP(this RuntimeHostBuilder eventMeshRuntim
3232
}
3333
});
3434
eventMeshRuntime.ServiceCollection.AddSingleton<IMessageConsumer, AMQPConsumer>();
35+
eventMeshRuntime.ServiceCollection.AddSingleton<IMessagePublisher, AMQPPublisher>();
3536
return eventMeshRuntime;
3637
}
3738
}

src/EventMesh.Runtime.Client/Program.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ class Program
77
static async Task<int> Main(string[] args)
88
{
99
// await Scenario1CreateSubSession.Launch();
10-
await Scenario2SubscribeToOneTopic.Launch();
10+
// await Scenario2SubscribeToOneTopic.Launch();
11+
await Scenario3SubscribeToOneTopicAndPublishMessage.Launch();
1112
return 1;
1213
}
1314
}

0 commit comments

Comments
 (0)