Skip to content

Commit 075491b

Browse files
lailabougriaandreasohlundjpalacjasontaylordev
authored
Avro sample (#7612)
* avro basic sample * remove unneeded references * Null check * Formatting * Tweaks * Return empty array when message type is not known * Thrown better exception when schema not found * Naming * Use record * Basic sample instructions * Switch to json * Link to sample for from shape the future * Add avro landing page * Throw when asked to infer message type from payload * Document interface messages not being supported * Add note about not suitable for production use * Add comment to the body logger * Use unrecoverable exception * Add note about what happens when schema is not found * Add code section * Update sample.md * Update sample.md * Update menu.yaml * Update avro.md * Update sample.md * Update sample.md * Update sample.md * throw a messagedeserializationexception is schema is not found * Update sample.md * links * fix link * align titles * menu title * link to issue * fix note display * Add Avro logo and make call-to-action more inviting for use case feedback * Enable implicit usings and clean-up imports * Avoid line-wrapping within docs snippets * Add intro sentences for configuration, sending message, and output examples --------- Co-authored-by: Andreas Öhlund <[email protected]> Co-authored-by: Jo Palac <[email protected]> Co-authored-by: Jason Taylor <[email protected]>
1 parent be8c918 commit 075491b

17 files changed

+443
-0
lines changed

menu/menu.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,6 +1721,8 @@
17211721
Url: shape-the-future
17221722
- Title: NServiceBus and CloudEvents
17231723
Url: shape-the-future/cloudevents
1724+
- Title: NServiceBus and Apache Avro
1725+
Url: shape-the-future/avro
17241726
- Title: NServiceBus and .NET Aspire
17251727
Url: shape-the-future/aspire
17261728
- Title: NServiceBus and Redis
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.29728.190
5+
MinimumVisualStudioVersion = 15.0.26730.12
6+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample", "Sample\Sample.csproj", "{6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
EndGlobalSection
12+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
13+
{6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
14+
{6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}.Debug|Any CPU.Build.0 = Debug|Any CPU
15+
EndGlobalSection
16+
GlobalSection(SolutionProperties) = preSolution
17+
HideSolutionNode = FALSE
18+
EndGlobalSection
19+
EndGlobal
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using Avro.IO;
2+
using Avro.Reflect;
3+
using NServiceBus.Serialization;
4+
5+
#region serializer-implementation
6+
public class AvroMessageSerializer(SchemaRegistry schemaRegistry, ClassCache classCache) : IMessageSerializer
7+
{
8+
public string ContentType => "avro/json";
9+
10+
public void Serialize(object message, Stream stream)
11+
{
12+
var messageType = message.GetType();
13+
var schema = schemaRegistry.GetSchema(messageType);
14+
var writer = new ReflectDefaultWriter(messageType, schema, classCache);
15+
16+
var encoder = new JsonEncoder(schema, stream);
17+
18+
writer.Write(message, encoder);
19+
20+
encoder.Flush();
21+
}
22+
23+
public object[] Deserialize(ReadOnlyMemory<byte> body, IList<Type> messageTypes = null)
24+
{
25+
if (messageTypes == null)
26+
{
27+
throw new MessageDeserializationException(
28+
"Avro is not able to infer message types from the body content only," +
29+
"the NServiceBus.EnclosedMessageTypes header must be present");
30+
}
31+
32+
var messages = new List<object>();
33+
foreach (var messageType in messageTypes)
34+
{
35+
try
36+
{
37+
var schema = schemaRegistry.GetSchema(messageType);
38+
var reader = new ReflectDefaultReader(messageType, schema, schema, classCache);
39+
using var stream = new ReadOnlyStream(body);
40+
var message = reader.Read(null, schema, schema, new JsonDecoder(schema, stream));
41+
messages.Add(message);
42+
}
43+
catch (KeyNotFoundException)
44+
{
45+
throw new MessageDeserializationException(
46+
$"No schema found for message type {messageType.FullName}");
47+
}
48+
}
49+
50+
return messages.ToArray();
51+
}
52+
}
53+
#endregion
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using System.Reflection;
2+
using Avro;
3+
using Avro.Reflect;
4+
using NServiceBus.MessageInterfaces;
5+
using NServiceBus.Serialization;
6+
using NServiceBus.Settings;
7+
using NServiceBus.Unicast.Messages;
8+
9+
#region serializer-definition
10+
public class AvroSerializer : SerializationDefinition
11+
{
12+
public override Func<IMessageMapper, IMessageSerializer> Configure(IReadOnlySettings settings)
13+
{
14+
var registry = settings.Get<MessageMetadataRegistry>();
15+
var messageTypes = registry.GetAllMessages().Select(m => m.MessageType);
16+
var schemaCache = new SchemaRegistry();
17+
var assembly = Assembly.GetExecutingAssembly();
18+
19+
foreach (var messageType in messageTypes)
20+
{
21+
var manifestNamespace = "Sample.";
22+
var schemaResourceName = manifestNamespace + messageType.Name + ".avsc";
23+
using var stream = assembly.GetManifestResourceStream(schemaResourceName);
24+
25+
if (stream == null)
26+
{
27+
throw new InvalidOperationException(
28+
$"Resource '{schemaResourceName}' not found in assembly '{assembly.FullName}'.");
29+
}
30+
31+
// Load the schema from the embedded resource
32+
using var reader = new StreamReader(stream);
33+
var schemaJson = reader.ReadToEnd();
34+
35+
// Parse and cache the schema
36+
schemaCache.Add(messageType, Schema.Parse(schemaJson));
37+
}
38+
39+
return _ => new AvroMessageSerializer(schemaCache, new ClassCache());
40+
}
41+
}
42+
#endregion
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{
2+
"type": "record",
3+
"name": "CreateOrder",
4+
"namespace": "net.particular.samples.serializers.avro",
5+
"fields": [
6+
{
7+
"name": "OrderId",
8+
"type": "int"
9+
},
10+
{
11+
"name": "Date",
12+
"type": {
13+
"type": "long",
14+
"logicalType": "timestamp-millis"
15+
}
16+
},
17+
{
18+
"name": "CustomerId",
19+
"type": "int"
20+
},
21+
{
22+
"name": "OrderItems",
23+
"type": {
24+
"type": "array",
25+
"items": {
26+
"type": "record",
27+
"name": "OrderItem",
28+
"fields": [
29+
{
30+
"name": "ItemId",
31+
"type": "int"
32+
},
33+
{
34+
"name": "Quantity",
35+
"type": "int"
36+
}
37+
]
38+
}
39+
}
40+
}
41+
]
42+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
public record CreateOrder : IMessage
2+
{
3+
public int OrderId { get; set; }
4+
public DateTime Date { get; set; }
5+
public int CustomerId { get; set; }
6+
public List<OrderItem> OrderItems { get; set; }
7+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using Microsoft.Extensions.Logging;
2+
3+
public class CreateOrderHandler(ILogger<CreateOrderHandler> logger) :
4+
IHandleMessages<CreateOrder>
5+
{
6+
public Task Handle(CreateOrder message, IMessageHandlerContext context)
7+
{
8+
logger.LogInformation($"Order {message.OrderId} received");
9+
return Task.CompletedTask;
10+
}
11+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using Microsoft.Extensions.Hosting;
2+
3+
public class InputLoopService(IMessageSession messageSession) : BackgroundService
4+
{
5+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
6+
{
7+
while (!stoppingToken.IsCancellationRequested)
8+
{
9+
Console.WriteLine("Press any key, to send a message");
10+
Console.ReadKey();
11+
12+
#region message
13+
14+
var message = new CreateOrder
15+
{
16+
OrderId = 9,
17+
Date = DateTime.Now,
18+
CustomerId = 12,
19+
OrderItems =
20+
[
21+
new OrderItem
22+
{
23+
ItemId = 6,
24+
Quantity = 2
25+
},
26+
27+
new OrderItem
28+
{
29+
ItemId = 5,
30+
Quantity = 4
31+
}
32+
]
33+
};
34+
35+
await messageSession.SendLocal(message, cancellationToken: stoppingToken);
36+
37+
#endregion
38+
39+
Console.WriteLine("Message Sent");
40+
}
41+
}
42+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System.Text;
2+
using Microsoft.Extensions.Logging;
3+
using NServiceBus.Pipeline;
4+
5+
// The behavior is only needed to print the serialized avro message content to the console
6+
public class MessageBodyLogger(ILogger<MessageBodyLogger> logger) : Behavior<IIncomingPhysicalMessageContext>
7+
{
8+
public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
9+
{
10+
var bodyAsString = Encoding.UTF8
11+
.GetString(context.Message.Body.ToArray());
12+
13+
logger.LogInformation("Serialized Message Body:");
14+
logger.LogInformation(bodyAsString);
15+
16+
return next();
17+
}
18+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
public class OrderItem
2+
{
3+
public int ItemId { get; set; }
4+
public int Quantity { get; set; }
5+
}

0 commit comments

Comments
 (0)