-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathProgram.cs
More file actions
87 lines (78 loc) · 3.02 KB
/
Program.cs
File metadata and controls
87 lines (78 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Motor.Extensions.Conversion.SystemJson;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.Consumer;
using Motor.Extensions.Hosting.RabbitMQ;
using Motor.Extensions.Utilities;
// Compose the host first, then run it as a console application.
var hostBuilder = MotorHost
    .CreateDefaultBuilder()
    // Declare SomeMessage as the input message type of a service without output
    .ConfigureNoOutputService<SomeMessage>()
    .ConfigureServices(
        (_, serviceCollection) =>
        {
            // Register the handler for the input message;
            // it is called for every new incoming message
            serviceCollection.AddTransient<INoOutputService<SomeMessage>, SomeNoOutputService>();
        }
    )
    // Add the incoming communication module.
    .ConfigureConsumer<SomeMessage>(
        (_, consumer) =>
        {
            // Messages are received from RabbitMQ in this example
            consumer.AddRabbitMQ();
            // Encoding of the incoming message, so that the handler is able to deserialize it
            consumer.AddSystemJson();
        }
    );

await hostBuilder.RunConsoleAsync();
/// <summary>
/// Message handler for <see cref="SomeMessage"/> that is registered with the host above.
/// It validates each incoming message, processes it and reports the outcome as a
/// <see cref="ProcessedMessageStatus"/>; it produces no output message.
/// </summary>
public class SomeNoOutputService : INoOutputService<SomeMessage>
{
    /// <summary>
    /// Handles a single incoming message.
    /// </summary>
    /// <param name="inputEvent">Cloud event whose typed data is the input message.</param>
    /// <param name="token">Cancellation token; not observed here because all work is synchronous.</param>
    /// <returns>
    /// A completed task with <see cref="ProcessedMessageStatus.InvalidInput"/> when validation fails,
    /// <see cref="ProcessedMessageStatus.Failure"/> when the processing result is 0, and
    /// <see cref="ProcessedMessageStatus.Success"/> otherwise.
    /// </returns>
    /// <exception cref="FailureException">Thrown when the processing result is 1.</exception>
    public Task<ProcessedMessageStatus> HandleMessageAsync(
        MotorCloudEvent<SomeMessage> inputEvent,
        CancellationToken token = default
    )
    {
        // Get the input message from the cloud event
        var input = inputEvent.TypedData;

        if (!IsValid(input))
        {
            // What happens next depends on RepublishInvalidInputToDeadLetterExchange (cf. appsettings.json):
            // if true, the message is republished to the configured dead letter exchange;
            // if false (the default), the message is simply acknowledged.
            return Task.FromResult(ProcessedMessageStatus.InvalidInput);
        }

        // Process the message
        var processingResult = ProcessMessage(input);

        // Map the processing result to a status
        return processingResult switch
        {
            // Either return ProcessedMessageStatus.Failure or throw a FailureException to indicate
            // that message processing was not successful and to republish the message to the
            // configured dead letter exchange.
            // NOTE: because IsValid requires FancyNumber > 0 and the square is computed without
            // overflow, 0 cannot occur for validated input; the arm is kept to illustrate the status.
            0L => Task.FromResult(ProcessedMessageStatus.Failure),
            1L => throw new FailureException("i failed"),
            // Return ProcessedMessageStatus.Success when message processing succeeds
            _ => Task.FromResult(ProcessedMessageStatus.Success),
        };
    }

    /// <summary>
    /// Squares the message's <see cref="SomeMessage.FancyNumber"/>.
    /// </summary>
    /// <param name="message">The validated input message.</param>
    /// <returns>The square as a <see cref="long"/>, which can hold the square of any <see cref="int"/>.</returns>
    private static long ProcessMessage(SomeMessage message)
    {
        // Widen before multiplying: an int product wraps silently unless overflow checking is
        // enabled, e.g. 65536 * 65536 would yield 0 and be misreported as a failure.
        return (long)message.FancyNumber * message.FancyNumber;
    }

    /// <summary>
    /// Checks whether the message may be processed.
    /// </summary>
    /// <param name="message">The input message to validate.</param>
    /// <returns><c>true</c> when <see cref="SomeMessage.FancyNumber"/> is strictly positive.</returns>
    private static bool IsValid(SomeMessage message)
    {
        return message.FancyNumber > 0;
    }
}
/// <summary>
/// Input message consumed by the service; carries a single integer payload.
/// </summary>
public record SomeMessage
{
    // Value used for FancyNumber when none is supplied.
    private const int DefaultFancyNumber = 42;

    /// <summary>
    /// The number carried by the message; initialized to 42 when not set explicitly.
    /// </summary>
    public int FancyNumber { get; set; } = DefaultFancyNumber;
}