|
1 | 1 | <?php
|
2 | 2 |
|
3 | 3 | use React\Stream\Stream;
|
4 |
| - |
5 | 4 | use React\Stream\ReadableStream;
|
6 |
| - |
7 | 5 | use Clue\React\Redis\Factory;
|
8 |
| - |
9 | 6 | use Clue\React\Redis\StreamingClient;
|
| 7 | +use React\Promise\Deferred; |
| 8 | +use Clue\React\Block; |
10 | 9 |
|
11 | 10 | class FunctionalTest extends TestCase
|
12 | 11 | {
|
@@ -135,15 +134,21 @@ public function testPubSub()
|
135 | 134 | $consumer = $this->createClient();
|
136 | 135 | $producer = $this->createClient();
|
137 | 136 |
|
138 |
| - $that = $this; |
| 137 | + $channel = 'channel:test:' . mt_rand(); |
139 | 138 |
|
140 |
| - $producer->publish('channel:test', 'nobody sees this')->then($this->expectCallableOnce(0)); |
| 139 | + // consumer receives a single message |
| 140 | + $deferred = new Deferred(); |
| 141 | + $consumer->on('message', $this->expectCallableOnce()); |
| 142 | + $consumer->on('message', array($deferred, 'resolve')); |
| 143 | + $consumer->subscribe($channel)->then($this->expectCallableOnce()); |
| 144 | + $this->waitFor($consumer); |
141 | 145 |
|
| 146 | + // producer sends a single message |
| 147 | + $producer->publish($channel, 'hello world')->then($this->expectCallableOnce()); |
142 | 148 | $this->waitFor($producer);
|
143 | 149 |
|
144 |
| - $consumer->subscribe('channel:test')->then(function () { |
145 |
| - // ? |
146 |
| - }); |
| 150 | + // expect "message" event to take no longer than 0.1s |
| 151 | + Block\await($deferred->promise(), self::$loop, 0.1); |
147 | 152 | }
|
148 | 153 |
|
149 | 154 | public function testClose()
|
@@ -186,24 +191,7 @@ public function testInvalidServerRepliesWithDuplicateMessages()
|
186 | 191 | */
|
187 | 192 | protected function createClient()
|
188 | 193 | {
|
189 |
| - $client = null; |
190 |
| - $exception = null; |
191 |
| - |
192 |
| - self::$factory->createClient()->then(function ($c) use (&$client) { |
193 |
| - $client = $c; |
194 |
| - }, function($error) use (&$exception) { |
195 |
| - $exception = $error; |
196 |
| - }); |
197 |
| - |
198 |
| - while ($client === null && $exception === null) { |
199 |
| - self::$loop->tick(); |
200 |
| - } |
201 |
| - |
202 |
| - if ($exception !== null) { |
203 |
| - throw $exception; |
204 |
| - } |
205 |
| - |
206 |
| - return $client; |
| 194 | + return Block\await(self::$factory->createClient(), self::$loop); |
207 | 195 | }
|
208 | 196 |
|
209 | 197 | protected function createClientResponse($response)
|
|
0 commit comments