Skip to content

Commit 277b540

Browse files
committed
Merge pull request clue#24 from clue/pubsub
Support Pub/Sub
2 parents 165649c + 1893c50 commit 277b540

File tree

4 files changed

+106
-1
lines changed

4 files changed

+106
-1
lines changed

src/Client.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@
1212
* @event data(ModelInterface $messageModel, Client $thisClient)
1313
* @event close()
1414
*
15+
* @event message($channel, $message)
16+
* @event subscribe($channel, $numberOfChannels)
17+
* @event unsubscribe($channel, $numberOfChannels)
18+
*
19+
* @event pmessage($pattern, $channel, $message)
20+
* @event psubscribe($channel, $numberOfChannels)
21+
* @event punsubscribe($channel, $numberOfChannels)
22+
*
1523
* @event monitor(ModelInterface $statusModel)
1624
*/
1725
interface Client extends EventEmitterInterface

src/StreamingClient.php

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
use Clue\Redis\Protocol\Factory as ProtocolFactory;
1212
use UnderflowException;
1313
use RuntimeException;
14+
use InvalidArgumentException;
1415
use React\Promise\Deferred;
1516
use Clue\Redis\Protocol\Model\ErrorReply;
1617
use Clue\Redis\Protocol\Model\ModelInterface;
18+
use Clue\Redis\Protocol\Model\MultiBulkReply;
1719
use Clue\Redis\Protocol\Model\StatusReply;
1820

1921
class StreamingClient extends EventEmitter implements Client
@@ -25,6 +27,8 @@ class StreamingClient extends EventEmitter implements Client
2527
private $ending = false;
2628
private $closed = false;
2729

30+
private $subscribed = 0;
31+
private $psubscribed = 0;
2832
private $monitoring = false;
2933

3034
public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
@@ -74,18 +78,43 @@ public function __call($name, $args)
7478
{
7579
$request = new Deferred();
7680

81+
$name = strtolower($name);
82+
83+
// special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied
84+
static $pubsubs = array('subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
85+
7786
if ($this->ending) {
7887
$request->reject(new RuntimeException('Connection closed'));
88+
} elseif (count($args) !== 1 && in_array($name, $pubsubs)) {
89+
$request->reject(new InvalidArgumentException('PubSub commands limited to single argument'));
7990
} else {
8091
$this->stream->write($this->serializer->getRequestMessage($name, $args));
8192
$this->requests []= $request;
8293
}
8394

84-
if (strtolower($name) === 'monitor') {
95+
if ($name === 'monitor') {
8596
$monitoring =& $this->monitoring;
8697
$request->then(function () use (&$monitoring) {
8798
$monitoring = true;
8899
});
100+
} elseif (in_array($name, $pubsubs)) {
101+
$that = $this;
102+
$subscribed =& $this->subscribed;
103+
$psubscribed =& $this->psubscribed;
104+
105+
$request->then(function ($array) use ($that, &$subscribed, &$psubscribed) {
106+
$first = array_shift($array);
107+
108+
// (p)(un)subscribe messages are to be forwarded
109+
$that->emit($first, $array);
110+
111+
// remember number of (p)subscribe topics
112+
if ($first === 'subscribe' || $first === 'unsubscribe') {
113+
$subscribed = $array[1];
114+
} else {
115+
$psubscribed = $array[1];
116+
}
117+
});
89118
}
90119

91120
return $request->promise();
@@ -100,6 +129,17 @@ public function handleMessage(ModelInterface $message)
100129
return;
101130
}
102131

132+
if (($this->subscribed !== 0 || $this->psubscribed !== 0) && $message instanceof MultiBulkReply) {
133+
$array = $message->getValueNative();
134+
$first = array_shift($array);
135+
136+
// pub/sub messages are to be forwarded and should not be processed as request responses
137+
if (in_array($first, array('message', 'pmessage'))) {
138+
$this->emit($first, $array);
139+
return;
140+
}
141+
}
142+
103143
if (!$this->requests) {
104144
throw new UnderflowException('Unexpected reply received, no matching request found');
105145
}

tests/FunctionalTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ public function testPing()
3434
return $client;
3535
}
3636

37+
public function testMgetIsNotInterpretedAsSubMessage()
38+
{
39+
$client = $this->createClient();
40+
41+
$client->mset('message', 'message', 'channel', 'channel', 'payload', 'payload');
42+
43+
$client->mget('message', 'channel', 'payload')->then($this->expectCallableOnce());
44+
$client->on('message', $this->expectCallableNever());
45+
46+
$this->waitFor($client);
47+
}
48+
3749
/**
3850
*
3951
* @param StreamingClient $client

tests/StreamingClientTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use Clue\Redis\Protocol\Model\IntegerReply;
66
use Clue\Redis\Protocol\Model\BulkReply;
77
use Clue\Redis\Protocol\Model\ErrorReply;
8+
use Clue\Redis\Protocol\Model\MultiBulkReply;
9+
use Clue\React\Redis\Client;
810
use Clue\Redis\Protocol\Model\StatusReply;
911

1012
class ClientTest extends TestCase
@@ -210,4 +212,47 @@ public function testReceivingUnexpectedMessageThrowsException()
210212
$this->setExpectedException('UnderflowException');
211213
$this->client->handleMessage(new BulkReply('PONG'));
212214
}
215+
216+
public function testPubsubSubscribe()
217+
{
218+
$promise = $this->client->subscribe('test');
219+
$this->expectPromiseResolve($promise);
220+
221+
$this->client->on('subscribe', $this->expectCallableOnce());
222+
$this->client->handleMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('test'), new IntegerReply(1))));
223+
224+
return $this->client;
225+
}
226+
227+
/**
228+
* @depends testPubsubSubscribe
229+
* @param Client $client
230+
*/
231+
public function testPubsubPatternSubscribe(Client $client)
232+
{
233+
$promise = $client->psubscribe('demo_*');
234+
$this->expectPromiseResolve($promise);
235+
236+
$client->on('psubscribe', $this->expectCallableOnce());
237+
$client->handleMessage(new MultiBulkReply(array(new BulkReply('psubscribe'), new BulkReply('demo_*'), new IntegerReply(1))));
238+
239+
return $client;
240+
}
241+
242+
/**
243+
* @depends testPubsubPatternSubscribe
244+
* @param Client $client
245+
*/
246+
public function testPubsubMessage(Client $client)
247+
{
248+
$client->on('message', $this->expectCallableOnce());
249+
$client->handleMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('test'), new BulkReply('payload'))));
250+
}
251+
252+
public function testPubsubSubscribeSingleOnly()
253+
{
254+
$this->expectPromiseReject($this->client->subscribe('a', 'b'));
255+
$this->expectPromiseReject($this->client->unsubscribe('a', 'b'));
256+
$this->expectPromiseReject($this->client->unsubscribe());
257+
}
213258
}

0 commit comments

Comments
 (0)