Skip to content
2 changes: 1 addition & 1 deletion lib/base/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class BaseConnection extends EventEmitter {
secureSocket.on('data', (data) => {
this.packetParser.execute(data);
});
this.write = (buffer) => secureSocket.write(buffer);
this.stream = secureSocket;
}

protocolError(message, code) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
'use strict';

const { assert, test, log } = require('poku');
const common = require('../../common.test.cjs');
const { Readable, Duplex } = require('stream');
const Net = require('node:net');
const driver = require('../../../index.js');
const { setTimeout } = require('node:timers/promises');

if (common.config.compress) {
console.log('skipping test with compression; load data infile backpressure is not working with compression');

Check failure on line 11 in test/integration/connection/test-backpressure-load-data-infile.test.cjs

View workflow job for this annotation

GitHub Actions / lint-js

Replace `'skipping·test·with·compression;·load·data·infile·backpressure·is·not·working·with·compression'` with `⏎····'skipping·test·with·compression;·load·data·infile·backpressure·is·not·working·with·compression'⏎··`
process.exit(0);
}

class BigInput extends Readable {
count = 0;
MAX_EXPECTED_ROWS = 100_000;
onStart = null;

_read() {
if (this.onStart) {
this.onStart();
this.onStart = null;
}

if (this.count < this.MAX_EXPECTED_ROWS) {
this.count++;
const row = `${this.count}\n`;
this.push(row);
} else {
this.push(null);
}
}
}

test('load data infile backpressure on local stream', async () => {
const config = common.config;
const netStream = Net.connect(config.port, config.host);
netStream.setNoDelay(true);
await new Promise((resolve, reject) =>
netStream.once('connect', resolve).once('error', reject)
);

class NetworkInterceptor extends Duplex {
simulateWriteBackpressure = false;

constructor() {
super({ writableHighWaterMark: 65536 });
netStream.on('data', (data) => {
const continueReading = this.push(data);
if (!continueReading) {
netStream.pause();
}
});
netStream.on('error', (err) => this.destroy(err));
}

_read() {
netStream.resume();
}

_write(chunk, encoding, callback) {
netStream.write(chunk, encoding, (err) => {
if (err) {
callback(err);
} else if (!this.simulateWriteBackpressure) {
callback();
}
});
}
}

const interceptor = new NetworkInterceptor();
const connection = driver.createConnection({
...config,
multipleStatements: true,
stream: interceptor,
});

try {
const bigInput = new BigInput();
bigInput.onStart = () => (interceptor.simulateWriteBackpressure = true);

connection.query(
{
sql: `
set global local_infile = 1;
create temporary table test_load_data_backpressure (id varchar(100));
load data local infile "_" replace into table test_load_data_backpressure;
`,
infileStreamFactory: () => bigInput,
},
(err, result) => {
if (err) throw err;
log('Load complete', result);
}
);

await setTimeout(100); // allow time for backpressure to take effect

assert.ok(
bigInput.count < bigInput.MAX_EXPECTED_ROWS,
`expected backpressure to stop infile stream at less than ${bigInput.MAX_EXPECTED_ROWS} rows (read ${bigInput.count} rows)`
);
} finally {
connection.close();
netStream.destroy();
}
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict';

const { assert, test } = require('poku');
const common = require('../../common.test.cjs');
const timers = require('node:timers');

test('result event backpressure with pause/resume', async () => {
const connection = common.createConnection({
multipleStatements: true,
});
try {
// in case wrapping with TLS, get the underlying socket first so we can see actual number of bytes read
const originalSocket = connection.stream;

// the full result set will be over 6 MB
const largeQuery = `
SET SESSION cte_max_recursion_depth = 100000;
WITH RECURSIVE cte (n, s) AS (
SELECT 1, 'this is just to cause more bytes transferred for each row'
UNION ALL
SELECT n + 1, s
FROM cte
WHERE n < 100000
)
SELECT * FROM cte;
`;

let resultRowsCount = 0;
await new Promise((resolve, reject) =>
connection
.query(largeQuery)
.on('result', (row) => {
resultRowsCount++;
if (row.n === 1) {
connection.pause();
resolve();
}
})
.on('error', reject)
);

// if backpressure is not working, the bytes received will grow during this time, even though connection is paused
await timers.promises.setTimeout(500);

assert.equal(resultRowsCount, 2, 'stop receiving result rows when paused');

// if backpressure is working, there should be less than 1 MB received;
// experimentally it appears to be around 100 KB but may vary if buffer sizes change
assert.ok(
originalSocket.bytesRead < 1000000,
`Received ${originalSocket.bytesRead} bytes on paused connection`
);
} finally {
connection.close();
}
});
Loading