5
5
use React \MySQL \Commands \AuthenticateCommand ;
6
6
use React \MySQL \Commands \QueryCommand ;
7
7
use React \MySQL \Commands \QuitCommand ;
8
- use React \MySQL \Exception ;
8
+ use React \MySQL \Exception as MysqlException ;
9
9
use React \Stream \DuplexStreamInterface ;
10
10
11
11
/**
@@ -25,6 +25,13 @@ class Parser
25
25
const STATE_STANDBY = 0 ;
26
26
const STATE_BODY = 1 ;
27
27
28
+ /**
29
+ * The packet header always consists of 4 bytes, 3 bytes packet length + 1 byte sequence number
30
+ *
31
+ * @var integer
32
+ */
33
+ const PACKET_SIZE_HEADER = 4 ;
34
+
28
35
/**
29
36
* Keeps a reference to the command that is currently being processed.
30
37
*
@@ -63,7 +70,20 @@ class Parser
63
70
protected $ serverStatus ;
64
71
65
72
protected $ rsState = 0 ;
66
- protected $ pctSize = 0 ;
73
+
74
+ /**
75
+ * Packet size expected in number of bytes
76
+ *
77
+ * Depending on `self::$state`, the Parser excepts either a packet header
78
+ * (always 4 bytes) or the packet contents (n bytes determined by prior
79
+ * packet header).
80
+ *
81
+ * @var int
82
+ * @see self::$state
83
+ * @see self::PACKET_SIZE_HEADER
84
+ */
85
+ private $ pctSize = self ::PACKET_SIZE_HEADER ;
86
+
67
87
protected $ resultFields = [];
68
88
69
89
protected $ insertId ;
@@ -97,7 +117,7 @@ public function __construct(DuplexStreamInterface $stream, Executor $executor)
97
117
98
118
public function start ()
99
119
{
100
- $ this ->stream ->on ('data ' , [$ this , 'parse ' ]);
120
+ $ this ->stream ->on ('data ' , [$ this , 'handleData ' ]);
101
121
$ this ->stream ->on ('close ' , [$ this , 'onClose ' ]);
102
122
}
103
123
@@ -110,31 +130,53 @@ public function debug($message)
110
130
}
111
131
}
112
132
113
- public function parse ($ data )
133
+ /** @var string $data */
134
+ public function handleData ($ data )
114
135
{
115
136
$ this ->buffer ->append ($ data );
116
- packet:
117
- if ($ this ->state === self ::STATE_STANDBY ) {
118
- if ($ this ->buffer ->length () < 4 ) {
137
+
138
+ if ($ this ->debug ) {
139
+ $ this ->debug ('Received ' . strlen ($ data ) . ' byte(s), buffer now has ' . ($ len = $ this ->buffer ->length ()) . ' byte(s): ' . wordwrap (bin2hex ($ b = $ this ->buffer ->read ($ len )), 2 , ' ' , true )); $ this ->buffer ->append ($ b ); // @codeCoverageIgnore
140
+ }
141
+
142
+ while ($ this ->buffer ->length () >= $ this ->pctSize ) {
143
+ if ($ this ->state === self ::STATE_STANDBY ) {
144
+ $ this ->pctSize = $ this ->buffer ->readInt3 ();
145
+ //printf("packet size:%d\n", $this->pctSize);
146
+ $ this ->state = self ::STATE_BODY ;
147
+ $ this ->seq = $ this ->buffer ->readInt1 () + 1 ;
148
+ }
149
+
150
+ $ len = $ this ->buffer ->length ();
151
+ if ($ len < $ this ->pctSize ) {
152
+ $ this ->debug ('Waiting for complete packet with ' . $ len . '/ ' . $ this ->pctSize . ' bytes ' );
153
+
119
154
return ;
120
155
}
121
156
122
- $ this ->pctSize = $ this ->buffer ->readInt3 ();
123
- //printf("packet size:%d\n", $this->pctSize);
124
- $ this ->state = self ::STATE_BODY ;
125
- $ this ->seq = $ this ->buffer ->readInt1 () + 1 ;
126
- }
157
+ $ packet = $ this ->buffer ->readBuffer ($ this ->pctSize );
158
+ $ this ->state = self ::STATE_STANDBY ;
159
+ $ this ->pctSize = self ::PACKET_SIZE_HEADER ;
127
160
128
- $ len = $ this ->buffer ->length ();
129
- if ($ len < $ this ->pctSize ) {
130
- $ this ->debug ('Waiting for complete packet with ' . $ len . '/ ' . $ this ->pctSize . ' bytes ' );
161
+ try {
162
+ $ this ->parsePacket ($ packet );
163
+ } catch (\UnderflowException $ e ) {
164
+ $ this ->onError (new \UnexpectedValueException ('Unexpected protocol error, received malformed packet: ' . $ e ->getMessage (), 0 , $ e ));
165
+ $ this ->stream ->close ();
166
+ return ;
167
+ }
131
168
132
- return ;
169
+ if ($ packet ->length () !== 0 ) {
170
+ $ this ->onError (new \UnexpectedValueException ('Unexpected protocol error, received malformed packet with ' . $ packet ->length () . ' unknown byte(s) ' ));
171
+ $ this ->stream ->close ();
172
+ return ;
173
+ }
133
174
}
175
+ }
134
176
135
- $ packet = $ this -> buffer -> readBuffer ( $ this -> pctSize );
136
- $ this -> state = self :: STATE_STANDBY ;
137
-
177
+ /** @return void */
178
+ private function parsePacket ( Buffer $ packet )
179
+ {
138
180
if ($ this ->debug ) {
139
181
$ this ->debug ('Parse packet# ' . $ this ->seq . ' with ' . ($ len = $ packet ->length ()) . ' bytes: ' . wordwrap (bin2hex ($ b = $ packet ->read ($ len )), 2 , ' ' , true )); $ packet ->append ($ b ); // @codeCoverageIgnore
140
182
}
@@ -146,7 +188,7 @@ public function parse($data)
146
188
$ this ->phase = self ::PHASE_AUTH_ERR ;
147
189
148
190
$ code = $ packet ->readInt2 ();
149
- $ exception = new Exception ($ packet ->read ($ packet ->length ()), $ code );
191
+ $ exception = new MysqlException ($ packet ->read ($ packet ->length ()), $ code );
150
192
$ this ->debug (sprintf ("Error Packet:%d %s \n" , $ code , $ exception ->getMessage ()));
151
193
152
194
// error during init phase also means we're not currently executing any command
@@ -186,7 +228,7 @@ public function parse($data)
186
228
// error packet
187
229
$ code = $ packet ->readInt2 ();
188
230
$ packet ->skip (6 ); // skip SQL state
189
- $ exception = new Exception ($ packet ->read ($ packet ->length ()), $ code );
231
+ $ exception = new MysqlException ($ packet ->read ($ packet ->length ()), $ code );
190
232
$ this ->debug (sprintf ("Error Packet:%d %s \n" , $ code , $ exception ->getMessage ()));
191
233
192
234
$ this ->onError ($ exception );
@@ -266,10 +308,6 @@ public function parse($data)
266
308
}
267
309
}
268
310
}
269
-
270
- // finished parsing packet, continue with next packet
271
- assert ($ packet ->length () === 0 );
272
- goto packet;
273
311
}
274
312
275
313
private function onResultRow ($ row )
@@ -279,7 +317,7 @@ private function onResultRow($row)
279
317
$ command ->emit ('result ' , [$ row ]);
280
318
}
281
319
282
- private function onError (Exception $ error )
320
+ private function onError (\ Exception $ error )
283
321
{
284
322
$ this ->rsState = self ::RS_STATE_HEADER ;
285
323
$ this ->resultFields = [];
0 commit comments