@@ -19,15 +19,14 @@ public async Task ToAmqp091()
1919 {
2020 Assert . NotNull ( _connection ) ;
2121 Assert . NotNull ( _management ) ;
22- _queueName = "q" ;
2322
2423 IQueueSpecification queueSpec = _management . Queue ( ) . Name ( _queueName ) . Type ( QueueType . QUORUM ) ;
2524 await queueSpec . DeclareAsync ( ) ;
2625
2726 var publisher = await _connection . PublisherBuilder ( ) . BuildAsync ( ) ;
28- byte [ ] body = System . Text . Encoding . UTF8 . GetBytes ( "{Text:as,Seq:1,Max:7000}" ) ;
27+ const string body = "{Text:as,Seq:1,Max:7000}" ;
2928 IMessage amqpMessage = new AmqpMessage ( body ) . ToAddress ( ) . Queue ( _queueName ) . Build ( ) ;
30- for ( int i = 0 ; i < 1 ; i ++ )
29+ for ( int i = 0 ; i < 0 ; i ++ )
3130 {
3231 PublishResult result = await publisher . PublishAsync ( message : amqpMessage ) . ConfigureAwait ( true ) ;
3332 Assert . NotNull ( result ) ;
@@ -51,6 +50,45 @@ public async Task ToAmqp091()
5150 Assert . Equal ( "consumerTag" , receivedMessage091 . ConsumerTag ) ;
5251 Assert . Equal ( "{Text:as,Seq:1,Max:7000}" ,
5352 System . Text . Encoding . UTF8 . GetString ( receivedMessage091 . Body . ToArray ( ) ) ) ;
53+ channel . Close ( ) ;
54+ connection . Close ( ) ;
55+ }
56+
57+ [ Fact ]
58+ public async Task FromAmqp091 ( )
59+ {
60+ Assert . NotNull ( _connection ) ;
61+ Assert . NotNull ( _management ) ;
62+
63+ IQueueSpecification queueSpec = _management . Queue ( ) . Name ( _queueName ) . Type ( QueueType . QUORUM ) ;
64+ await queueSpec . DeclareAsync ( ) ;
65+
66+ // publish a message with AMQP 0-9-1
67+ var factory = new ConnectionFactory ( ) ;
68+ var connection = factory . CreateConnection ( ) ;
69+ var channel = connection . CreateModel ( ) ;
70+ channel . BasicPublish (
71+ exchange : "" ,
72+ routingKey : _queueName ,
73+ basicProperties : null ,
74+ body : System . Text . Encoding . UTF8 . GetBytes ( "{Text:as,Seq:1,Max:7000}" ) ) ;
75+
76+ TaskCompletionSource < IMessage > tcs = new ( ) ;
77+ IConsumer consumer = await _connection . ConsumerBuilder ( )
78+ . Queue ( _queueName )
79+ . MessageHandler ( ( context , message ) =>
80+ {
81+ tcs . SetResult ( message ) ;
82+ context . Accept ( ) ;
83+ return Task . CompletedTask ;
84+ } ) . BuildAndStartAsync ( ) ;
85+
86+ var receivedMessage = await tcs . Task ;
87+ Assert . NotNull ( receivedMessage ) ;
88+ Assert . Equal ( "{Text:as,Seq:1,Max:7000}" , receivedMessage . BodyAsString ( ) ) ;
89+ channel . Close ( ) ;
90+ connection . Close ( ) ;
91+ await consumer . CloseAsync ( ) ;
5492 }
5593 }
5694}
0 commit comments