1
+ // This source code is dual-licensed under the Apache License, version
2
+ // 2.0, and the Mozilla Public License, version 1.1.
3
+ //
4
+ // The APL v2.0:
5
+ //
6
+ //---------------------------------------------------------------------------
7
+ // Copyright (c) 2007-2016 Pivotal Software, Inc.
8
+ //
9
+ // Licensed under the Apache License, Version 2.0 (the "License");
10
+ // you may not use this file except in compliance with the License.
11
+ // You may obtain a copy of the License at
12
+ //
13
+ // http://www.apache.org/licenses/LICENSE-2.0
14
+ //
15
+ // Unless required by applicable law or agreed to in writing, software
16
+ // distributed under the License is distributed on an "AS IS" BASIS,
17
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18
+ // See the License for the specific language governing permissions and
19
+ // limitations under the License.
20
+ //---------------------------------------------------------------------------
21
+ //
22
+ // The MPL v1.1:
23
+ //
24
+ //---------------------------------------------------------------------------
25
+ // The contents of this file are subject to the Mozilla Public License
26
+ // Version 1.1 (the "License"); you may not use this file except in
27
+ // compliance with the License. You may obtain a copy of the License
28
+ // at http://www.mozilla.org/MPL/
29
+ //
30
+ // Software distributed under the License is distributed on an "AS IS"
31
+ // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32
+ // the License for the specific language governing rights and
33
+ // limitations under the License.
34
+ //
35
+ // The Original Code is RabbitMQ.
36
+ //
37
+ // The Initial Developer of the Original Code is Pivotal Software, Inc.
38
+ // Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
39
+ //---------------------------------------------------------------------------
40
+
41
+ using NUnit . Framework ;
42
+
43
+ using System ;
44
+ using System . IO ;
45
+ using System . Text ;
46
+ using System . Collections ;
47
+
48
+ using RabbitMQ . Client ;
49
+ using RabbitMQ . Client . Events ;
50
+ using RabbitMQ . Client . Impl ;
51
+ using RabbitMQ . Client . Exceptions ;
52
+ using RabbitMQ . Util ;
53
+ using System . Threading ;
54
+ using System . Threading . Tasks ;
55
+
56
+ namespace RabbitMQ . Client . Unit
57
+ {
58
+ [ TestFixture ]
59
+ public class TestAsyncConsumer
60
+ {
61
+
62
+ [ Test ]
63
+ public async Task TestBasicRoundtrip ( )
64
+ {
65
+ var cf = new ConnectionFactory { DispatchConsumersAsync = true } ;
66
+ using ( var c = cf . CreateConnection ( ) )
67
+ using ( var m = c . CreateModel ( ) )
68
+ {
69
+ var q = m . QueueDeclare ( ) ;
70
+ var bp = m . CreateBasicProperties ( ) ;
71
+ var body = System . Text . Encoding . UTF8 . GetBytes ( "async-hi" ) ;
72
+ m . BasicPublish ( "" , q . QueueName , bp , body ) ;
73
+ var consumer = new AsyncEventingBasicConsumer ( m ) ;
74
+ var sem = new SemaphoreSlim ( 0 ) ;
75
+ consumer . Received += async ( o , a ) =>
76
+ {
77
+ sem . Release ( ) ;
78
+ await Task . FromResult ( 0 ) ;
79
+ } ;
80
+ var tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
81
+ // ensure we get a delivery
82
+ var waitRes = await sem . WaitAsync ( 2000 ) ;
83
+ Assert . IsTrue ( waitRes ) ;
84
+ // unsubscribe and ensure no further deliveries
85
+ m . BasicCancel ( tag ) ;
86
+ m . BasicPublish ( "" , q . QueueName , bp , body ) ;
87
+ var waitResFalse = await sem . WaitAsync ( 100 ) ;
88
+ Assert . IsFalse ( waitResFalse ) ;
89
+ }
90
+ }
91
+
92
+ [ Test ]
93
+ public void NonAsyncConsumerShouldThrowInvalidOperationException ( )
94
+ {
95
+ var cf = new ConnectionFactory { DispatchConsumersAsync = true } ;
96
+ using ( var c = cf . CreateConnection ( ) )
97
+ using ( var m = c . CreateModel ( ) )
98
+ {
99
+ var q = m . QueueDeclare ( ) ;
100
+ var bp = m . CreateBasicProperties ( ) ;
101
+ var body = System . Text . Encoding . UTF8 . GetBytes ( "async-hi" ) ;
102
+ m . BasicPublish ( "" , q . QueueName , bp , body ) ;
103
+ var consumer = new EventingBasicConsumer ( m ) ;
104
+ Assert . Throws < InvalidOperationException > ( ( ) => m . BasicConsume ( q . QueueName , false , consumer ) ) ;
105
+ }
106
+ }
107
+ }
108
+ }
0 commit comments