1111use Clue \Redis \Protocol \Factory as ProtocolFactory ;
1212use UnderflowException ;
1313use RuntimeException ;
14+ use InvalidArgumentException ;
1415use React \Promise \Deferred ;
1516use Clue \Redis \Protocol \Model \ErrorReply ;
1617use Clue \Redis \Protocol \Model \ModelInterface ;
18+ use Clue \Redis \Protocol \Model \MultiBulkReply ;
1719use Clue \Redis \Protocol \Model \StatusReply ;
1820
1921class StreamingClient extends EventEmitter implements Client
@@ -25,6 +27,8 @@ class StreamingClient extends EventEmitter implements Client
2527 private $ ending = false ;
2628 private $ closed = false ;
2729
30+ private $ subscribed = 0 ;
31+ private $ psubscribed = 0 ;
2832 private $ monitoring = false ;
2933
3034 public function __construct (Stream $ stream , ParserInterface $ parser = null , SerializerInterface $ serializer = null )
@@ -74,18 +78,43 @@ public function __call($name, $args)
7478 {
7579 $ request = new Deferred ();
7680
81+ $ name = strtolower ($ name );
82+
83+ // special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied
84+ static $ pubsubs = array ('subscribe ' , 'unsubscribe ' , 'psubscribe ' , 'punsubscribe ' );
85+
7786 if ($ this ->ending ) {
7887 $ request ->reject (new RuntimeException ('Connection closed ' ));
88+ } elseif (count ($ args ) !== 1 && in_array ($ name , $ pubsubs )) {
89+ $ request ->reject (new InvalidArgumentException ('PubSub commands limited to single argument ' ));
7990 } else {
8091 $ this ->stream ->write ($ this ->serializer ->getRequestMessage ($ name , $ args ));
8192 $ this ->requests []= $ request ;
8293 }
8394
84- if (strtolower ( $ name) === 'monitor ' ) {
95+ if ($ name === 'monitor ' ) {
8596 $ monitoring =& $ this ->monitoring ;
8697 $ request ->then (function () use (&$ monitoring ) {
8798 $ monitoring = true ;
8899 });
100+ } elseif (in_array ($ name , $ pubsubs )) {
101+ $ that = $ this ;
102+ $ subscribed =& $ this ->subscribed ;
103+ $ psubscribed =& $ this ->psubscribed ;
104+
105+ $ request ->then (function ($ array ) use ($ that , &$ subscribed , &$ psubscribed ) {
106+ $ first = array_shift ($ array );
107+
108+ // (p)(un)subscribe messages are to be forwarded
109+ $ that ->emit ($ first , $ array );
110+
111+ // remember number of (p)subscribe topics
112+ if ($ first === 'subscribe ' || $ first === 'unsubscribe ' ) {
113+ $ subscribed = $ array [1 ];
114+ } else {
115+ $ psubscribed = $ array [1 ];
116+ }
117+ });
89118 }
90119
91120 return $ request ->promise ();
@@ -100,6 +129,17 @@ public function handleMessage(ModelInterface $message)
100129 return ;
101130 }
102131
132+ if (($ this ->subscribed !== 0 || $ this ->psubscribed !== 0 ) && $ message instanceof MultiBulkReply) {
133+ $ array = $ message ->getValueNative ();
134+ $ first = array_shift ($ array );
135+
136+ // pub/sub messages are to be forwarded and should not be processed as request responses
137+ if (in_array ($ first , array ('message ' , 'pmessage ' ))) {
138+ $ this ->emit ($ first , $ array );
139+ return ;
140+ }
141+ }
142+
103143 if (!$ this ->requests ) {
104144 throw new UnderflowException ('Unexpected reply received, no matching request found ' );
105145 }
0 commit comments