Skip to content

Commit 893aa25

Browse files
committed
Keep track of (p)subscribe responses to toggle PubSub state
The (p)(un)subscribe commands only accept a single topic name for now.
1 parent 7fcb65b commit 893aa25

File tree

3 files changed

+49
-25
lines changed

3 files changed

+49
-25
lines changed

src/StreamingClient.php

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Clue\Redis\Protocol\Factory as ProtocolFactory;
1212
use UnderflowException;
1313
use RuntimeException;
14+
use InvalidArgumentException;
1415
use React\Promise\Deferred;
1516
use Clue\Redis\Protocol\Model\ErrorReply;
1617
use Clue\Redis\Protocol\Model\ModelInterface;
@@ -77,18 +78,43 @@ public function __call($name, $args)
7778
{
7879
$request = new Deferred();
7980

81+
$name = strtolower($name);
82+
83+
// special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied
84+
static $pubsubs = array('subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
85+
8086
if ($this->ending) {
8187
$request->reject(new RuntimeException('Connection closed'));
88+
} elseif (count($args) !== 1 && in_array($name, $pubsubs)) {
89+
$request->reject(new InvalidArgumentException('PubSub commands limited to single argument'));
8290
} else {
8391
$this->stream->write($this->serializer->getRequestMessage($name, $args));
8492
$this->requests []= $request;
8593
}
8694

87-
if (strtolower($name) === 'monitor') {
95+
if ($name === 'monitor') {
8896
$monitoring =& $this->monitoring;
8997
$request->then(function () use (&$monitoring) {
9098
$monitoring = true;
9199
});
100+
} elseif (in_array($name, $pubsubs)) {
101+
$that = $this;
102+
$subscribed =& $this->subscribed;
103+
$psubscribed =& $this->psubscribed;
104+
105+
$request->then(function ($array) use ($that, &$subscribed, &$psubscribed) {
106+
$first = array_shift($array);
107+
108+
// (p)(un)subscribe messages are to be forwarded
109+
$that->emit($first, $array);
110+
111+
// remember number of (p)subscribe topics
112+
if ($first === 'subscribe' || $first === 'unsubscribe') {
113+
$subscribed = $array[1];
114+
} else {
115+
$psubscribed = $array[1];
116+
}
117+
});
92118
}
93119

94120
return $request->promise();
@@ -103,17 +129,13 @@ public function handleMessage(ModelInterface $message)
103129
return;
104130
}
105131

106-
if (/*($this->subscribed !== 0 || $this->psubscribed !== 0) &&*/ $message instanceof MultiBulkReply) {
132+
if (($this->subscribed !== 0 || $this->psubscribed !== 0) && $message instanceof MultiBulkReply) {
107133
$array = $message->getValueNative();
108134
$first = array_shift($array);
109135

110-
// pub/sub events are to be forwarded
111-
if (in_array($first, array('message', 'subscribe', 'unsubscribe', 'pmessage', 'psubscribe', 'punsubscribe'))) {
112-
$this->emit($first, $array);
113-
}
114-
115-
// pub/sub message events should not be processed as request responses
136+
// pub/sub messages are to be forwarded and should not be processed as request responses
116137
if (in_array($first, array('message', 'pmessage'))) {
138+
$this->emit($first, $array);
117139
return;
118140
}
119141
}

tests/FunctionalTest.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ public function testPing()
3636

3737
public function testMgetIsNotInterpretedAsSubMessage()
3838
{
39-
$this->markTestIncomplete();
40-
4139
$client = $this->createClient();
4240

4341
$client->mset('message', 'message', 'channel', 'channel', 'payload', 'payload');

tests/StreamingClientTest.php

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -228,27 +228,31 @@ public function testPubsubSubscribe()
228228
* @depends testPubsubSubscribe
229229
* @param Client $client
230230
*/
231+
public function testPubsubPatternSubscribe(Client $client)
232+
{
233+
$promise = $client->psubscribe('demo_*');
234+
$this->expectPromiseResolve($promise);
235+
236+
$client->on('psubscribe', $this->expectCallableOnce());
237+
$client->handleMessage(new MultiBulkReply(array(new BulkReply('psubscribe'), new BulkReply('demo_*'), new IntegerReply(1))));
238+
239+
return $client;
240+
}
241+
242+
/**
243+
* @depends testPubsubPatternSubscribe
244+
* @param Client $client
245+
*/
231246
public function testPubsubMessage(Client $client)
232247
{
233248
$client->on('message', $this->expectCallableOnce());
234249
$client->handleMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('test'), new BulkReply('payload'))));
235250
}
236251

237-
public function testPubsubSubscribeMultiple()
252+
public function testPubsubSubscribeSingleOnly()
238253
{
239-
$this->markTestIncomplete();
240-
241-
$promise = $this->client->subscribe('first', 'second');
242-
$this->expectPromiseResolve($promise);
243-
244-
// expect two "subscribe" events
245-
$mock = $this->createCallableMock();
246-
$mock->expects($this->exactly(2))->method('__invoke');
247-
$this->client->on('subscribe', $mock);
248-
249-
$this->client->handleMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('first'), new IntegerReply(1))));
250-
$this->client->handleMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('second'), new IntegerReply(2))));
251-
252-
return $this->client;
254+
$this->expectPromiseReject($this->client->subscribe('a', 'b'));
255+
$this->expectPromiseReject($this->client->unsubscribe('a', 'b'));
256+
$this->expectPromiseReject($this->client->unsubscribe());
253257
}
254258
}

0 commit comments

Comments
 (0)