1
+ // This source code is dual-licensed under the Apache License, version
2
+ // 2.0, and the Mozilla Public License, version 1.1.
3
+ //
4
+ // The APL v2.0:
5
+ //
6
+ //---------------------------------------------------------------------------
7
+ // Copyright (C) 2007-2013 VMware, Inc.
8
+ //
9
+ // Licensed under the Apache License, Version 2.0 (the "License");
10
+ // you may not use this file except in compliance with the License.
11
+ // You may obtain a copy of the License at
12
+ //
13
+ // http://www.apache.org/licenses/LICENSE-2.0
14
+ //
15
+ // Unless required by applicable law or agreed to in writing, software
16
+ // distributed under the License is distributed on an "AS IS" BASIS,
17
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18
+ // See the License for the specific language governing permissions and
19
+ // limitations under the License.
20
+ //---------------------------------------------------------------------------
21
+ //
22
+ // The MPL v1.1:
23
+ //
24
+ //---------------------------------------------------------------------------
25
+ // The contents of this file are subject to the Mozilla Public License
26
+ // Version 1.1 (the "License"); you may not use this file except in
27
+ // compliance with the License. You may obtain a copy of the License
28
+ // at http://www.mozilla.org/MPL/
29
+ //
30
+ // Software distributed under the License is distributed on an "AS IS"
31
+ // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32
+ // the License for the specific language governing rights and
33
+ // limitations under the License.
34
+ //
35
+ // The Original Code is RabbitMQ.
36
+ //
37
+ // The Initial Developer of the Original Code is VMware, Inc.
38
+ // Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
39
+ //---------------------------------------------------------------------------
40
+
41
+ using NUnit . Framework ;
42
+
43
+ using System ;
44
+ using System . IO ;
45
+ using System . Text ;
46
+ using System . Collections ;
47
+ using System . Threading ;
48
+ using System . Diagnostics ;
49
+
50
+ using RabbitMQ . Client ;
51
+ using RabbitMQ . Client . Impl ;
52
+ using RabbitMQ . Util ;
53
+ using RabbitMQ . Client . Events ;
54
+
55
+ namespace RabbitMQ . Client . Unit {
56
+ [ TestFixture ]
57
+ public class TestConnectionBlocked {
58
+
59
+ Object lockObject = new Object ( ) ;
60
+ UTF8Encoding enc = new UTF8Encoding ( ) ;
61
+ bool notified = false ;
62
+
63
+ [ Test ]
64
+ public void TestConnectionBlockedNotification ( )
65
+ {
66
+ ConnectionFactory cf = new ConnectionFactory ( ) ;
67
+ IConnection conn = cf . CreateConnection ( ) ;
68
+
69
+ conn . ConnectionBlocked += new ConnectionBlockedEventHandler ( HandleBlocked ) ;
70
+ conn . ConnectionUnblocked += new ConnectionUnblockedEventHandler ( HandleUnblocked ) ;
71
+
72
+ Block ( ) ;
73
+ Publish ( conn ) ;
74
+ Monitor . Wait ( TimeSpan . FromSeconds ( 10 ) ) ;
75
+
76
+ Assert . IsTrue ( notified ) ;
77
+ }
78
+
79
+ public void HandleBlocked ( IConnection sender , ConnectionBlockedEventArgs args )
80
+ {
81
+ Unblock ( ) ;
82
+ }
83
+
84
+
85
+ public void HandleUnblocked ( IConnection sender )
86
+ {
87
+ notified = true ;
88
+ Monitor . PulseAll ( lockObject ) ;
89
+ }
90
+
91
+
92
+ protected void Block ( )
93
+ {
94
+ ExecRabbitMQCtl ( "set_vm_memory_high_watermark 0.000000001" ) ;
95
+ }
96
+
97
+ protected void Unblock ( )
98
+ {
99
+ ExecRabbitMQCtl ( "set_vm_memory_high_watermark 0.4" ) ;
100
+ }
101
+
102
+ protected void ExecRabbitMQCtl ( string command )
103
+ {
104
+ if ( IsRunningOnMono ( ) ) {
105
+ ExecCommand ( "../rabbitmq-server/scripts/rabbitmqctl " + command ) ;
106
+ } else {
107
+ ExecCommand ( "..\\ rabbitmq-server\\ scripts\\ rabbitmqctl " + command ) ;
108
+ }
109
+ }
110
+
111
+ protected void ExecCommand ( string command )
112
+ {
113
+ Process proc = new Process ( ) ;
114
+ proc . StartInfo . CreateNoWindow = true ;
115
+
116
+ string sh ;
117
+ string args ;
118
+ if ( IsRunningOnMono ( ) ) {
119
+ sh = "/bin/sh" ;
120
+ args = "-c " + command ;
121
+ } else {
122
+ sh = "C:\\ winnt\\ system32\\ cmd.exe" ;
123
+ args = "/y /c " + command ;
124
+ }
125
+
126
+ try {
127
+ proc . StartInfo . FileName = sh ;
128
+ proc . StartInfo . Arguments = args ;
129
+
130
+ proc . Start ( ) ;
131
+ } catch ( Exception e ) {
132
+ Console . WriteLine ( "Failed to run subprocess with args " + args + " : " + e . Message ) ;
133
+ }
134
+ }
135
+
136
+ public static bool IsRunningOnMono ( )
137
+ {
138
+ return Type . GetType ( "Mono.Runtime" ) != null ;
139
+ }
140
+
141
+ protected void Publish ( IConnection conn )
142
+ {
143
+ IModel ch = conn . CreateModel ( ) ;
144
+ ch . BasicPublish ( "" , "amq.fanout" , null , enc . GetBytes ( "message" ) ) ;
145
+ }
146
+ }
147
+ }
0 commit comments