1+ namespace ServiceControl . Persistence . Tests . Recoverability
2+ {
3+ using System ;
4+ using System . Collections . Generic ;
5+ using System . Linq ;
6+ using System . Text ;
7+ using System . Threading ;
8+ using System . Threading . Tasks ;
9+ using Contracts . Operations ;
10+ using MessageFailures ;
11+ using Microsoft . Extensions . DependencyInjection ;
12+ using NServiceBus . Testing ;
13+ using NServiceBus . Transport ;
14+ using NUnit . Framework ;
15+ using ServiceControl . Persistence . MessageRedirects ;
16+ using ServiceControl . Recoverability ;
17+ using ServiceControl . Recoverability . Editing ;
18+
19+ sealed class EditMessageTestsCopy : PersistenceTestBase
20+ {
21+ EditHandlerCopy handler ;
22+ readonly TestableUnicastDispatcherCopy dispatcher = new ( ) ;
23+ readonly ErrorQueueNameCache errorQueueNameCache = new ( )
24+ {
25+ ResolvedErrorAddress = "errorQueueName"
26+ } ;
27+
28+ public EditMessageTestsCopy ( ) =>
29+ RegisterServices = services => services
30+ . AddSingleton < IMessageDispatcher > ( dispatcher )
31+ . AddSingleton ( errorQueueNameCache )
32+ . AddTransient < EditHandlerCopy > ( ) ;
33+
34+ [ SetUp ]
35+ public void Setup ( ) => handler = ServiceProvider . GetRequiredService < EditHandlerCopy > ( ) ;
36+
37+ [ Test ]
38+ public async Task Should_discard_edit_when_failed_message_not_exists ( )
39+ {
40+ var message = CreateEditMessage ( "some-id" ) ;
41+ await handler . Handle ( message , message . FailedMessageId ) ;
42+
43+ Assert . That ( dispatcher . DispatchedMessages , Is . Empty ) ;
44+ }
45+
46+ [ Test ]
47+ [ TestCase ( FailedMessageStatus . RetryIssued ) ]
48+ [ TestCase ( FailedMessageStatus . Archived ) ]
49+ [ TestCase ( FailedMessageStatus . Resolved ) ]
50+ public async Task Should_discard_edit_if_edited_message_not_unresolved ( FailedMessageStatus status )
51+ {
52+ var failedMessageId = Guid . NewGuid ( ) . ToString ( "D" ) ;
53+ await CreateAndStoreFailedMessage ( failedMessageId , status ) ;
54+
55+ var message = CreateEditMessage ( failedMessageId ) ;
56+ await handler . Handle ( message , message . FailedMessageId ) ;
57+
58+ var failedMessage = await ErrorMessageDataStore . ErrorBy ( failedMessageId ) ;
59+
60+ var editFailedMessagesManager = await ErrorMessageDataStore . CreateEditFailedMessageManager ( ) ;
61+ var editOperation = await editFailedMessagesManager . GetCurrentEditingMessageId ( failedMessageId ) ;
62+
63+ Assert . Multiple ( ( ) =>
64+ {
65+ Assert . That ( failedMessage . Status , Is . EqualTo ( status ) ) ;
66+ Assert . That ( editOperation , Is . Null ) ;
67+ Assert . That ( dispatcher . DispatchedMessages , Is . Empty ) ;
68+ } ) ;
69+ }
70+
71+ [ Test ]
72+ public async Task Should_discard_edit_when_different_edit_already_exists ( )
73+ {
74+ var failedMessageId = Guid . NewGuid ( ) . ToString ( ) ;
75+ var previousEdit = Guid . NewGuid ( ) . ToString ( ) ;
76+
77+ _ = await CreateAndStoreFailedMessage ( failedMessageId ) ;
78+
79+ using ( var editFailedMessagesManager = await ErrorMessageDataStore . CreateEditFailedMessageManager ( ) )
80+ {
81+ _ = await editFailedMessagesManager . GetFailedMessage ( failedMessageId ) ;
82+ await editFailedMessagesManager . SetCurrentEditingMessageId ( previousEdit ) ;
83+ await editFailedMessagesManager . SaveChanges ( ) ;
84+ }
85+
86+ var message = CreateEditMessage ( failedMessageId ) ;
87+
88+ // Act
89+ await handler . Handle ( message , message . FailedMessageId ) ;
90+
91+ using ( var editFailedMessagesManagerAssert = await ErrorMessageDataStore . CreateEditFailedMessageManager ( ) )
92+ {
93+ var failedMessage = await editFailedMessagesManagerAssert . GetFailedMessage ( failedMessageId ) ;
94+ var editId = await editFailedMessagesManagerAssert . GetCurrentEditingMessageId ( failedMessageId ) ;
95+
96+ Assert . Multiple ( ( ) =>
97+ {
98+ Assert . That ( editId , Is . EqualTo ( previousEdit ) ) ;
99+ Assert . That ( failedMessage . Status , Is . EqualTo ( FailedMessageStatus . Unresolved ) ) ;
100+ } ) ;
101+ }
102+
103+ Assert . That ( dispatcher . DispatchedMessages , Is . Empty ) ;
104+ }
105+
106+ //[Test]
107+ //public async Task Should_dispatch_edited_message_when_first_edit()
108+ //{
109+ // var failedMessage = await CreateAndStoreFailedMessage();
110+
111+ // var newBodyContent = Encoding.UTF8.GetBytes("new body content");
112+ // var newHeaders = new Dictionary<string, string> { { "someKey", "someValue" } };
113+ // var message = CreateEditMessage(failedMessage.UniqueMessageId, newBodyContent, newHeaders);
114+
115+ // var handlerContent = message.FailedMessageId;
116+ // await handler.Handle(message, handlerContent);
117+
118+ // var dispatchedMessage = dispatcher.DispatchedMessages.Single();
119+ // Assert.Multiple(() =>
120+ // {
121+ // Assert.That(
122+ // dispatchedMessage.Item1.Destination,
123+ // Is.EqualTo(failedMessage.ProcessingAttempts.Last().FailureDetails.AddressOfFailingEndpoint));
124+ // Assert.That(dispatchedMessage.Item1.Message.Body.ToArray(), Is.EqualTo(newBodyContent));
125+ // Assert.That(dispatchedMessage.Item1.Message.Headers["someKey"], Is.EqualTo("someValue"));
126+ // });
127+
128+ // using (var x = await ErrorMessageDataStore.CreateEditFailedMessageManager())
129+ // {
130+ // var failedMessage2 = await x.GetFailedMessage(failedMessage.UniqueMessageId);
131+ // Assert.That(failedMessage2, Is.Not.Null, "Edited failed message");
132+
133+ // var editId = await x.GetCurrentEditingMessageId(failedMessage2.UniqueMessageId);
134+
135+ // Assert.Multiple(() =>
136+ // {
137+ // Assert.That(failedMessage2.Status, Is.EqualTo(FailedMessageStatus.Resolved), "Failed message status");
138+ // Assert.That(editId, Is.EqualTo(handlerContent.MessageId), "MessageId");
139+ // });
140+ // }
141+ //}
142+
143+ //[Test]
144+ //public async Task Should_dispatch_edited_message_when_retrying()
145+ //{
146+ // var failedMessageId = Guid.NewGuid().ToString();
147+ // await CreateAndStoreFailedMessage(failedMessageId);
148+
149+ // var handlerContext = new TestableMessageHandlerContext();
150+ // var message = CreateEditMessage(failedMessageId);
151+ // await handler.Handle(message, handlerContext);
152+ // await handler.Handle(message, handlerContext);
153+
154+ // Assert.That(dispatcher.DispatchedMessages, Has.Count.EqualTo(2), "Dispatched message count");
155+ //}
156+
157+ //[Test]
158+ //public async Task Should_dispatch_message_using_incoming_transaction()
159+ //{
160+ // var failedMessage = await CreateAndStoreFailedMessage();
161+ // var message = CreateEditMessage(failedMessage.UniqueMessageId);
162+ // var handlerContent = new TestableMessageHandlerContext();
163+ // var transportTransaction = new TransportTransaction();
164+ // handlerContent.Extensions.Set(transportTransaction);
165+
166+ // await handler.Handle(message, handlerContent);
167+
168+ // Assert.That(transportTransaction, Is.SameAs(dispatcher.DispatchedMessages.Single().Item2));
169+ //}
170+
171+ //[Test]
172+ //public async Task Should_route_to_redirect_route_if_exists()
173+ //{
174+ // const string redirectAddress = "a different destination";
175+ // var failedMessage = await CreateAndStoreFailedMessage();
176+ // var message = CreateEditMessage(failedMessage.UniqueMessageId);
177+
178+ // var redirects = await MessageRedirectsDataStore.GetOrCreate();
179+ // redirects.Redirects.Add(new MessageRedirect
180+ // {
181+ // FromPhysicalAddress = failedMessage.ProcessingAttempts.Last().FailureDetails.AddressOfFailingEndpoint,
182+ // ToPhysicalAddress = redirectAddress
183+ // });
184+ // await MessageRedirectsDataStore.Save(redirects);
185+
186+ // await handler.Handle(message, new TestableInvokeHandlerContext());
187+
188+ // var sentMessage = dispatcher.DispatchedMessages.Single().Item1;
189+ // Assert.That(sentMessage.Destination, Is.EqualTo(redirectAddress));
190+ //}
191+
192+ //[Test]
193+ //public async Task Should_mark_edited_message_with_edit_information()
194+ //{
195+ // var messageFailure = await CreateAndStoreFailedMessage();
196+ // var message = CreateEditMessage(messageFailure.UniqueMessageId);
197+
198+ // await handler.Handle(message, new TestableInvokeHandlerContext());
199+
200+ // var sentMessage = dispatcher.DispatchedMessages.Single();
201+ // Assert.That(
202+ // "FailedMessages/" + sentMessage.Item1.Message.Headers["ServiceControl.EditOf"],
203+ // Is.EqualTo(messageFailure.Id));
204+ //}
205+
206+ //[Test]
207+ //public async Task Should_assign_edited_message_new_message_id()
208+ //{
209+ // var messageFailure = await CreateAndStoreFailedMessage();
210+ // var message = CreateEditMessage(messageFailure.UniqueMessageId);
211+
212+ // await handler.Handle(message, new TestableInvokeHandlerContext());
213+
214+ // var sentMessage = dispatcher.DispatchedMessages.Single();
215+ // Assert.That(
216+ // sentMessage.Item1.Message.MessageId,
217+ // Is.Not.EqualTo(messageFailure.ProcessingAttempts.Last().MessageId));
218+ //}
219+
220+ //[Test]
221+ //public async Task Should_assign_correct_akcnowledgment_queue_address_when_editing_and_retyring()
222+ //{
223+ // var messageFailure = await CreateAndStoreFailedMessage();
224+ // var message = CreateEditMessage(messageFailure.UniqueMessageId);
225+
226+ // await handler.Handle(message, new TestableInvokeHandlerContext());
227+
228+ // var sentMessage = dispatcher.DispatchedMessages.Single();
229+ // Assert.That(
230+ // sentMessage.Item1.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"],
231+ // Is.EqualTo(errorQueueNameCache.ResolvedErrorAddress));
232+ //}
233+
234+ static EditAndSend CreateEditMessage ( string failedMessageId , byte [ ] newBodyContent = null , Dictionary < string , string > newHeaders = null )
235+ {
236+ return new EditAndSend
237+ {
238+ FailedMessageId = failedMessageId ,
239+ NewBody = Convert . ToBase64String ( newBodyContent ?? Encoding . UTF8 . GetBytes ( Guid . NewGuid ( ) . ToString ( ) ) ) ,
240+ NewHeaders = newHeaders ?? [ ]
241+ } ;
242+ }
243+
244+ async Task < FailedMessage > CreateAndStoreFailedMessage ( string failedMessageId = null , FailedMessageStatus status = FailedMessageStatus . Unresolved )
245+ {
246+ failedMessageId ??= Guid . NewGuid ( ) . ToString ( ) ;
247+
248+ var failedMessage = new FailedMessage
249+ {
250+ UniqueMessageId = failedMessageId ,
251+ Id = FailedMessageIdGenerator . MakeDocumentId ( failedMessageId ) ,
252+ Status = status ,
253+ ProcessingAttempts =
254+ [
255+ new FailedMessage . ProcessingAttempt
256+ {
257+ MessageId = Guid . NewGuid ( ) . ToString ( ) ,
258+ FailureDetails = new FailureDetails
259+ {
260+ AddressOfFailingEndpoint = "OriginalEndpointAddress"
261+ }
262+ }
263+ ]
264+ } ;
265+ await ErrorMessageDataStore . StoreFailedMessagesForTestsOnly ( new [ ] { failedMessage } ) ;
266+ return failedMessage ;
267+ }
268+ }
269+
270+ public sealed class TestableUnicastDispatcherCopy : IMessageDispatcher
271+ {
272+ public List < ( UnicastTransportOperation , TransportTransaction ) > DispatchedMessages { get ; } = [ ] ;
273+
274+ public Task Dispatch ( TransportOperations outgoingMessages , TransportTransaction transaction , CancellationToken cancellationToken )
275+ {
276+ DispatchedMessages . AddRange ( outgoingMessages . UnicastTransportOperations . Select ( m => ( m , transaction ) ) ) ;
277+ return Task . CompletedTask ;
278+ }
279+ }
280+ }
0 commit comments