Skip to content

Commit fc9bf95

Browse files
jpalacjasontaylordevandreasohlundlailabougria
authored
AsyncAPI Sample (#7611)
* first checkin * publish / subscribe work in basic form * got featur working ok * Add Open API + Minimal API * Add AsyncAPI * Remove Saunter * AsyncAPI sample using reflection (#7619) * generation based on reflection somewhat working * add a second published event and remove the service * Naming cleanup Added Generic host with NSB that writes the file to disk * remove unsued region * remove unused references * removed unused projects * update language version * Added console titles * added some TODOs and start sample.md * remove unused usings * add endpoint name as channel address * cleanup TODOs * remove unused code * Clean-up namespaces * Fix async lambda in ForEach by using foreach with awaited operations * Stop application on other key press. * Describe the AsyncAPI concepts * Apply standard approach (don't make me think) * Rename subscriber events and add more snippets and content to sample.md * add asyncapi to shape the future * cleanup subscriber project doc * move message conventions into a folder, add content to sample.md * Remove unused method * Improve comments * add wording about the message being sent locally * Update sample docs and code * Remove bold * shape the future wording * Nitpicks * AsyncAPI demo with simple and complex versions (#7731) Split the sample into 2 different ones: - simple using IEvent to defined events - more complex one that does the event type translations * fix wording - removing 'you' * More logging and some minor nitpicks * add link to asyncapi feature issue for comments * Add wwwroot to avoid warning on startup * Revert "Add wwwroot to avoid warning on startup" This reverts commit 6a40d59. * Apply suggestions from code review Co-authored-by: Laila Bougria <[email protected]> * Update asyncapi.md with link to public issue * Update samples/asyncapi/simple/sample.md * Update samples/asyncapi/simple/sample.md * Update sample.md remove die comment * Update samples/asyncapi/custom-message-types/sample.md * Update sample.md remove ide comment --------- Co-authored-by: Jason Taylor <[email protected]> Co-authored-by: Andreas Öhlund <[email protected]> Co-authored-by: Laila Bougria <[email protected]>
1 parent 052f0eb commit fc9bf95

File tree

58 files changed

+1640
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1640
-0
lines changed

menu/menu.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1729,3 +1729,5 @@
17291729
Url: shape-the-future/redis
17301730
- Title: NServiceBus and Infrastructure as Code (IaC)
17311731
Url: shape-the-future/iac-provisioning
1732+
- Title: NServiceBus and AsyncAPI
1733+
Url: shape-the-future/asyncapi
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#nullable enable
2+
using Json.Schema;
3+
using Json.Schema.Generation;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Neuroglia.AsyncApi;
6+
using Neuroglia.AsyncApi.FluentBuilders.v3;
7+
using Neuroglia.AsyncApi.Generation;
8+
using Neuroglia.AsyncApi.v3;
9+
10+
namespace AsyncAPI.Feature;
11+
12+
public class ApiDocumentGenerator(IServiceProvider serviceProvider) : IAsyncApiDocumentGenerator
13+
{
14+
public async Task<IEnumerable<IAsyncApiDocument>> GenerateAsync(IEnumerable<Type> markupTypes, AsyncApiDocumentGenerationOptions options, CancellationToken cancellationToken = default)
15+
{
16+
ArgumentNullException.ThrowIfNull(options);
17+
18+
//only creating one document for the one NSB endpoint
19+
var documents = new List<IAsyncApiDocument>(1);
20+
21+
var document = serviceProvider.GetRequiredService<IV3AsyncApiDocumentBuilder>();
22+
options.V3BuilderSetup?.Invoke(document);
23+
24+
await GenerateChannels(document, options, cancellationToken);
25+
26+
documents.Add(document.Build());
27+
28+
return documents;
29+
}
30+
31+
async Task GenerateChannels(IV3AsyncApiDocumentBuilder document, AsyncApiDocumentGenerationOptions options, CancellationToken cancellationToken = default)
32+
{
33+
ArgumentNullException.ThrowIfNull(document);
34+
ArgumentNullException.ThrowIfNull(options);
35+
IV3ChannelDefinitionBuilder channelBuilder = null!;
36+
37+
var typeCache = serviceProvider.GetRequiredService<TypeCache>();
38+
39+
//get all published events
40+
foreach (var (actualType, publishedType) in typeCache.PublishedEventCache.Select(kvp => (kvp.Key, kvp.Value)))
41+
{
42+
var channelName = $"{publishedType.FullName!}";
43+
document.WithChannel(channelName, channel =>
44+
{
45+
channelBuilder = channel;
46+
channel
47+
.WithAddress(typeCache.EndpointName)
48+
.WithDescription(actualType.FullName);
49+
});
50+
await GenerateV3OperationForAsync(document, channelName, channelBuilder, actualType, publishedType, options, cancellationToken);
51+
}
52+
53+
//NOTE this is where more channels and operations can be defined, for example subscribed to events, sent/received commands and messages
54+
}
55+
56+
Task GenerateV3OperationForAsync(IV3AsyncApiDocumentBuilder document, string channelName, IV3ChannelDefinitionBuilder channel, Type actualType, Type producedType, AsyncApiDocumentGenerationOptions options, CancellationToken cancellationToken = default)
57+
{
58+
ArgumentNullException.ThrowIfNull(document);
59+
ArgumentException.ThrowIfNullOrWhiteSpace(channelName);
60+
ArgumentNullException.ThrowIfNull(channel);
61+
ArgumentNullException.ThrowIfNull(actualType);
62+
ArgumentNullException.ThrowIfNull(producedType);
63+
ArgumentNullException.ThrowIfNull(options);
64+
65+
var requestMessagePayloadSchema = new JsonSchemaBuilder().FromType(actualType, JsonSchemaGeneratorConfiguration.Default).Build();
66+
var messageName = producedType.FullName!;
67+
68+
var messageChannelReference = $"#/channels/{channelName}/messages/{producedType.FullName!}";
69+
channel.WithMessage(messageName, message =>
70+
{
71+
message
72+
.WithName(messageName)
73+
.WithPayloadSchema(schema => schema
74+
.WithFormat("application/vnd.aai.asyncapi+json;version=3.0.0")
75+
.WithSchema(requestMessagePayloadSchema));
76+
});
77+
78+
var operationName = $"{producedType.FullName!}";
79+
document.WithOperation(operationName, operation =>
80+
{
81+
operation
82+
.WithAction(V3OperationAction.Send)
83+
.WithChannel($"#/channels/{channelName}")
84+
.WithMessage(messageChannelReference);
85+
});
86+
return Task.CompletedTask;
87+
}
88+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net9.0</TargetFramework>
5+
<LangVersion>13.0</LangVersion>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="NServiceBus" Version="9.*" />
11+
<PackageReference Include="Neuroglia.AsyncApi.AspNetCore" Version="3.0.6" />
12+
<PackageReference Include="Neuroglia.AsyncApi.AspNetCore.UI" Version="3.0.6" />
13+
<PackageReference Include="Neuroglia.AsyncApi.Core" Version="3.0.6" />
14+
<PackageReference Include="Neuroglia.AsyncApi.DependencyInjectionExtensions" Version="3.0.6" />
15+
<PackageReference Include="Neuroglia.AsyncApi.FluentBuilders" Version="3.0.6" />
16+
<PackageReference Include="Neuroglia.AsyncApi.Generation" Version="3.0.6" />
17+
<PackageReference Include="Neuroglia.AsyncApi.Validation" Version="3.0.6" />
18+
</ItemGroup>
19+
20+
</Project>
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
using System.Reflection;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Neuroglia.AsyncApi.Generation;
4+
using NServiceBus.Features;
5+
using NServiceBus.Unicast.Messages;
6+
7+
namespace AsyncAPI.Feature;
8+
9+
public sealed class AsyncApiFeature : NServiceBus.Features.Feature
10+
{
11+
protected override void Setup(FeatureConfigurationContext context)
12+
{
13+
var conventions = context.Settings.Get<Conventions>();
14+
15+
var messageMetadataRegistry = context.Settings.Get<MessageMetadataRegistry>();
16+
17+
var proxyGenerator = new TypeProxyGenerator();
18+
19+
Dictionary<Type, Type> publishedEventCache = new();
20+
Dictionary<string, (Type SubscribedType, Type ActualType)> subscribedEventCache = new();
21+
22+
foreach (var messageMetadata in messageMetadataRegistry.GetAllMessages())
23+
{
24+
//NOTE only events decorated with the "PublishedEvent" or "SubscribedEvent" are being stored so that they can be "translated" at publish and subscribe time from their concrete types
25+
if (conventions.IsEventType(messageMetadata.MessageType))
26+
{
27+
var publishedEvent = messageMetadata.MessageType.GetCustomAttribute<PublishedEvent>();
28+
if (publishedEvent != null)
29+
{
30+
publishedEventCache.Add(messageMetadata.MessageType,
31+
proxyGenerator.CreateTypeFrom($"{publishedEvent.EventName}V{publishedEvent.Version}"));
32+
}
33+
34+
var subscribedEvent = messageMetadata.MessageType.GetCustomAttribute<SubscribedEvent>();
35+
if (subscribedEvent != null)
36+
{
37+
var subscribedType =
38+
proxyGenerator.CreateTypeFrom($"{subscribedEvent.EventName}V{subscribedEvent.Version}");
39+
subscribedEventCache.Add(subscribedType.FullName!,
40+
(SubscribedType: subscribedType, ActualType: messageMetadata.MessageType));
41+
}
42+
}
43+
}
44+
45+
if (context.Settings.GetOrDefault<bool>("Installers.Enable"))
46+
{
47+
context.RegisterStartupTask(static provider => new ManualSubscribe(provider.GetRequiredService<TypeCache>()
48+
.SubscribedEventCache.Values.Select(x => x.SubscribedType).ToArray()));
49+
}
50+
51+
//Registering the behaviors required to replace the outgoing and incoming message types based on the defined conventions
52+
context.Pipeline.Register(
53+
static provider =>
54+
new ReplaceOutgoingEnclosedMessageTypeHeaderBehavior(provider.GetRequiredService<TypeCache>().PublishedEventCache),
55+
"Replaces the outgoing enclosed message type header with the published event type fullname");
56+
context.Pipeline.Register(
57+
static provider => new ReplaceMulticastRoutingBehavior(provider.GetRequiredService<TypeCache>().PublishedEventCache),
58+
"Replaces the multicast routing strategies that match the actual published event type with the published event type name");
59+
60+
if (!context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
61+
{
62+
context.Pipeline.Register(
63+
static provider =>
64+
new ReplaceIncomingEnclosedMessageTypeHeaderBehavior(provider.GetRequiredService<TypeCache>()
65+
.SubscribedEventCache), "Replaces the incoming published event type name with the actual local event type name");
66+
}
67+
68+
#region RegisterEventMappings
69+
context.Services.AddSingleton(new TypeCache
70+
{
71+
EndpointName = context.Settings.EndpointName(),
72+
PublishedEventCache = publishedEventCache,
73+
SubscribedEventCache = subscribedEventCache
74+
});
75+
#endregion
76+
77+
#region RegisterCustomDocumentGenerator
78+
context.Services.AddTransient<IAsyncApiDocumentGenerator>(
79+
provider => new ApiDocumentGenerator(provider));
80+
#endregion
81+
}
82+
83+
class ManualSubscribe(Type[] subscribedEvents) : FeatureStartupTask
84+
{
85+
protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
86+
{
87+
return Task.WhenAll(subscribedEvents.Select(subscribedEvent => session.Subscribe(subscribedEvent, cancellationToken: cancellationToken)));
88+
}
89+
90+
protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default)
91+
{
92+
return Task.CompletedTask;
93+
}
94+
}
95+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using NServiceBus.Features;
2+
3+
namespace AsyncAPI.Feature;
4+
5+
public static class EndpointConfigurationExtensions
6+
{
7+
#region EnableAsyncApiSupport
8+
public static void EnableAsyncApiSupport(
9+
this EndpointConfiguration endpointConfiguration)
10+
{
11+
endpointConfiguration.DisableFeature<AutoSubscribe>();
12+
endpointConfiguration.EnableFeature<AsyncApiFeature>();
13+
14+
var conventions = endpointConfiguration.Conventions();
15+
conventions.Add(new PublishedEventsConvention());
16+
conventions.Add(new SubscribedEventsConvention());
17+
}
18+
#endregion
19+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace AsyncAPI.Feature;
2+
3+
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
4+
public sealed class PublishedEvent : Attribute
5+
{
6+
public string EventName { get; init; }
7+
public int Version { get; init; }
8+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using System.Reflection;
2+
3+
namespace AsyncAPI.Feature;
4+
5+
public class PublishedEventsConvention : IMessageConvention
6+
{
7+
public bool IsMessageType(Type type)
8+
{
9+
return false;
10+
}
11+
12+
public bool IsCommandType(Type type)
13+
{
14+
return false;
15+
}
16+
17+
public bool IsEventType(Type type)
18+
{
19+
return type.GetCustomAttribute<PublishedEvent>() != null;
20+
}
21+
22+
public string Name { get; } = "AsyncAPI Sample Event Message Convention";
23+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using NServiceBus.Pipeline;
2+
3+
namespace AsyncAPI.Feature;
4+
5+
class ReplaceIncomingEnclosedMessageTypeHeaderBehavior(Dictionary<string, (Type SubscribedType, Type ActualType)> subscribedEventCache)
6+
: IBehavior<ITransportReceiveContext, ITransportReceiveContext>
7+
{
8+
public Task Invoke(ITransportReceiveContext context, Func<ITransportReceiveContext, Task> next)
9+
{
10+
if (context.Message.Headers.TryGetValue(Headers.EnclosedMessageTypes, out var enclosedMessageTypes) && subscribedEventCache.TryGetValue(enclosedMessageTypes, out var subscribedEventType))
11+
{
12+
// very blunt and might break with certain transports
13+
context.Message.Headers[Headers.EnclosedMessageTypes] = subscribedEventType.ActualType.FullName;
14+
}
15+
return next(context);
16+
}
17+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using NServiceBus.Pipeline;
2+
using NServiceBus.Routing;
3+
4+
namespace AsyncAPI.Feature;
5+
6+
class ReplaceMulticastRoutingBehavior(Dictionary<Type, Type> publishedEventCache) : IBehavior<IRoutingContext, IRoutingContext>
7+
{
8+
public Task Invoke(IRoutingContext context, Func<IRoutingContext, Task> next)
9+
{
10+
var logicalMessage = context.Extensions.Get<OutgoingLogicalMessage>();
11+
if (publishedEventCache.TryGetValue(logicalMessage.MessageType, out var publishedEvent))
12+
{
13+
var newStrategies = new List<RoutingStrategy>(context.RoutingStrategies.Count);
14+
var strategies = context.RoutingStrategies;
15+
foreach (var strategy in strategies)
16+
{
17+
if (strategy is MulticastRoutingStrategy multicastRoutingStrategy)
18+
{
19+
// we assume here a multi cast address tag will never do anything with the headers so we pass a static empty dictionary
20+
var multicastAddressTag = (MulticastAddressTag) multicastRoutingStrategy.Apply(emptyHeaders);
21+
if (multicastAddressTag.MessageType == logicalMessage.MessageType)
22+
{
23+
newStrategies.Add(new MulticastRoutingStrategy(publishedEvent));
24+
}
25+
}
26+
else
27+
{
28+
newStrategies.Add(strategy);
29+
}
30+
}
31+
32+
context.RoutingStrategies = newStrategies;
33+
}
34+
35+
return next(context);
36+
}
37+
38+
private readonly Dictionary<string, string> emptyHeaders = new Dictionary<string, string>();
39+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using NServiceBus.Pipeline;
2+
3+
namespace AsyncAPI.Feature;
4+
5+
class ReplaceOutgoingEnclosedMessageTypeHeaderBehavior(Dictionary<Type, Type> publishedEventCache) : IBehavior<IOutgoingPhysicalMessageContext,
6+
IOutgoingPhysicalMessageContext>
7+
{
8+
public Task Invoke(IOutgoingPhysicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> next)
9+
{
10+
var logicalMessage = context.Extensions.Get<OutgoingLogicalMessage>();
11+
if (publishedEventCache.TryGetValue(logicalMessage.MessageType, out var publishedEvent))
12+
{
13+
// very blunt and might break with certain transports
14+
context.Headers[Headers.EnclosedMessageTypes] = publishedEvent.FullName;
15+
}
16+
17+
return next(context);
18+
}
19+
}

0 commit comments

Comments
 (0)