9
9
use Clue \Redis \Protocol \Model \ErrorReplyException ;
10
10
use Clue \Redis \Protocol \Serializer \SerializerInterface ;
11
11
use Clue \Redis \Protocol \Factory as ProtocolFactory ;
12
- use Clue \React \Redis \Request ;
13
12
use UnderflowException ;
14
13
use RuntimeException ;
14
+ use React \Promise \Deferred ;
15
+ use Clue \Redis \Protocol \Model \ErrorReply ;
16
+ use Clue \Redis \Protocol \Model \ModelInterface ;
15
17
16
18
class Client extends EventEmitter
17
19
{
@@ -46,7 +48,7 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
46
48
47
49
foreach ($ models as $ data ) {
48
50
try {
49
- $ that ->handleReply ($ data );
51
+ $ that ->handleMessage ($ data );
50
52
}
51
53
catch (UnderflowException $ error ) {
52
54
$ that ->emit ('error ' , array ($ error ));
@@ -67,36 +69,34 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
67
69
68
70
public function __call ($ name , $ args )
69
71
{
70
- if ($ this ->ending ) {
71
- $ e = new RuntimeException ('Connection closed ' );
72
+ $ request = new Deferred ();
72
73
73
- if (class_exists ( ' React\Promise\When ' ) ) {
74
- return \ React \ Promise \When:: reject ($ e );
75
- } else {
76
- return \ React \ Promise \reject ( $ e );
77
- }
74
+ if ($ this -> ending ) {
75
+ $ request -> reject (new RuntimeException ( ' Connection closed ' ) );
76
+ } else {
77
+ $ this -> stream -> write ( $ this -> serializer -> getRequestMessage ( $ name , $ args ) );
78
+ $ this -> requests []= $ request ;
78
79
}
79
80
80
- $ this ->stream ->write ($ this ->serializer ->getRequestMessage ($ name , $ args ));
81
-
82
- $ request = new Request ($ name );
83
- $ this ->requests []= $ request ;
84
-
85
81
return $ request ->promise ();
86
82
}
87
83
88
- public function handleReply ( $ data )
84
+ public function handleMessage ( ModelInterface $ message )
89
85
{
90
- $ this ->emit ('message ' , array ($ data , $ this ));
86
+ $ this ->emit ('message ' , array ($ message , $ this ));
91
87
92
88
if (!$ this ->requests ) {
93
89
throw new UnderflowException ('Unexpected reply received, no matching request found ' );
94
90
}
95
91
96
92
$ request = array_shift ($ this ->requests );
97
- /* @var $request Request */
93
+ /* @var $request Deferred */
98
94
99
- $ request ->handleReply ($ data );
95
+ if ($ message instanceof ErrorReply) {
96
+ $ request ->reject ($ message );
97
+ } else {
98
+ $ request ->resolve ($ message ->getValueNative ());
99
+ }
100
100
101
101
if ($ this ->ending && !$ this ->isBusy ()) {
102
102
$ this ->close ();
0 commit comments