Skip to content

Commit 8ec58bc

Browse files
author
Chad Berchek
committed
Fix backpressure when using TLS
1 parent 0b835df commit 8ec58bc

File tree

3 files changed

+161
-1
lines changed

3 files changed

+161
-1
lines changed

lib/base/connection.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ class BaseConnection extends EventEmitter {
392392
secureSocket.on('data', (data) => {
393393
this.packetParser.execute(data);
394394
});
395-
this.write = (buffer) => secureSocket.write(buffer);
395+
this.stream = secureSocket;
396396
}
397397

398398
protocolError(message, code) {
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
'use strict';
2+
3+
const { assert, test, log } = require('poku');
4+
const common = require('../../common.test.cjs');
5+
const { Readable, Duplex } = require('stream');
6+
const Net = require('node:net');
7+
const driver = require('../../../index.js');
8+
const { setTimeout } = require('node:timers/promises');
9+
10+
// Readable stream that produces a bounded run of newline-terminated rows,
// used as the LOAD DATA LOCAL INFILE source in the backpressure test.
// The one-shot `onStart` hook fires on the first pull, letting the test
// flip write-backpressure simulation on exactly when the driver begins
// consuming this stream.
class BigInput extends Readable {
  count = 0; // rows produced so far
  MAX_EXPECTED_ROWS = 100_000; // produce at most this many rows, then EOF
  onStart = null; // optional one-shot callback invoked on the first _read()

  _read() {
    // Fire the start hook exactly once, on the first consumer pull.
    const startHook = this.onStart;
    if (startHook) {
      startHook();
      this.onStart = null;
    }

    // Signal end-of-stream once the row budget is exhausted.
    if (this.count >= this.MAX_EXPECTED_ROWS) {
      this.push(null);
      return;
    }

    this.count += 1;
    // Random payload keeps rows unique; the trailing newline delimits rows
    // for LOAD DATA.
    this.push(`${this.count}-${Math.random()}\n`);
  }
}
30+
31+
// Verifies that backpressure on the network socket propagates back to the
// LOCAL INFILE input stream: when the fake network stops accepting writes,
// the driver must stop pulling rows from `bigInput` instead of buffering
// the entire 100k-row input in memory.
test('load data infile backpressure on local stream', async () => {
  const config = common.config;
  // Raw TCP connection to the server; the driver is given a wrapper around
  // it (below) so the test can simulate a slow network.
  const netStream = Net.connect(config.port, config.host);
  netStream.setNoDelay(true);
  await new Promise((resolve, reject) =>
    netStream.once('connect', resolve).once('error', reject)
  );

  // Duplex stream inserted between the driver and the real socket.
  // Reads pass through normally; writes can be stalled on demand by
  // withholding the write callback, which makes the driver's outgoing
  // buffer fill and its backpressure machinery engage.
  class NetworkInterceptor extends Duplex {
    simulateWriteBackpressure = false; // when true, _write never completes

    constructor() {
      // Small-ish high-water mark so simulated backpressure kicks in quickly.
      super({ writableHighWaterMark: 65536 });
      netStream.on('data', (data) => {
        const continueReading = this.push(data);
        if (!continueReading) {
          // Our consumer is full: stop reading from the real socket until
          // _read() is called again (standard Readable backpressure).
          netStream.pause();
        }
      });
      netStream.on('error', (err) => this.destroy(err));
    }

    _read() {
      netStream.resume();
    }

    _write(chunk, encoding, callback) {
      netStream.write(chunk, encoding, (err) => {
        if (err) {
          callback(err);
        } else if (!this.simulateWriteBackpressure) {
          // Deliberately never call back while simulating backpressure:
          // the pending write keeps the interceptor's writable side full.
          callback();
        }
      });
    }
  }

  const interceptor = new NetworkInterceptor();
  const connection = driver.createConnection({
    ...config,
    multipleStatements: true,
    stream: interceptor, // driver talks through the interceptor, not the socket
  });

  try {
    const bigInput = new BigInput();
    // Turn on simulated backpressure the moment the driver starts reading
    // the infile stream, so almost no rows can be flushed to the server.
    bigInput.onStart = () => (interceptor.simulateWriteBackpressure = true);

    connection.query(
      {
        sql: `
          set global local_infile = 1;
          create temporary table test_load_data_backpressure (id varchar(100));
          load data local infile "_" replace into table test_load_data_backpressure;
        `,
        // The "_" filename is ignored; the driver sources rows from here.
        infileStreamFactory: () => bigInput,
      },
      (err, result) => {
        if (err) throw err;
        log('Load complete', result);
      }
    );

    await setTimeout(100); // allow time for backpressure to take effect

    // Without working backpressure the driver would drain all
    // MAX_EXPECTED_ROWS from bigInput during the wait above.
    assert.ok(
      bigInput.count < bigInput.MAX_EXPECTED_ROWS,
      `expected backpressure to stop infile stream at less than ${bigInput.MAX_EXPECTED_ROWS} rows (read ${bigInput.count} rows)`
    );
  } finally {
    connection.close();
    netStream.destroy();
  }
});
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
'use strict';
2+
3+
const { assert, test } = require('poku');
4+
const common = require('../../common.test.cjs');
5+
const timers = require('node:timers');
6+
7+
// Verifies that connection.pause() applies real backpressure to the server:
// after pausing mid-result-set, both row delivery and raw bytes read from
// the socket must stop, rather than the driver silently buffering the rest
// of a multi-megabyte result.
test('result event backpressure with pause/resume', async () => {
  const connection = common.createConnection({
    multipleStatements: true,
  });
  try {
    // in case wrapping with TLS, get the underlying socket first so we can see actual number of bytes read
    // NOTE(review): assumes connection.stream is the plain TCP socket at this
    // point (before any TLS upgrade swaps this.stream) — relies on the
    // connection.js change in this same commit.
    const originalSocket = connection.stream;

    // the full result set will be over 6 MB
    // (100k rows via a recursive CTE, each padded with a filler string).
    const largeQuery = `
      SET SESSION cte_max_recursion_depth = 100000;
      WITH RECURSIVE cte (n, s) AS (
        SELECT 1, 'this is just to cause more bytes transferred for each row'
        UNION ALL
        SELECT n + 1, s
        FROM cte
        WHERE n < 100000
      )
      SELECT * FROM cte;
    `;

    let resultRowsCount = 0;
    // Pause as soon as the first data row arrives, then resolve so the test
    // can observe what (if anything) keeps flowing while paused.
    await new Promise((resolve, reject) =>
      connection
        .query(largeQuery)
        .on('result', (row) => {
          resultRowsCount++;
          if (row.n === 1) {
            connection.pause();
            resolve();
          }
        })
        .on('error', reject)
    );

    // if backpressure is not working, the bytes received will grow during this time, even though connection is paused
    await timers.promises.setTimeout(500);

    // Expect exactly 2 'result' events: the SET statement's result plus the
    // first row — presumably one already-parsed event is delivered before
    // the pause takes effect. TODO confirm against driver event ordering.
    assert.equal(resultRowsCount, 2, 'stop receiving result rows when paused');

    // if backpressure is working, there should be less than 1 MB received;
    // experimentally it appears to be around 100 KB but may vary if buffer sizes change
    assert.ok(
      originalSocket.bytesRead < 1000000,
      `Received ${originalSocket.bytesRead} bytes on paused connection`
    );
  } finally {
    connection.close();
  }
});

0 commit comments

Comments
 (0)