Skip to content

Commit 9642a1e

Browse files
authored
fix(stream): resume connection when stream errors or is destroyed (#3775)
* fix(stream): resume connection when stream errors or is destroyed * fix: remove listeners when stream is destroyed * ci: remove unreachable logic * chore: remove stream listener when it is destroyed * chore: ensure the error is emitted * chore: TOC * refactor: use native `close` and auto detroy * refactor: follow ESLint rules
1 parent 5bef657 commit 9642a1e

File tree

2 files changed

+133
-52
lines changed

2 files changed

+133
-52
lines changed

lib/commands/query.js

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -271,33 +271,67 @@ class Query extends Command {
271271
}
272272

273273
stream(options) {
274-
options = options || {};
274+
options = options || Object.create(null);
275275
options.objectMode = true;
276-
const stream = new Readable(options);
277-
stream._read = () => {
278-
this._connection && this._connection.resume();
279-
};
280-
this.on('result', (row, resultSetIndex) => {
281-
if (!stream.push(row)) {
282-
this._connection.pause();
283-
}
284-
stream.emit('result', row, resultSetIndex); // replicate old emitter
285-
});
286-
this.on('error', (err) => {
287-
stream.emit('error', err); // Pass on any errors
276+
277+
const stream = new Readable({
278+
...options,
279+
emitClose: true,
280+
autoDestroy: true,
281+
read: () => {
282+
this._connection && this._connection.resume();
283+
},
288284
});
289-
this.on('end', () => {
290-
stream.push(null); // pushing null, indicating EOF
285+
286+
// Prevent a breaking change for users that rely on `end` event
287+
stream.once('close', () => {
288+
if (!stream.readableEnded) {
289+
stream.emit('end');
290+
}
291291
});
292-
this.on('fields', (fields) => {
292+
293+
const onResult = (row, index) => {
294+
if (stream.destroyed) return;
295+
296+
if (!stream.push(row)) {
297+
this._connection && this._connection.pause();
298+
}
299+
300+
stream.emit('result', row, index); // replicate old emitter
301+
};
302+
303+
const onFields = (fields) => {
304+
if (stream.destroyed) return;
305+
293306
stream.emit('fields', fields); // replicate old emitter
294-
});
295-
stream.on('end', () => {
296-
stream.emit('close');
297-
});
298-
stream.on('error', () => {
299-
this._connection && this._connection.destroy();
300-
});
307+
};
308+
309+
const onEnd = () => {
310+
if (stream.destroyed) return;
311+
312+
stream.push(null); // pushing null, indicating EOF
313+
};
314+
315+
const onError = (err) => {
316+
stream.destroy(err);
317+
};
318+
319+
stream._destroy = (err, cb) => {
320+
this._connection && this._connection.resume();
321+
322+
this.removeListener('result', onResult);
323+
this.removeListener('fields', onFields);
324+
this.removeListener('end', onEnd);
325+
this.removeListener('error', onError);
326+
327+
cb(err); // Pass on any errors
328+
};
329+
330+
this.on('result', onResult);
331+
this.on('fields', onFields);
332+
this.on('end', onEnd);
333+
this.on('error', onError);
334+
301335
return stream;
302336
}
303337

Lines changed: 76 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,92 @@
11
'use strict';
22

3-
const process = require('node:process');
4-
const { test, skip } = require('poku');
3+
const { assert, describe, test } = require('poku');
54
const common = require('../../common.test.cjs');
65

7-
if (process.env.MYSQL_USE_TLS === '1') skip('Skipping for SSL=1');
8-
9-
test('Ensure stream ends in case of error', async () => {
6+
describe(async () => {
107
const connection = common.createConnection();
118

12-
connection.query(
13-
[
14-
'CREATE TEMPORARY TABLE `items` (',
15-
'`id` int(11) NOT NULL AUTO_INCREMENT,',
16-
'`text` varchar(255) DEFAULT NULL,',
17-
'PRIMARY KEY (`id`)',
18-
') ENGINE=InnoDB DEFAULT CHARSET=utf8',
19-
].join('\n'),
20-
(err) => {
21-
if (err) {
22-
throw err;
9+
await test('Ensure stream ends in case of error', async () => {
10+
connection.query(
11+
[
12+
'CREATE TEMPORARY TABLE `items` (',
13+
'`id` int(11) NOT NULL AUTO_INCREMENT,',
14+
'`text` varchar(255) DEFAULT NULL,',
15+
'PRIMARY KEY (`id`)',
16+
') ENGINE=InnoDB DEFAULT CHARSET=utf8',
17+
].join('\n'),
18+
(err) => {
19+
if (err) {
20+
throw err;
21+
}
2322
}
23+
);
24+
25+
for (let i = 0; i < 100; i++) {
26+
connection.execute(
27+
'INSERT INTO items(text) VALUES(?)',
28+
['test'],
29+
(err) => {
30+
if (err) {
31+
throw err;
32+
}
33+
}
34+
);
2435
}
25-
);
2636

27-
for (let i = 0; i < 100; i++) {
28-
connection.execute('INSERT INTO items(text) VALUES(?)', ['test'], (err) => {
29-
if (err) {
30-
throw err;
31-
}
37+
const rows = connection.query('SELECT * FROM items').stream();
38+
39+
// eslint-disable-next-line no-unused-vars
40+
for await (const _ of rows) break; // forces return () -> destroy()
41+
});
42+
43+
await test('end: Ensure stream emits error then close on server-side query error', async () => {
44+
let uncaughtExceptionError;
45+
46+
const stream = connection
47+
.query('SELECT invalid_column FROM invalid_table')
48+
.stream();
49+
50+
stream.on('error', (error) => {
51+
uncaughtExceptionError = error;
52+
});
53+
54+
await new Promise((resolve) => stream.on('end', resolve));
55+
56+
assert(
57+
uncaughtExceptionError instanceof Error,
58+
'Expected an uncaught exception error'
59+
);
60+
61+
assert.equal(
62+
uncaughtExceptionError.message,
63+
"Table 'test.invalid_table' doesn't exist"
64+
);
65+
});
66+
67+
await test('close: Ensure stream emits error then close on server-side query error', async () => {
68+
let uncaughtExceptionError;
69+
70+
const stream = connection
71+
.query('SELECT invalid_column FROM invalid_table')
72+
.stream();
73+
74+
stream.on('error', (error) => {
75+
uncaughtExceptionError = error;
3276
});
33-
}
3477

35-
const rows = connection.query('SELECT * FROM items').stream();
78+
await new Promise((resolve) => stream.on('close', resolve));
3679

37-
// eslint-disable-next-line no-unused-vars
38-
for await (const _ of rows) break;
80+
assert(
81+
uncaughtExceptionError instanceof Error,
82+
'Expected an uncaught exception error'
83+
);
3984

40-
setTimeout(() => {
41-
throw new Error('Connection remains open after stream error');
42-
}, 1000).unref();
85+
assert.equal(
86+
uncaughtExceptionError.message,
87+
"Table 'test.invalid_table' doesn't exist"
88+
);
89+
});
4390

4491
connection.end();
4592
});

0 commit comments

Comments
 (0)