Skip to content

Commit 8a05b9e

Browse files
committed
Make parser more robust by splitting on individual package boundaries
1 parent d3d5b08 commit 8a05b9e

File tree

4 files changed

+124
-89
lines changed

4 files changed

+124
-89
lines changed

src/Io/Buffer.php

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,43 @@ public function read($len)
6161
return $buffer;
6262
}
6363

64+
/**
65+
* Reads data with given byte length from buffer into a new buffer
66+
*
67+
* This class keeps consumed data in memory for performance reasons and only
68+
* advances the internal buffer position by default. Reading data into a new
69+
* buffer will clear the data from the original buffer to free memory.
70+
*
71+
* @param int $len length in bytes, must be positive or zero
72+
* @return self
73+
* @throws \UnderflowException
74+
*/
75+
public function readBuffer($len)
76+
{
77+
// happy path to return empty buffer without any memory access for zero length string
78+
if ($len === 0) {
79+
return new self();
80+
}
81+
82+
// ensure buffer size contains $len bytes by checking target buffer position
83+
if ($len < 0 || !isset($this->buffer[$this->bufferPos + $len - 1])) {
84+
throw new \UnderflowException('Not enough data in buffer to read ' . $len . ' bytes');
85+
}
86+
87+
$buffer = new self();
88+
$buffer->buffer = $this->read($len);
89+
90+
if (!isset($this->buffer[$this->bufferPos])) {
91+
$this->buffer = '';
92+
} else {
93+
$this->buffer = \substr($this->buffer, $this->bufferPos);
94+
}
95+
$this->bufferPos = 0;
96+
97+
return $buffer;
98+
99+
}
100+
64101
/**
65102
* Skips binary string data with given byte length from buffer
66103
*
@@ -79,24 +116,6 @@ public function skip($len)
79116
$this->bufferPos += $len;
80117
}
81118

82-
/**
83-
* Clears all consumed data from the buffer
84-
*
85-
* This class keeps consumed data in memory for performance reasons and only
86-
* advances the internal buffer position until this method is called.
87-
*
88-
* @return void
89-
*/
90-
public function trim()
91-
{
92-
if (!isset($this->buffer[$this->bufferPos])) {
93-
$this->buffer = '';
94-
} else {
95-
$this->buffer = \substr($this->buffer, $this->bufferPos);
96-
}
97-
$this->bufferPos = 0;
98-
}
99-
100119
/**
101120
* returns the buffer length measures in number of bytes
102121
*

src/Io/Parser.php

Lines changed: 59 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,22 @@ public function parse($data)
131131

132132
return;
133133
}
134+
135+
$packet = $this->buffer->readBuffer($this->pctSize);
134136
$this->state = self::STATE_STANDBY;
135-
//$this->stream->bufferSize = 4;
137+
138+
if ($this->debug) {
139+
$this->debug('Parse packet#' . $this->seq . ' with ' . ($len = $packet->length()) . ' bytes: ' . wordwrap(bin2hex($b = $packet->read($len)), 2, ' ', true)); $packet->append($b); // @codeCoverageIgnore
140+
}
141+
136142
if ($this->phase === 0) {
137-
$response = $this->buffer->readInt1();
143+
$response = $packet->readInt1();
138144
if ($response === 0xFF) {
139145
// error packet before handshake means we did not exchange capabilities and error does not include SQL state
140146
$this->phase = self::PHASE_AUTH_ERR;
141147

142-
$code = $this->buffer->readInt2();
143-
$exception = new Exception($this->buffer->read($this->pctSize - $len + $this->buffer->length()), $code);
148+
$code = $packet->readInt2();
149+
$exception = new Exception($packet->read($packet->length()), $code);
144150
$this->debug(sprintf("Error Packet:%d %s\n", $code, $exception->getMessage()));
145151

146152
// error during init phase also means we're not currently executing any command
@@ -155,32 +161,32 @@ public function parse($data)
155161
$this->debug(sprintf("Protocal Version: %d", $this->protocalVersion));
156162

157163
$options = &$this->connectOptions;
158-
$options['serverVersion'] = $this->buffer->readStringNull();
159-
$options['threadId'] = $this->buffer->readInt4();
160-
$this->scramble = $this->buffer->read(8); // 1st part
161-
$this->buffer->skip(1); // filler
162-
$options['ServerCaps'] = $this->buffer->readInt2(); // 1st part
163-
$options['serverLang'] = $this->buffer->readInt1();
164-
$options['serverStatus'] = $this->buffer->readInt2();
165-
$options['ServerCaps'] += $this->buffer->readInt2() << 16; // 2nd part
166-
$this->buffer->skip(11); // plugin length, 6 + 4 filler
167-
$this->scramble .= $this->buffer->read(12); // 2nd part
168-
$this->buffer->skip(1);
164+
$options['serverVersion'] = $packet->readStringNull();
165+
$options['threadId'] = $packet->readInt4();
166+
$this->scramble = $packet->read(8); // 1st part
167+
$packet->skip(1); // filler
168+
$options['ServerCaps'] = $packet->readInt2(); // 1st part
169+
$options['serverLang'] = $packet->readInt1();
170+
$options['serverStatus'] = $packet->readInt2();
171+
$options['ServerCaps'] += $packet->readInt2() << 16; // 2nd part
172+
$packet->skip(11); // plugin length, 6 + 4 filler
173+
$this->scramble .= $packet->read(12); // 2nd part
174+
$packet->skip(1);
169175

170176
if ($this->connectOptions['ServerCaps'] & Constants::CLIENT_PLUGIN_AUTH) {
171-
$this->buffer->readStringNull(); // skip authentication plugin name
177+
$packet->readStringNull(); // skip authentication plugin name
172178
}
173179

174180
// init completed, continue with sending AuthenticateCommand
175181
$this->nextRequest(true);
176182
} else {
177-
$fieldCount = $this->buffer->readInt1();
183+
$fieldCount = $packet->readInt1();
178184

179185
if ($fieldCount === 0xFF) {
180186
// error packet
181-
$code = $this->buffer->readInt2();
182-
$this->buffer->skip(6); // skip SQL state
183-
$exception = new Exception($this->buffer->read($this->pctSize - $len + $this->buffer->length()), $code);
187+
$code = $packet->readInt2();
188+
$packet->skip(6); // skip SQL state
189+
$exception = new Exception($packet->read($packet->length()), $code);
184190
$this->debug(sprintf("Error Packet:%d %s\n", $code, $exception->getMessage()));
185191

186192
$this->onError($exception);
@@ -193,19 +199,19 @@ public function parse($data)
193199
$this->phase = self::PHASE_HANDSHAKED;
194200
}
195201

196-
$this->affectedRows = $this->buffer->readIntLen();
197-
$this->insertId = $this->buffer->readIntLen();
198-
$this->serverStatus = $this->buffer->readInt2();
199-
$this->warningCount = $this->buffer->readInt2();
202+
$this->affectedRows = $packet->readIntLen();
203+
$this->insertId = $packet->readIntLen();
204+
$this->serverStatus = $packet->readInt2();
205+
$this->warningCount = $packet->readInt2();
200206

201-
$this->message = $this->buffer->read($this->pctSize - $len + $this->buffer->length());
207+
$this->message = $packet->read($packet->length());
202208

203209
$this->debug(sprintf("AffectedRows: %d, InsertId: %d, WarningCount:%d", $this->affectedRows, $this->insertId, $this->warningCount));
204210
$this->onSuccess();
205211
$this->nextRequest();
206212
} elseif ($fieldCount === 0xFE) {
207213
// EOF Packet
208-
$this->buffer->skip(4); // warn, status
214+
$packet->skip(4); // warn, status
209215
if ($this->rsState === self::RS_STATE_ROW) {
210216
// finalize this result set (all rows completed)
211217
$this->debug('Result set done');
@@ -217,54 +223,52 @@ public function parse($data)
217223
$this->debug('Result set next part');
218224
++$this->rsState;
219225
}
220-
} elseif ($fieldCount === 0x00 && $this->pctSize === 1) {
221-
// Empty data packet during result set => row with only empty strings
222-
$this->debug('Result set empty row data');
223-
224-
$row = [];
225-
foreach ($this->resultFields as $field) {
226-
$row[$field['name']] = '';
227-
}
228-
$this->onResultRow($row);
229226
} else {
230227
// Data packet
231-
$this->buffer->prepend($this->buffer->buildInt1($fieldCount));
228+
$packet->prepend($packet->buildInt1($fieldCount));
232229

233230
if ($this->rsState === self::RS_STATE_HEADER) {
234-
$this->debug('Result set header packet');
235-
$this->buffer->readIntLen(); // extra
231+
$columns = $packet->readIntLen(); // extra
232+
$this->debug('Result set with ' . $columns . ' column(s)');
236233
$this->rsState = self::RS_STATE_FIELD;
237234
} elseif ($this->rsState === self::RS_STATE_FIELD) {
238-
$this->debug('Result set field packet');
239235
$field = [
240-
'catalog' => $this->buffer->readStringLen(),
241-
'db' => $this->buffer->readStringLen(),
242-
'table' => $this->buffer->readStringLen(),
243-
'org_table' => $this->buffer->readStringLen(),
244-
'name' => $this->buffer->readStringLen(),
245-
'org_name' => $this->buffer->readStringLen()
236+
'catalog' => $packet->readStringLen(),
237+
'db' => $packet->readStringLen(),
238+
'table' => $packet->readStringLen(),
239+
'org_table' => $packet->readStringLen(),
240+
'name' => $packet->readStringLen(),
241+
'org_name' => $packet->readStringLen()
246242
];
247243

248-
$this->buffer->skip(1); // 0xC0
249-
$field['charset'] = $this->buffer->readInt2();
250-
$field['length'] = $this->buffer->readInt4();
251-
$field['type'] = $this->buffer->readInt1();
252-
$field['flags'] = $this->buffer->readInt2();
253-
$field['decimals'] = $this->buffer->readInt1();
254-
$this->buffer->skip(2); // unused
244+
$packet->skip(1); // 0xC0
245+
$field['charset'] = $packet->readInt2();
246+
$field['length'] = $packet->readInt4();
247+
$field['type'] = $packet->readInt1();
248+
$field['flags'] = $packet->readInt2();
249+
$field['decimals'] = $packet->readInt1();
250+
$packet->skip(2); // unused
251+
252+
if ($this->debug) {
253+
$this->debug('Result set column: ' . json_encode($field, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRESERVE_ZERO_FRACTION | JSON_INVALID_UTF8_SUBSTITUTE)); // @codeCoverageIgnore
254+
}
255255
$this->resultFields[] = $field;
256256
} elseif ($this->rsState === self::RS_STATE_ROW) {
257-
$this->debug('Result set row data');
258257
$row = [];
259258
foreach ($this->resultFields as $field) {
260-
$row[$field['name']] = $this->buffer->readStringLen();
259+
$row[$field['name']] = $packet->readStringLen();
260+
}
261+
262+
if ($this->debug) {
263+
$this->debug('Result set row: ' . json_encode($row, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRESERVE_ZERO_FRACTION | JSON_INVALID_UTF8_SUBSTITUTE)); // @codeCoverageIgnore
261264
}
262265
$this->onResultRow($row);
263266
}
264267
}
265268
}
266269

267-
$this->buffer->trim();
270+
// finished parsing packet, continue with next packet
271+
assert($packet->length() === 0);
268272
goto packet;
269273
}
270274

tests/Io/BufferTest.php

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,41 +36,53 @@ public function testReadAfterSkipOne()
3636
$this->assertSame('i', $buffer->read(1));
3737
}
3838

39-
public function testSkipZeroThrows()
39+
public function testReadBufferEmptyIsNoop()
4040
{
4141
$buffer = new Buffer();
4242

43-
$buffer->append('hi');
43+
$new = $buffer->readBuffer(0);
4444

45-
$this->setExpectedException('LogicException');
46-
$buffer->skip(0);
45+
$this->assertSame(0, $buffer->length());
46+
$this->assertSame(0, $new->length());
4747
}
4848

49-
public function testSkipBeyondLimitThrows()
49+
public function testReadBufferReturnsBufferWithOriginalLengthAndClearsOriginalBuffer()
5050
{
5151
$buffer = new Buffer();
52+
$buffer->append('foo');
5253

53-
$buffer->append('hi');
54+
$new = $buffer->readBuffer($buffer->length());
5455

55-
$this->setExpectedException('LogicException');
56-
$buffer->skip(3);
56+
$this->assertSame(0, $buffer->length());
57+
$this->assertSame(3, $new->length());
5758
}
5859

59-
public function testTrimEmptyIsNoop()
60+
public function testReadBufferBeyondLimitThrows()
6061
{
6162
$buffer = new Buffer();
62-
$buffer->trim();
6363

64-
$this->assertSame(0, $buffer->length());
64+
$this->setExpectedException('UnderflowException');
65+
$buffer->readBuffer(3);
66+
}
67+
68+
public function testSkipZeroThrows()
69+
{
70+
$buffer = new Buffer();
71+
72+
$buffer->append('hi');
73+
74+
$this->setExpectedException('LogicException');
75+
$buffer->skip(0);
6576
}
6677

67-
public function testTrimDoesNotChangeLength()
78+
public function testSkipBeyondLimitThrows()
6879
{
6980
$buffer = new Buffer();
70-
$buffer->append('a');
71-
$buffer->trim();
7281

73-
$this->assertSame(1, $buffer->length());
82+
$buffer->append('hi');
83+
84+
$this->setExpectedException('LogicException');
85+
$buffer->skip(3);
7486
}
7587

7688
public function testParseInt1()

tests/Io/ParserTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public function testReceivingErrorFrameForQueryAfterResultSetHeadersShouldEmitEr
132132

133133
$stream->write("\x33\0\0\0" . "\x0a" . "mysql\0" . str_repeat("\0", 44));
134134
$stream->write("\x01\0\0\1" . "\x01");
135-
$stream->write("\x1E\0\0\2" . "\x03" . "def" . "\0" . "\0" . "\0" . "\x09" . "sleep(10)" . "\0" . "\xC0" . "\x3F\0" . "\1\0\0\0" . "\3" . "\x81\0". "\0" . "\0\0");
135+
$stream->write("\x1F\0\0\2" . "\x03" . "def" . "\0" . "\0" . "\0" . "\x09" . "sleep(10)" . "\0" . "\xC0" . "\x3F\0" . "\1\0\0\0" . "\3" . "\x81\0". "\0" . "\0\0");
136136
$stream->write("\x05\0\0\3" . "\xFE" . "\0\0\2\0");
137137
$stream->write("\x28\0\0\4" . "\xFF" . "\x25\x05" . "#abcde" . "Query execution was interrupted");
138138

0 commit comments

Comments
 (0)