Skip to content

Commit 64ea4cd

Browse files
vobarian, Chad Berchek, wellwelwel
authored
fix: fix backpressure when using TLS (#1752)
* Fix backpressure when using TLS * Skip load data infile backpressure test with compression * Tests use compressed protocol only when MYSQL_USE_COMPRESSION=1; previously tests were incorrectly using compression because '0' as a string is truthy * Update backpressure tests using poku functions; tighten threshold on result set streaming test to show it works with compression; skip for MySQL < 8 since CTE isn't supported * Update test/integration/connection/test-backpressure-load-data-infile.test.cjs * Increase delay for observing backpressure to account for environment variations Co-authored-by: Weslley Araújo <[email protected]> --------- Co-authored-by: Chad Berchek <[email protected]> Co-authored-by: Weslley Araújo <[email protected]>
1 parent 9642a1e commit 64ea4cd

File tree

4 files changed

+168
-2
lines changed

4 files changed

+168
-2
lines changed

lib/base/connection.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ class BaseConnection extends EventEmitter {
392392
secureSocket.on('data', (data) => {
393393
this.packetParser.execute(data);
394394
});
395-
this.write = (buffer) => secureSocket.write(buffer);
395+
this.stream = secureSocket;
396396
}
397397

398398
protocolError(message, code) {

test/common.test.cjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const config = {
1212
user: process.env.MYSQL_USER || 'root',
1313
password: (process.env.CI ? process.env.MYSQL_PASSWORD : '') || '',
1414
database: process.env.MYSQL_DATABASE || 'test',
15-
compress: process.env.MYSQL_USE_COMPRESSION,
15+
compress: process.env.MYSQL_USE_COMPRESSION === '1',
1616
port: process.env.MYSQL_PORT || 3306,
1717
disableEval,
1818
};
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
'use strict';

// Integration test: verifies that backpressure from the network socket
// propagates to the LOAD DATA LOCAL INFILE input stream, so the driver
// stops pulling rows from the local file stream when the socket's write
// buffer is full.

const { assert, log, skip, sleep, test } = require('poku');
const common = require('../../common.test.cjs');
const { Readable, Duplex } = require('node:stream');
const Net = require('node:net');
const driver = require('../../../index.js');

// Backpressure is known not to propagate through the compressed protocol,
// so this test only runs against the uncompressed wire protocol.
if (common.config.compress) {
  skip(
    'skipping test with compression; load data infile backpressure is not working with compression'
  );
}

/**
 * A Readable that produces up to MAX_EXPECTED_ROWS newline-terminated rows.
 * `count` records how many rows were actually pulled, which is how the test
 * observes whether backpressure stopped the stream early.
 */
class BigInput extends Readable {
  count = 0;
  MAX_EXPECTED_ROWS = 100_000;
  // Invoked once, on the first _read(); used to arm the simulated backpressure.
  onStart = null;

  _read() {
    if (this.onStart) {
      this.onStart();
      this.onStart = null;
    }

    if (this.count < this.MAX_EXPECTED_ROWS) {
      this.count++;
      const row = `${this.count}\n`;
      this.push(row);
    } else {
      // End of input.
      this.push(null);
    }
  }
}

test('load data infile backpressure on local stream', async () => {
  const config = common.config;

  // Open a raw TCP connection to the server ourselves so we can wrap it in an
  // interceptor that simulates a slow network on the write side.
  const netStream = Net.connect(config.port, config.host);
  netStream.setNoDelay(true);
  await new Promise((resolve, reject) =>
    netStream.once('connect', resolve).once('error', reject)
  );

  /**
   * Duplex stream placed between the driver and the real socket. Reads are
   * proxied with proper pause/resume handling; writes can be stalled by
   * setting `simulateWriteBackpressure`, in which case the write callback is
   * withheld so the driver's outgoing buffer fills up.
   */
  class NetworkInterceptor extends Duplex {
    simulateWriteBackpressure = false;

    constructor() {
      super({ writableHighWaterMark: 65536 });
      netStream.on('data', (data) => {
        const continueReading = this.push(data);
        if (!continueReading) {
          netStream.pause();
        }
      });
      netStream.on('error', (err) => this.destroy(err));
    }

    _read() {
      netStream.resume();
    }

    _write(chunk, encoding, callback) {
      netStream.write(chunk, encoding, (err) => {
        if (err) {
          callback(err);
        } else if (!this.simulateWriteBackpressure) {
          // While simulating backpressure the callback is never invoked,
          // so the Duplex's writable buffer backs up just like a slow socket.
          callback();
        }
      });
    }
  }

  const interceptor = new NetworkInterceptor();
  const connection = driver.createConnection({
    ...config,
    multipleStatements: true,
    stream: interceptor,
  });

  const bigInput = new BigInput();
  // Begin simulating a stalled network as soon as the infile stream starts.
  bigInput.onStart = () => (interceptor.simulateWriteBackpressure = true);

  connection.query(
    {
      sql: `
      set global local_infile = 1;
      create temporary table test_load_data_backpressure (id varchar(100));
      load data local infile "_" replace into table test_load_data_backpressure;
      `,
      infileStreamFactory: () => bigInput,
    },
    (err, result) => {
      if (err) throw err;
      log('Load complete', result);
    }
  );

  await sleep(1000); // allow time for backpressure to take effect

  connection.close();
  netStream.destroy();

  // If backpressure works, the input stream stopped well short of its maximum.
  assert.ok(
    bigInput.count < bigInput.MAX_EXPECTED_ROWS,
    `expected backpressure to stop infile stream at less than ${bigInput.MAX_EXPECTED_ROWS} rows (read ${bigInput.count} rows)`
  );
});
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
'use strict';

// Integration test: verifies that connection.pause() applies backpressure all
// the way down to the underlying socket while streaming a large result set,
// so bytes stop accumulating after the pause.

const { assert, skip, sleep, test } = require('poku');
const common = require('../../common.test.cjs');

test('result event backpressure with pause/resume', async () => {
  const connection = common.createConnection({
    multipleStatements: true,
  });

  const mySqlVersion = await common.getMysqlVersion(connection);
  if (mySqlVersion.major < 8) {
    skip('MySQL >= 8.0 required to use CTE');
  }

  // in case wrapping with TLS, get the underlying socket first so we can see actual number of bytes read
  const originalSocket = connection.stream;

  // the full result set will be over 6 MB uncompressed; about 490 KB with compression
  const largeQuery = `
    SET SESSION cte_max_recursion_depth = 100000;
    WITH RECURSIVE cte (n, s) AS (
      SELECT 1, 'this is just to cause more bytes transferred for each row'
      UNION ALL
      SELECT n + 1, s
      FROM cte
      WHERE n < 100000
    )
    SELECT * FROM cte;
  `;

  let resultRowsCount = 0;
  // Pause the connection as soon as the first data row arrives, then stop
  // waiting; subsequent behavior is observed via counters below.
  await new Promise((resolve, reject) =>
    connection
      .query(largeQuery)
      .on('result', (row) => {
        resultRowsCount++;
        if (row.n === 1) {
          connection.pause();
          resolve();
        }
      })
      .on('error', reject)
  );

  // if backpressure is not working, the bytes received will grow during this time, even though connection is paused
  await sleep(500);

  // One result for the SET statement plus the first row of the SELECT.
  assert.equal(resultRowsCount, 2, 'stop receiving result rows when paused');

  // if backpressure is working, there should be less than 300 KB received;
  // experimentally it appears to be around 100 KB but may vary if buffer sizes change
  assert.ok(
    originalSocket.bytesRead < 300000,
    `Received ${originalSocket.bytesRead} bytes on paused connection`
  );

  connection.close();
});

0 commit comments

Comments
 (0)