diff --git a/index.js b/index.js index 9b0d161..f3ee72c 100644 --- a/index.js +++ b/index.js @@ -8,6 +8,7 @@ class PgQueryStream extends Readable { this.cursor = new Cursor(text, values) this._reading = false this._closed = false + this._buffer = [] this.batchSize = (options || { }).batchSize || 100 // delegate Submittable callbacks to cursor @@ -35,7 +36,22 @@ class PgQueryStream extends Readable { return false } this._reading = true - const readAmount = Math.max(size, this.batchSize) + var readAmount = Math.max(size, this.batchSize) + var object; + + while ((object = this._buffer.shift()) && readAmount) { + readAmount--; + if (!this.push(object)) { + this._reading = false + return + } + } + + if (!readAmount) { + this._reading = false + return + } + this.cursor.read(readAmount, (err, rows) => { if (this._closed) { return @@ -52,8 +68,11 @@ class PgQueryStream extends Readable { // push each row into the stream this._reading = false - for (var i = 0; i < rows.length; i++) { - this.push(rows[i]) + while (object = rows.shift()) { + if (!this.push(object)) { + this._buffer = this._buffer.concat(rows) + return + } } }) } diff --git a/test/pauses.js b/test/pauses.js index 8d9beb0..7d6f1b7 100644 --- a/test/pauses.js +++ b/test/pauses.js @@ -1,6 +1,7 @@ var concat = require('concat-stream') var tester = require('stream-tester') var JSONStream = require('JSONStream') +var assert = require('assert'); var QueryStream = require('../') @@ -15,4 +16,29 @@ require('./helper')('pauses', function (client) { done() })) }) + + it('keeps a stable internal buffer size when paused/resumed', function (done) { + this.timeout(5000) + + var stream = client.query(new QueryStream('SELECT * FROM generate_series(0, $1)', [10000], {batchSize: 100})) + var results = [] + var concurrency = 50 + + stream.on('data', function (result) { + results.push(result) + + if (results.length == concurrency) { + stream.pause() + + setTimeout(function () { + results = [] + stream.resume() + }, 10) + } + + assert(stream._readableState.buffer.length <= stream.batchSize) + }) + + stream.on('end', done); + }) }) diff --git a/test/pushes.js b/test/pushes.js new file mode 100644 index 0000000..6633c7e --- /dev/null +++ b/test/pushes.js @@ -0,0 +1,31 @@ +var helper = require('./helper') +var QueryStream = require('../') +var Writable = require('stream').Writable +var assert = require('assert') + +helper('pushes', function (client) { + it('stops pushing data when push() returns false and resumes on _read()', function (done) { + var readable = client.query(new QueryStream('SELECT * FROM generate_series(0, $1)', [500])) + var writable = new Writable({highWaterMark: 100, objectMode: true}) + var shouldPushAgain = true + + readable.original_read = readable._read + readable._read = function (size) { + shouldPushAgain = true + return this.original_read(size) + } + + readable.originalPush = readable.push + readable.push = function (data) { + assert(shouldPushAgain) + shouldPushAgain = this.originalPush(data) + return shouldPushAgain + } + + writable._write = function (chunk, encoding, callback) { + setImmediate(callback) + } + + readable.pipe(writable).on('finish', done) + }) +})