1+ namespace SlimMessageBus . Host . Redis . Test ;
2+
3+ using System . Text ;
4+
5+ using Serialization ;
6+
7+ using StackExchange . Redis ;
8+
9+ public class RedisListCheckerConsumerTest
10+ {
11+ private readonly Mock < IDatabase > _databaseMock ;
12+ private readonly Mock < IMessageProcessor < MessageWithHeaders > > _messageProcessorMock ;
13+ private readonly Mock < IMessageSerializer > _envelopeSerializerMock ;
14+ private readonly RedisListCheckerConsumer _subject ;
15+ private readonly string _queueName = "test-queue" ;
16+ private readonly MessageWithHeaders _testMessage ;
17+
18+ public RedisListCheckerConsumerTest ( )
19+ {
20+ _databaseMock = new Mock < IDatabase > ( ) ;
21+ _messageProcessorMock = new Mock < IMessageProcessor < MessageWithHeaders > > ( ) ;
22+ _envelopeSerializerMock = new Mock < IMessageSerializer > ( ) ;
23+ var testPayload = "{\" Data\" :\" test\" }"u8 . ToArray ( ) ;
24+ _testMessage = new MessageWithHeaders ( testPayload , new Dictionary < string , object > ( ) ) ;
25+ var queues = new [ ] { ( _queueName , _messageProcessorMock . Object ) } ;
26+ _subject = new RedisListCheckerConsumer (
27+ NullLogger < RedisListCheckerConsumer > . Instance ,
28+ new List < IAbstractConsumerInterceptor > ( ) ,
29+ _databaseMock . Object ,
30+ pollDelay : TimeSpan . FromMilliseconds ( 10 ) ,
31+ maxIdle : TimeSpan . FromMilliseconds ( 50 ) ,
32+ queues ,
33+ _envelopeSerializerMock . Object ) ;
34+ }
35+
36+ [ Fact ]
37+ public async Task Should_ProcessMessage_AfterException ( )
38+ {
39+ // Arrange
40+ var processedMessages = new List < MessageWithHeaders > ( ) ;
41+ var callCount = 0 ;
42+
43+ _databaseMock
44+ . Setup ( x => x . ListLeftPopAsync ( It . IsAny < RedisKey > ( ) , It . IsAny < CommandFlags > ( ) ) )
45+ . ReturnsAsync ( ( ) =>
46+ {
47+ callCount ++ ;
48+ if ( callCount == 1 )
49+ throw new RedisConnectionException ( ConnectionFailureType . SocketFailure , "Connection failed" ) ;
50+ if ( callCount == 2 )
51+ return ( RedisValue ) "serialized-message" ;
52+ return RedisValue . Null ;
53+ } ) ;
54+
55+ _envelopeSerializerMock
56+ . Setup ( x => x . Deserialize ( typeof ( MessageWithHeaders ) , null , It . IsAny < byte [ ] > ( ) , null ) )
57+ . Returns ( _testMessage ) ;
58+
59+ var tcs = new TaskCompletionSource ( ) ;
60+
61+ _messageProcessorMock
62+ . Setup ( x => x . ProcessMessage (
63+ It . IsAny < MessageWithHeaders > ( ) ,
64+ It . IsAny < IReadOnlyDictionary < string , object > > ( ) ,
65+ It . IsAny < IDictionary < string , object > > ( ) ,
66+ It . IsAny < IServiceProvider > ( ) ,
67+ It . IsAny < CancellationToken > ( ) ) )
68+ . Returns < MessageWithHeaders , IReadOnlyDictionary < string , object > , IDictionary < string , object > ,
69+ IServiceProvider , CancellationToken > ( ( msg , _ , _ , _ , _ ) =>
70+ {
71+ processedMessages . Add ( msg ) ;
72+ tcs . SetResult ( ) ;
73+ return Task . FromResult ( new ProcessMessageResult { Result = ProcessResult . Success } ) ;
74+ } ) ;
75+
76+ // Act
77+ _ = _subject . Start ( ) ;
78+ await tcs . Task . WaitAsync ( TimeSpan . FromSeconds ( 1 ) ) ; // Wait for processing or timeout
79+ await _subject . Stop ( ) ;
80+
81+ // Assert
82+ _testMessage . Should ( ) . BeEquivalentTo ( processedMessages [ 0 ] ) ;
83+ }
84+
85+ [ Fact ]
86+ public async Task Should_ProcessMultipleMessages_Successfully ( )
87+ {
88+ // Arrange
89+ var processedMessages = new List < MessageWithHeaders > ( ) ;
90+ var callCount = 0 ;
91+ var totalMessages = 3 ;
92+ var tcs = new TaskCompletionSource ( ) ;
93+
94+ _databaseMock
95+ . Setup ( x => x . ListLeftPopAsync ( It . IsAny < RedisKey > ( ) , It . IsAny < CommandFlags > ( ) ) )
96+ . ReturnsAsync ( ( ) =>
97+ {
98+ callCount ++ ;
99+ if ( callCount <= totalMessages ) return ( RedisValue ) $ "serialized-message-{ callCount } ";
100+ return RedisValue . Null ;
101+ } ) ;
102+
103+ _envelopeSerializerMock
104+ . Setup ( x => x . Deserialize ( typeof ( MessageWithHeaders ) , null , It . IsAny < byte [ ] > ( ) , null ) )
105+ . Returns ( ( Type type , string _ , byte [ ] bytes , string __ ) =>
106+ {
107+ // Simulate unique payload for each message
108+ return new MessageWithHeaders ( bytes , new Dictionary < string , object > ( ) ) ;
109+ } ) ;
110+
111+ _messageProcessorMock
112+ . Setup ( x => x . ProcessMessage (
113+ It . IsAny < MessageWithHeaders > ( ) ,
114+ It . IsAny < IReadOnlyDictionary < string , object > > ( ) ,
115+ It . IsAny < IDictionary < string , object > > ( ) ,
116+ It . IsAny < IServiceProvider > ( ) ,
117+ It . IsAny < CancellationToken > ( ) ) )
118+ . Returns < MessageWithHeaders , IReadOnlyDictionary < string , object > , IDictionary < string , object > ,
119+ IServiceProvider , CancellationToken > ( ( msg , _ , _ , _ , _ ) =>
120+ {
121+ processedMessages . Add ( msg ) ;
122+ if ( processedMessages . Count == totalMessages ) tcs . SetResult ( ) ;
123+
124+ return Task . FromResult ( new ProcessMessageResult { Result = ProcessResult . Success } ) ;
125+ } ) ;
126+
127+ // Act
128+ _ = _subject . Start ( ) ;
129+ await tcs . Task . WaitAsync ( TimeSpan . FromSeconds ( 1 ) ) ; // Wait for all messages or timeout
130+ await _subject . Stop ( ) ;
131+
132+ // Assert
133+ processedMessages . Count ( ) . Should ( ) . Be ( totalMessages ) ;
134+ for ( var i = 0 ; i < totalMessages ; i ++ )
135+ {
136+ var expectedPayload = Encoding . UTF8 . GetBytes ( $ "serialized-message-{ i + 1 } ") ;
137+ processedMessages [ i ] . Payload . Should ( ) . BeEquivalentTo ( expectedPayload ) ;
138+ }
139+ }
140+ }
0 commit comments