1
1
<?php
2
2
3
3
use React \Stream \Stream ;
4
-
5
4
use React \Stream \ReadableStream ;
6
-
7
5
use Clue \React \Redis \Factory ;
8
-
9
6
use Clue \React \Redis \StreamingClient ;
7
+ use React \Promise \Deferred ;
8
+ use Clue \React \Block ;
10
9
11
10
class FunctionalTest extends TestCase
12
11
{
13
- protected static $ loop ;
14
- protected static $ factory ;
12
+ private $ loop ;
13
+ private $ factory ;
14
+ private $ client ;
15
15
16
- public static function setUpBeforeClass ()
16
+ public function setUp ()
17
17
{
18
- self ::$ loop = new React \EventLoop \StreamSelectLoop ();
19
- self ::$ factory = new Factory (self ::$ loop );
18
+ $ this ->loop = new React \EventLoop \StreamSelectLoop ();
19
+ $ this ->factory = new Factory ($ this ->loop );
20
+ $ this ->client = $ this ->createClient ();
20
21
}
21
22
22
23
public function testPing ()
23
24
{
24
- $ client = $ this ->createClient () ;
25
+ $ client = $ this ->client ;
25
26
26
27
$ promise = $ client ->ping ();
27
28
$ this ->assertInstanceOf ('React\Promise\PromiseInterface ' , $ promise );
@@ -36,7 +37,7 @@ public function testPing()
36
37
37
38
public function testMgetIsNotInterpretedAsSubMessage ()
38
39
{
39
- $ client = $ this ->createClient () ;
40
+ $ client = $ this ->client ;
40
41
41
42
$ client ->mset ('message ' , 'message ' , 'channel ' , 'channel ' , 'payload ' , 'payload ' );
42
43
@@ -46,14 +47,9 @@ public function testMgetIsNotInterpretedAsSubMessage()
46
47
$ this ->waitFor ($ client );
47
48
}
48
49
49
- /**
50
- *
51
- * @param StreamingClient $client
52
- * @depends testPing
53
- */
54
- public function testPipeline (StreamingClient $ client )
50
+ public function testPipeline ()
55
51
{
56
- $ this ->assertFalse ( $ client-> isBusy ()) ;
52
+ $ client = $ this ->client ;
57
53
58
54
$ client ->set ('a ' , 1 )->then ($ this ->expectCallableOnce ('OK ' ));
59
55
$ client ->incr ('a ' )->then ($ this ->expectCallableOnce (2 ));
@@ -63,46 +59,27 @@ public function testPipeline(StreamingClient $client)
63
59
$ this ->assertTrue ($ client ->isBusy ());
64
60
65
61
$ this ->waitFor ($ client );
66
-
67
- return $ client ;
68
62
}
69
63
70
- /**
71
- *
72
- * @param StreamingClient $client
73
- * @depends testPipeline
74
- */
75
- public function testInvalidCommand (StreamingClient $ client )
64
+ public function testInvalidCommand ()
76
65
{
77
- $ client ->doesnotexist (1 , 2 , 3 )->then ($ this ->expectCallableNever ());
78
-
79
- $ this ->waitFor ($ client );
66
+ $ this ->client ->doesnotexist (1 , 2 , 3 )->then ($ this ->expectCallableNever ());
80
67
81
- return $ client ;
68
+ $ this -> waitFor ( $ this -> client ) ;
82
69
}
83
70
84
- /**
85
- *
86
- * @param StreamingClient $client
87
- * @depends testInvalidCommand
88
- */
89
- public function testMultiExecEmpty (StreamingClient $ client )
71
+ public function testMultiExecEmpty ()
90
72
{
91
- $ client ->multi ()->then ($ this ->expectCallableOnce ('OK ' ));
92
- $ client ->exec ()->then ($ this ->expectCallableOnce (array ()));
93
-
94
- $ this ->waitFor ($ client );
73
+ $ this ->client ->multi ()->then ($ this ->expectCallableOnce ('OK ' ));
74
+ $ this ->client ->exec ()->then ($ this ->expectCallableOnce (array ()));
95
75
96
- return $ client ;
76
+ $ this -> waitFor ( $ this -> client ) ;
97
77
}
98
78
99
- /**
100
- *
101
- * @param StreamingClient $client
102
- * @depends testMultiExecEmpty
103
- */
104
- public function testMultiExecQueuedExecHasValues (StreamingClient $ client )
79
+ public function testMultiExecQueuedExecHasValues ()
105
80
{
81
+ $ client = $ this ->client ;
82
+
106
83
$ client ->multi ()->then ($ this ->expectCallableOnce ('OK ' ));
107
84
$ client ->set ('b ' , 10 )->then ($ this ->expectCallableOnce ('QUEUED ' ));
108
85
$ client ->expire ('b ' , 20 )->then ($ this ->expectCallableOnce ('QUEUED ' ));
@@ -111,17 +88,12 @@ public function testMultiExecQueuedExecHasValues(StreamingClient $client)
111
88
$ client ->exec ()->then ($ this ->expectCallableOnce (array ('OK ' , 1 , 12 , 20 )));
112
89
113
90
$ this ->waitFor ($ client );
114
-
115
- return $ client ;
116
91
}
117
92
118
- /**
119
- *
120
- * @param StreamingClient $client
121
- * @depends testPipeline
122
- */
123
- public function testMonitorPing (StreamingClient $ client )
93
+ public function testMonitorPing ()
124
94
{
95
+ $ client = $ this ->client ;
96
+
125
97
$ client ->on ('monitor ' , $ this ->expectCallableOnce ());
126
98
127
99
$ client ->monitor ()->then ($ this ->expectCallableOnce ('OK ' ));
@@ -132,29 +104,33 @@ public function testMonitorPing(StreamingClient $client)
132
104
133
105
public function testPubSub ()
134
106
{
135
- $ consumer = $ this ->createClient () ;
107
+ $ consumer = $ this ->client ;
136
108
$ producer = $ this ->createClient ();
137
109
138
- $ that = $ this ;
110
+ $ channel = ' channel:test: ' . mt_rand () ;
139
111
140
- $ producer ->publish ('channel:test ' , 'nobody sees this ' )->then ($ this ->expectCallableOnce (0 ));
112
+ // consumer receives a single message
113
+ $ deferred = new Deferred ();
114
+ $ consumer ->on ('message ' , $ this ->expectCallableOnce ());
115
+ $ consumer ->on ('message ' , array ($ deferred , 'resolve ' ));
116
+ $ consumer ->subscribe ($ channel )->then ($ this ->expectCallableOnce ());
117
+ $ this ->waitFor ($ consumer );
141
118
119
+ // producer sends a single message
120
+ $ producer ->publish ($ channel , 'hello world ' )->then ($ this ->expectCallableOnce ());
142
121
$ this ->waitFor ($ producer );
143
122
144
- $ consumer ->subscribe ('channel:test ' )->then (function () {
145
- // ?
146
- });
123
+ // expect "message" event to take no longer than 0.1s
124
+ Block \await ($ deferred ->promise (), $ this ->loop , 0.1 );
147
125
}
148
126
149
127
public function testClose ()
150
128
{
151
- $ client = $ this ->createClient ( );
129
+ $ this -> client -> get ( ' willBeCanceledAnyway ' )-> then ( null , $ this ->expectCallableOnce () );
152
130
153
- $ client -> get ( ' willBeCanceledAnyway ' )-> then ( null , $ this -> expectCallableOnce () );
131
+ $ this -> client -> close ( );
154
132
155
- $ client ->close ();
156
-
157
- $ client ->get ('willBeRejectedRightAway ' )->then (null , $ this ->expectCallableOnce ());
133
+ $ this ->client ->get ('willBeRejectedRightAway ' )->then (null , $ this ->expectCallableOnce ());
158
134
}
159
135
160
136
public function testInvalidProtocol ()
@@ -186,24 +162,7 @@ public function testInvalidServerRepliesWithDuplicateMessages()
186
162
*/
187
163
protected function createClient ()
188
164
{
189
- $ client = null ;
190
- $ exception = null ;
191
-
192
- self ::$ factory ->createClient ()->then (function ($ c ) use (&$ client ) {
193
- $ client = $ c ;
194
- }, function ($ error ) use (&$ exception ) {
195
- $ exception = $ error ;
196
- });
197
-
198
- while ($ client === null && $ exception === null ) {
199
- self ::$ loop ->tick ();
200
- }
201
-
202
- if ($ exception !== null ) {
203
- throw $ exception ;
204
- }
205
-
206
- return $ client ;
165
+ return Block \await ($ this ->factory ->createClient (), $ this ->loop );
207
166
}
208
167
209
168
protected function createClientResponse ($ response )
@@ -212,7 +171,7 @@ protected function createClientResponse($response)
212
171
fwrite ($ fp , $ response );
213
172
fseek ($ fp , 0 );
214
173
215
- $ stream = new Stream ($ fp , self :: $ loop );
174
+ $ stream = new Stream ($ fp , $ this -> loop );
216
175
217
176
return new StreamingClient ($ stream );
218
177
}
@@ -229,7 +188,7 @@ protected function waitFor(StreamingClient $client)
229
188
$ this ->assertTrue ($ client ->isBusy ());
230
189
231
190
while ($ client ->isBusy ()) {
232
- self :: $ loop ->tick ();
191
+ $ this -> loop ->tick ();
233
192
}
234
193
}
235
194
}
0 commit comments