1414using System . Text . Json ;
1515using System . Threading ;
1616using System . Threading . Tasks ;
17+ using RabbitMQ . Client ;
1718using RabbitMQ . Stream . Client ;
1819using RabbitMQ . Stream . Client . AMQP ;
1920using Xunit ;
@@ -128,7 +129,8 @@ public static async Task PublishMessages(StreamSystem system, string stream, int
128129 public static async Task PublishMessages ( StreamSystem system , string stream , int numberOfMessages ,
129130 string producerName , ITestOutputHelper testOutputHelper )
130131 {
131- testOutputHelper . WriteLine ( "Publishing messages..." ) ;
132+ testOutputHelper . WriteLine (
133+ $ "Publishing messages to the stream { stream } number of messages { numberOfMessages } ") ;
132134
133135 var testPassed = new TaskCompletionSource < int > ( ) ;
134136 var count = 0 ;
@@ -155,11 +157,17 @@ public static async Task PublishMessages(StreamSystem system, string stream, int
155157 await producer . Send ( Convert . ToUInt64 ( i ) , message ) ;
156158 }
157159
160+ testOutputHelper . WriteLine ( $ "Messages sent to the stream { stream } number of messages { numberOfMessages } ") ;
161+
158162 testPassed . Task . Wait ( TimeSpan . FromSeconds ( 10 ) ) ;
159163 Assert . Equal ( numberOfMessages , testPassed . Task . Result ) ;
160164 WaitUntil ( ( ) => producer . ConfirmFrames >= 1 ) ;
161165 WaitUntil ( ( ) => producer . IncomingFrames >= 1 ) ;
162166 WaitUntil ( ( ) => producer . PublishCommandsSent >= 1 ) ;
167+
168+ testOutputHelper . WriteLine (
169+ $ "Messages sent to the stream { stream } number of messages { numberOfMessages } " +
170+ $ "confirmed { producer . ConfirmFrames } incoming { producer . IncomingFrames } publish commands sent { producer . PublishCommandsSent } ") ;
163171 producer . Dispose ( ) ;
164172 }
165173
@@ -180,7 +188,8 @@ public static async Task<ConcurrentDictionary<string, IOffsetType>> OffsetsForSu
180188 public static async Task PublishMessagesSuperStream ( StreamSystem system , string stream , int numberOfMessages ,
181189 string producerName , ITestOutputHelper testOutputHelper )
182190 {
183- testOutputHelper . WriteLine ( "Publishing super stream messages..." ) ;
191+ testOutputHelper . WriteLine ( $ "Publishing super stream messages...to the stream { stream } " +
192+ $ "number of messages { numberOfMessages } ") ;
184193
185194 var testPassed = new TaskCompletionSource < int > ( ) ;
186195 var count = 0 ;
@@ -210,11 +219,16 @@ public static async Task PublishMessagesSuperStream(StreamSystem system, string
210219 await producer . Send ( Convert . ToUInt64 ( i ) , message ) ;
211220 }
212221
222+ testOutputHelper . WriteLine ( $ "Messages sent to the stream { stream } number of messages { numberOfMessages } ") ;
213223 testPassed . Task . Wait ( TimeSpan . FromSeconds ( 10 ) ) ;
214224 Assert . Equal ( numberOfMessages , testPassed . Task . Result ) ;
215225 Assert . True ( producer . ConfirmFrames >= 1 ) ;
216226 Assert . True ( producer . IncomingFrames >= 1 ) ;
217227 Assert . True ( producer . PublishCommandsSent >= 1 ) ;
228+
229+ testOutputHelper . WriteLine (
230+ $ "Messages sent to the stream { stream } number of messages { numberOfMessages } " +
231+ $ "confirmed { producer . ConfirmFrames } incoming { producer . IncomingFrames } publish commands sent { producer . PublishCommandsSent } ") ;
218232 producer . Dispose ( ) ;
219233 }
220234
@@ -373,18 +387,6 @@ public static void HttpDeleteQueue(string queue)
373387 }
374388 }
375389
376- private static void HttpDeleteExchange ( string exchange )
377- {
378- var task = CreateHttpClient ( ) . DeleteAsync ( $ "http://localhost:15672/api/exchanges/%2F/{ exchange } ") ;
379- task . Wait ( ) ;
380- var result = task . Result ;
381- if ( ! result . IsSuccessStatusCode && result . StatusCode != HttpStatusCode . NotFound )
382- {
383- throw new XunitException ( string . Format ( "HTTP DELETE failed: {0} {1}" , result . StatusCode ,
384- result . ReasonPhrase ) ) ;
385- }
386- }
387-
388390 public static byte [ ] GetFileContent ( string fileName )
389391 {
390392 var codeBaseUrl = new Uri ( Assembly . GetExecutingAssembly ( ) . Location ) ;
@@ -403,14 +405,40 @@ public static byte[] GetFileContent(string fileName)
403405
404406 public static void ResetSuperStreams ( )
405407 {
406- HttpDeleteExchange ( "invoices" ) ;
407- HttpDeleteQueue ( "invoices-0" ) ;
408- HttpDeleteQueue ( "invoices-1" ) ;
409- HttpDeleteQueue ( "invoices-2" ) ;
408+ var factory = new ConnectionFactory ( ) ;
409+ using var connection = factory . CreateConnection ( ) ;
410+ var channel = connection . CreateModel ( ) ;
411+
412+ channel . ExchangeDelete ( InvoicesExchange ) ;
413+
414+ channel . QueueDelete ( InvoicesStream0 ) ;
415+ channel . QueueDelete ( InvoicesStream1 ) ;
416+ channel . QueueDelete ( InvoicesStream2 ) ;
417+ Wait ( ) ;
418+
419+ channel . ExchangeDeclare ( InvoicesExchange , "direct" , true , false ,
420+ new Dictionary < string , object > ( ) { { "x-super-stream-enabled" , "true" } } ) ;
421+
422+ channel . QueueDeclare ( InvoicesStream0 , true , false , false ,
423+ new Dictionary < string , object > ( ) { { "x-queue-type" , "stream" } , } ) ;
424+
425+ channel . QueueDeclare ( InvoicesStream1 , true , false , false ,
426+ new Dictionary < string , object > ( ) { { "x-queue-type" , "stream" } , } ) ;
427+
428+ channel . QueueDeclare ( InvoicesStream2 , true , false , false ,
429+ new Dictionary < string , object > ( ) { { "x-queue-type" , "stream" } , } ) ;
410430 Wait ( ) ;
411- HttpPost (
412- Encoding . Default . GetString (
413- GetFileContent ( "definition_test.json" ) ) , "definitions" ) ;
431+
432+ channel . QueueBind ( InvoicesStream0 , InvoicesExchange , "0" ,
433+ new Dictionary < string , object > ( ) { { "x-stream-partition-order" , "0" } } ) ;
434+
435+ channel . QueueBind ( InvoicesStream1 , InvoicesExchange , "1" ,
436+ new Dictionary < string , object > ( ) { { "x-stream-partition-order" , "1" } } ) ;
437+
438+ channel . QueueBind ( InvoicesStream2 , InvoicesExchange , "2" ,
439+ new Dictionary < string , object > ( ) { { "x-stream-partition-order" , "2" } } ) ;
440+
441+ connection . Close ( ) ;
414442 }
415443 }
416444}
0 commit comments