Skip to content

Commit b539eec

Browse files
committed
Skeleton for forwarding pub/sub events
1 parent 3ec3c01 commit b539eec

File tree

3 files changed

+50
-0
lines changed

3 files changed

+50
-0
lines changed

src/Client.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@
1111
*
1212
* @event data(ModelInterface $messageModel, Client $thisClient)
1313
* @event close()
14+
*
15+
* @event message($channel, $message)
16+
* @event subscribe($channel, $numberOfChannels)
17+
* @event unsubscribe($channel, $numberOfChannels)
18+
*
19+
* @event pmessage($pattern, $channel, $message)
20+
* @event psubscribe($channel, $numberOfChannels)
21+
* @event punsubscribe($channel, $numberOfChannels)
1422
*/
1523
interface Client extends EventEmitterInterface
1624
{

src/StreamingClient.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use React\Promise\Deferred;
1515
use Clue\Redis\Protocol\Model\ErrorReply;
1616
use Clue\Redis\Protocol\Model\ModelInterface;
17+
use Clue\Redis\Protocol\Model\MultiBulkReply;
1718

1819
class StreamingClient extends EventEmitter implements Client
1920
{
@@ -24,6 +25,9 @@ class StreamingClient extends EventEmitter implements Client
2425
private $ending = false;
2526
private $closed = false;
2627

28+
private $subscribed = 0;
29+
private $psubscribed = 0;
30+
2731
public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
2832
{
2933
if ($parser === null || $serializer === null) {
@@ -85,6 +89,21 @@ public function handleMessage(ModelInterface $message)
8589
{
8690
$this->emit('data', array($message, $this));
8791

92+
if (/*($this->subscribed !== 0 || $this->psubscribed !== 0) &&*/ $message instanceof MultiBulkReply) {
93+
$array = $message->getValueNative();
94+
$first = array_shift($array);
95+
96+
// pub/sub events are to be forwarded
97+
if (in_array($first, array('message', 'subscribe', 'unsubscribe', 'pmessage', 'psubscribe', 'punsubscribe'))) {
98+
$this->emit($first, $array);
99+
}
100+
101+
// pub/sub message events should not be processed as request responses
102+
if (in_array($first, array('message', 'pmessage'))) {
103+
return;
104+
}
105+
}
106+
88107
if (!$this->requests) {
89108
throw new UnderflowException('Unexpected reply received, no matching request found');
90109
}

tests/StreamingClientTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use Clue\Redis\Protocol\Model\IntegerReply;
66
use Clue\Redis\Protocol\Model\BulkReply;
77
use Clue\Redis\Protocol\Model\ErrorReply;
8+
use Clue\Redis\Protocol\Model\MultiBulkReply;
9+
use Clue\React\Redis\Client;
810

911
class ClientTest extends TestCase
1012
{
@@ -163,4 +165,25 @@ public function testReceivingUnexpectedMessageThrowsException()
163165
$this->setExpectedException('UnderflowException');
164166
$this->client->handleMessage(new BulkReply('PONG'));
165167
}
168+
169+
public function testPubsubSubscribe()
170+
{
171+
$promise = $this->client->subscribe('test');
172+
$this->expectPromiseResolve($promise);
173+
174+
$this->client->on('subscribe', $this->expectCallableOnce());
175+
$this->client->handleMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('test'), new IntegerReply(1))));
176+
177+
return $this->client;
178+
}
179+
180+
/**
181+
* @depends testPubsubSubscribe
182+
* @param Client $client
183+
*/
184+
public function testPubsubMessage(Client $client)
185+
{
186+
$client->on('message', $this->expectCallableOnce());
187+
$client->handleMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('test'), new BulkReply('payload'))));
188+
}
166189
}

0 commit comments

Comments
 (0)