22using Microsoft . Extensions . Logging ;
33using Microsoft . Extensions . Options ;
44using System . Text . Json ;
5+ using System . Text . Json . Serialization ;
56
67namespace DotNetElements . Core ;
78
@@ -19,6 +20,13 @@ public sealed class OutboxService<TDbContext> : IOutboxService
1920 private readonly OutboxOptions outboxOptions ;
2021 private readonly ILogger < OutboxService < TDbContext > > logger ;
2122
23+ private readonly JsonSerializerOptions jsonOptions = new ( )
24+ {
25+ UnmappedMemberHandling = JsonUnmappedMemberHandling . Disallow ,
26+ PropertyNameCaseInsensitive = true
27+ } ;
28+
29+
2230 public OutboxService (
2331 TDbContext dbContext ,
2432 TimeProvider timeProvider ,
@@ -37,12 +45,18 @@ public OutboxService(
3745
3846 public async Task RunAsync ( CancellationToken cancellation )
3947 {
40- IReadOnlyList < OutboxMessage > messages = await GetPendingMessages ( ) ;
48+ IReadOnlyList < OutboxMessage > messages = await GetPendingMessages ( cancellation ) ;
4149
4250 logger . LogInformation ( "Started processing {MessageCount} messages" , messages . Count ) ;
4351
4452 foreach ( var messagesByType in messages . GroupBy ( m => m . Type ) )
4553 {
54+ if ( cancellation . IsCancellationRequested )
55+ {
56+ logger . LogWarning ( "Processing was cancelled" ) ;
57+ break ;
58+ }
59+
4660 logger . LogInformation ( "Started processing {MessageCount} messages of type {MessageType}" , messagesByType . Count ( ) , messagesByType . Key ) ;
4761
4862 Type ? messageType = outboxOptions . MessagesAssembly ! . GetType ( messagesByType . Key ) ;
@@ -66,17 +80,31 @@ public async Task RunAsync(CancellationToken cancellation)
6680
6781 foreach ( OutboxMessage message in messagesByType )
6882 {
83+ if ( cancellation . IsCancellationRequested )
84+ {
85+ logger . LogWarning ( "Processing was cancelled" ) ;
86+ break ;
87+ }
88+
6989 try
7090 {
71- object ? deserializedMessage = JsonSerializer . Deserialize ( message . Content , messageType ) ;
91+ object ? deserializedMessage = JsonSerializer . Deserialize ( message . Content , messageType , jsonOptions ) ;
7292
7393 if ( deserializedMessage is null )
7494 {
7595 logger . LogError ( "Failed to process message {MessageId} of type {MessageType}. Failed to read content" , message . Id , message . Type ) ;
7696 continue ;
7797 }
7898
79- await messageProcessor . ProcessAsync ( deserializedMessage ) ;
99+ Result processMessageResult = await messageProcessor . ProcessAsync ( deserializedMessage , cancellation ) ;
100+
101+ if ( processMessageResult . IsFail )
102+ {
103+ logger . LogError ( "Failed to process message {MessageId} of type {MessageType}. Error: {Error}" , message . Id , message . Type , processMessageResult . ErrorMessage ) ;
104+
105+ await HandleFailedMessage ( message , processMessageResult . ErrorMessage ) ;
106+ continue ;
107+ }
80108
81109 bool updateSuccess = await UpdateMessageStatus ( message , processedOnUtc : timeProvider . GetUtcNow ( ) ) ;
82110
@@ -102,14 +130,14 @@ public async Task RunAsync(CancellationToken cancellation)
102130 logger . LogInformation ( "Finished processing messages" ) ;
103131 }
104132
105- private async Task < IReadOnlyList < OutboxMessage > > GetPendingMessages ( )
133+ private async Task < IReadOnlyList < OutboxMessage > > GetPendingMessages ( CancellationToken cancellation )
106134 {
107135 try
108136 {
109137 return await dbContext . OutboxMessages
110138 . Where ( m => m . ProcessedOnUtc == null )
111139 . OrderBy ( m => m . OccurredOnUtc )
112- . ToListAsync ( ) ;
140+ . ToListAsync ( cancellation ) ;
113141 }
114142 catch ( Exception ex )
115143 {
@@ -131,7 +159,7 @@ private async Task HandleFailedMessage(OutboxMessage message, string error)
131159 {
132160 logger . LogError ( "Failed to process message {MessageId} of type {MessageType}. Error: Max retry count reached" , message . Id , message . Type ) ;
133161
134- bool updateSuccess = await UpdateMessageStatus ( message , processedOnUtc : timeProvider . GetUtcNow ( ) , error : error , retryCount : message . RetryCount + 1 ) ;
162+ bool updateSuccess = await UpdateMessageStatus ( message , processedOnUtc : timeProvider . GetUtcNow ( ) , error : error , retryCount : message . RetryCount ) ;
135163
136164 // todo check what to do if update fails here
137165 }
@@ -148,8 +176,6 @@ private async Task<bool> UpdateMessageStatus(OutboxMessage message, DateTimeOffs
148176 if ( retryCount is not null )
149177 message . RetryCount = retryCount . Value ;
150178
151- //dbContext.OutboxMessages.Update(message); // todo remove, should not be needed as EF already tracks the entity
152-
153179 int numRowsUpdated = await dbContext . SaveChangesAsync ( ) ;
154180
155181 bool success = numRowsUpdated == 1 ;
0 commit comments