Skip to content

Commit 01962f3

Browse files
committed
Simplify parser logic by removing unneeded queue
1 parent bb713ac commit 01962f3

File tree

1 file changed

+45
-14
lines changed

1 file changed

+45
-14
lines changed

src/Protocal/Parser.php

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,20 @@ class Parser extends EventEmitter
2525
protected $dbname = '';
2626

2727
/**
28-
* @var \React\MySQL\Command
28+
* Keeps a reference to the command that is currently being processed.
29+
*
30+
* The MySQL protocol is inherently sequential, the pending commands will be
31+
* stored in an `Executor` queue.
32+
*
33+
* The MySQL protocol communication starts with the server sending a
34+
* handshake message, so the current command will be `null` until it's our
35+
* turn.
36+
*
37+
* Similarly, when one command is finished, it will continue processing the
38+
* next command from the `Executor` queue. If no command is outstanding,
39+
* this will be reset to the `null` state.
40+
*
41+
* @var \React\MySQL\Command|null
2942
*/
3043
protected $currCommand;
3144

@@ -80,13 +93,20 @@ class Parser extends EventEmitter
8093
*/
8194
protected $executor;
8295

96+
/**
97+
* @deprecated
98+
* @see self::$currCommand
99+
*/
83100
protected $queue;
84101

85102
public function __construct($stream, $executor)
86103
{
87104
$this->stream = $stream;
88105
$this->executor = $executor;
89-
$this->queue = new \SplQueue($this);
106+
107+
// @deprecated unused, exists for BC only.
108+
$this->queue = new \SplQueue();
109+
90110
$executor->on('new', array($this, 'handleNewCommand'));
91111
}
92112

@@ -98,7 +118,7 @@ public function start()
98118

99119
public function handleNewCommand()
100120
{
101-
if ($this->queue->count() <= 0) {
121+
if ($this->currCommand === null) {
102122
$this->nextRequest();
103123
}
104124
}
@@ -191,8 +211,8 @@ public function parse($data)
191211
$this->errmsg = $this->read($this->pctSize - $len + $this->length());
192212
$this->debug(sprintf("Error Packet:%d %s\n", $this->errno, $this->errmsg));
193213

194-
$this->nextRequest();
195214
$this->onError();
215+
$this->nextRequest();
196216
} elseif ($fieldCount === 0x00) {
197217
// Empty OK Packet
198218
$this->debug('Ok Packet');
@@ -238,8 +258,8 @@ public function parse($data)
238258
// finalize this result set (all rows completed)
239259
$this->debug('result done');
240260

241-
$this->nextRequest();
242261
$this->onResultDone();
262+
$this->nextRequest();
243263
} else {
244264
// move to next part of result set (header->field->row)
245265
++$this->rsState;
@@ -298,14 +318,15 @@ private function onResultRow($row)
298318
{
299319
// $this->debug('row data: ' . json_encode($row));
300320
$this->resultRows[] = $row;
301-
$command = $this->queue->dequeue();
321+
$command = $this->currCommand;
302322
$command->emit('result', array($row, $command, $command->getConnection()));
303-
$this->queue->unshift($command);
304323
}
305324

306325
protected function onError()
307326
{
308-
$command = $this->queue->dequeue();
327+
$command = $this->currCommand;
328+
$this->currCommand = null;
329+
309330
$error = new Exception($this->errmsg, $this->errno);
310331
$command->setError($error);
311332
$command->emit('error', array($error, $command, $command->getConnection()));
@@ -315,7 +336,9 @@ protected function onError()
315336

316337
protected function onResultDone()
317338
{
318-
$command = $this->queue->dequeue();
339+
$command = $this->currCommand;
340+
$this->currCommand = null;
341+
319342
$command->resultRows = $this->resultRows;
320343
$command->resultFields = $this->resultFields;
321344
$command->emit('results', array($this->resultRows, $command, $command->getConnection()));
@@ -327,7 +350,9 @@ protected function onResultDone()
327350

328351
protected function onSuccess()
329352
{
330-
$command = $this->queue->dequeue();
353+
$command = $this->currCommand;
354+
$this->currCommand = null;
355+
331356
if ($command->equals(Command::QUERY)) {
332357
$command->affectedRows = $this->affectedRows;
333358
$command->insertId = $this->insertId;
@@ -339,15 +364,19 @@ protected function onSuccess()
339364

340365
protected function onAuthenticated()
341366
{
342-
$command = $this->queue->dequeue();
367+
$command = $this->currCommand;
368+
$this->currCommand = null;
369+
343370
$command->emit('authenticated', array($this->connectOptions));
344371
}
345372

346373
protected function onClose()
347374
{
348375
$this->emit('close');
349-
if ($this->queue->count()) {
350-
$command = $this->queue->dequeue();
376+
if ($this->currCommand !== null) {
377+
$command = $this->currCommand;
378+
$this->currCommand = null;
379+
351380
if ($command->equals(Command::QUIT)) {
352381
$command->emit('success');
353382
}
@@ -525,9 +554,11 @@ protected function nextRequest($isHandshake = false)
525554
if (!$isHandshake && $this->phase != self::PHASE_HANDSHAKED) {
526555
return false;
527556
}
557+
528558
if (!$this->executor->isIdle()) {
529559
$command = $this->executor->dequeue();
530-
$this->queue->enqueue($command);
560+
$this->currCommand = $command;
561+
531562
if ($command->equals(Command::INIT_AUTHENTICATE)) {
532563
$this->authenticate();
533564
} else {

0 commit comments

Comments
 (0)