1111use Clue \Redis \Protocol \Factory as ProtocolFactory ;
1212use UnderflowException ;
1313use RuntimeException ;
14+ use InvalidArgumentException ;
1415use React \Promise \Deferred ;
1516use Clue \Redis \Protocol \Model \ErrorReply ;
1617use Clue \Redis \Protocol \Model \ModelInterface ;
@@ -77,18 +78,43 @@ public function __call($name, $args)
7778 {
7879 $ request = new Deferred ();
7980
81+ $ name = strtolower ($ name );
82+
83+ // special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied
84+ static $ pubsubs = array ('subscribe ' , 'unsubscribe ' , 'psubscribe ' , 'punsubscribe ' );
85+
8086 if ($ this ->ending ) {
8187 $ request ->reject (new RuntimeException ('Connection closed ' ));
88+ } elseif (count ($ args ) !== 1 && in_array ($ name , $ pubsubs )) {
89+ $ request ->reject (new InvalidArgumentException ('PubSub commands limited to single argument ' ));
8290 } else {
8391 $ this ->stream ->write ($ this ->serializer ->getRequestMessage ($ name , $ args ));
8492 $ this ->requests []= $ request ;
8593 }
8694
87- if (strtolower ( $ name) === 'monitor ' ) {
95+ if ($ name === 'monitor ' ) {
8896 $ monitoring =& $ this ->monitoring ;
8997 $ request ->then (function () use (&$ monitoring ) {
9098 $ monitoring = true ;
9199 });
100+ } elseif (in_array ($ name , $ pubsubs )) {
101+ $ that = $ this ;
102+ $ subscribed =& $ this ->subscribed ;
103+ $ psubscribed =& $ this ->psubscribed ;
104+
105+ $ request ->then (function ($ array ) use ($ that , &$ subscribed , &$ psubscribed ) {
106+ $ first = array_shift ($ array );
107+
108+ // (p)(un)subscribe messages are to be forwarded
109+ $ that ->emit ($ first , $ array );
110+
111+ // remember number of (p)subscribe topics
112+ if ($ first === 'subscribe ' || $ first === 'unsubscribe ' ) {
113+ $ subscribed = $ array [1 ];
114+ } else {
115+ $ psubscribed = $ array [1 ];
116+ }
117+ });
92118 }
93119
94120 return $ request ->promise ();
@@ -103,17 +129,13 @@ public function handleMessage(ModelInterface $message)
103129 return ;
104130 }
105131
106- if (/* ($this->subscribed !== 0 || $this->psubscribed !== 0) &&*/ $ message instanceof MultiBulkReply) {
132+ if (($ this ->subscribed !== 0 || $ this ->psubscribed !== 0 ) && $ message instanceof MultiBulkReply) {
107133 $ array = $ message ->getValueNative ();
108134 $ first = array_shift ($ array );
109135
110- // pub/sub events are to be forwarded
111- if (in_array ($ first , array ('message ' , 'subscribe ' , 'unsubscribe ' , 'pmessage ' , 'psubscribe ' , 'punsubscribe ' ))) {
112- $ this ->emit ($ first , $ array );
113- }
114-
115- // pub/sub message events should not be processed as request responses
136+ // pub/sub messages are to be forwarded and should not be processed as request responses
116137 if (in_array ($ first , array ('message ' , 'pmessage ' ))) {
138+ $ this ->emit ($ first , $ array );
117139 return ;
118140 }
119141 }
0 commit comments