diff --git a/CHANGES.md b/CHANGES.md index 99b258fc..11713757 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Unreleased +- [FIXED] Restore issue with document values containing `\u2028` and `\u2029` on Node.js 24. +- [FIXED] Restore issue with readline pause/resume after close on Node.js 24. +- [FIXED] Write after destroy errors on Node.js 24. - [UPGRADED] `axios` peerDependency to minimum version `1.13.1` to avoid broken `1.13.0` version. +- [NOTE] Updated Node.js version requirement statement for LTS 24. # 2.11.11 (2025-10-20) - [UPGRADED] `@ibm-cloud/cloudant` dependency to version `0.12.10`. diff --git a/Jenkinsfile b/Jenkinsfile index baad94fb..53146701 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -134,7 +134,7 @@ def runTest(version, filter=null, testSuite='test') { pipeline { agent { kubernetes { - yaml kubePodTemplate(name: 'couchbackup.yaml', full_jnlp: 'sdks-pinned-agent:node-22') + yaml kubePodTemplate(name: 'couchbackup.yaml') } } options { diff --git a/includes/liner.js b/includes/liner.js index 43703a64..8ae971af 100644 --- a/includes/liner.js +++ b/includes/liner.js @@ -1,4 +1,4 @@ -// Copyright © 2017, 2024 IBM Corp. All rights reserved. +// Copyright © 2017, 2025 IBM Corp. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,9 +13,8 @@ // limitations under the License. const { createInterface } = require('node:readline'); -const { PassThrough, Duplex } = require('node:stream'); +const { Duplex, PassThrough, Transform } = require('node:stream'); const debug = require('debug'); - /** * A Duplex stream that converts the input stream to a stream * of line objects using the built-in readline interface. 
@@ -29,19 +28,17 @@ const debug = require('debug'); */ class Liner extends Duplex { // Configure logging - log = debug(('couchbackup:liner')); + log = debug('couchbackup:liner'); // Flag for whether the readline interface is running isRunning = true; + // Flag for whether the readline interface is closed + isClosed = false; // Line number state lineNumber = 0; // Buffer of processed lines lines = []; - // Stream of bytes that will be processed to lines. - inStream = new PassThrough({ objectMode: false }) - // if there is an error destroy this Duplex with it - .on('error', e => this.destroy(e)); - constructor() { + constructor(sanitize = false) { // Configuration of this Duplex: // objectMode: false on the writable input (file chunks), true on the readable output (line objects) // The readableHighWaterMark controls the number of lines buffered after this implementation calls @@ -49,6 +46,25 @@ class Liner extends Duplex { // there is additional buffering downstream and file processing is faster than the network ops // we don't bottleneck here even without a large buffer. super({ readableObjectMode: true, readableHighWaterMark: 0, writableObjectMode: false }); + // Set up the stream of bytes that will be processed to lines. 
+ if (sanitize) { + // Handle unescaped unicode "newlines" by escaping them before passing to readline + this.inStream = new Transform({ + objectMode: false, + transform(chunk, encoding, callback) { + try { + this.push(chunk.toString('utf-8').replaceAll('\u2028', '\\u2028').replaceAll('\u2029', '\\u2029'), 'utf-8'); + callback(); + } catch (e) { + callback(e); + } + } + }); + } else { + this.inStream = new PassThrough({ objectMode: false }); + } + // if there is an error destroy this Duplex with it + this.inStream.on('error', e => this.destroy(e)); // Built-in readline interface over the inStream this.readlineInterface = createInterface({ input: this.inStream, // the writable side of Liner, passed through @@ -60,7 +76,8 @@ class Liner extends Duplex { const bufferedLines = this.lines.push(this.wrapLine(line)); this.log(`Liner processed line ${this.lineNumber}. Buffered lines available: ${bufferedLines}.`); this.pushAvailable(); - }).on('close', () => { + }).once('close', () => { + this.isClosed = true; this.log('Liner readline interface closed.'); // Push null onto our lines buffer to signal EOF to downstream consumers. this.lines.push(null); @@ -87,13 +104,16 @@ class Liner extends Duplex { // Check readline is running flag and whether there is content to push. while (this.isRunning && this.lines.length > 0) { if (!this.push(this.lines.shift())) { + this.log(`Back-pressure from push. Buffered lines available: ${this.lines.length}.`); // Push returned false, this indicates downstream back-pressure. // Pause the readline interface to stop pushing more lines downstream. // Resumption is triggered by downstream calling _read which happens // when it is ready for more data. - this.log(`Liner pausing after back-pressure from push. Buffered lines available: ${this.lines.length}.`); this.isRunning = false; - this.readlineInterface.pause(); + if (!this.isClosed) { + this.log('Liner pausing.'); + this.readlineInterface.pause(); + } break; } else { this.log(`Liner pushed. 
Buffered lines available: ${this.lines.length}.`); @@ -114,9 +134,11 @@ class Liner extends Duplex { // is called to ensure that pushes are able to happen (and thereby trigger) // subsequent reads. if (!this.isRunning) { - this.log('Liner resuming after read.'); this.isRunning = true; - this.readlineInterface.resume(); + if (!this.isClosed) { + this.log('Liner resuming after read.'); + this.readlineInterface.resume(); + } } this.pushAvailable(); } diff --git a/includes/restore.js b/includes/restore.js index a163611c..83c812d9 100644 --- a/includes/restore.js +++ b/includes/restore.js @@ -50,7 +50,7 @@ module.exports = function(dbClient, options, readstream, ee) { const batchPreparationStreams = [ readstream, // the backup file - new Liner(), // line by line + new Liner(true), // line by line (for Node.js 24 compatibility santize unicode line separators) new MappingStream(restore.backupLineToDocsArray), // convert line to a docs array new BatchingStream(options.bufferSize, true), // make new arrays of the correct buffer size new MappingStream(restore.docsToRestoreBatch) // make a restore batch diff --git a/includes/transforms.js b/includes/transforms.js index 6ae2f046..d3b33213 100644 --- a/includes/transforms.js +++ b/includes/transforms.js @@ -1,4 +1,4 @@ -// Copyright © 2023, 2024 IBM Corp. All rights reserved. +// Copyright © 2023, 2025 IBM Corp. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -187,45 +187,57 @@ class DelegateWritable extends Writable { _write(chunk, encoding, callback) { const toWrite = (this.chunkMapFn) ? 
this.chunkMapFn(chunk) : chunk; - this.targetWritable.write(toWrite, encoding, (err) => { - if (!err) { - this.log('completed target chunk write'); - if (this.postWriteFn) { - this.postWriteFn(chunk); + if (!this.targetWritable.destroyed) { + this.targetWritable.write(toWrite, encoding, (err) => { + if (!err) { + this.log('completed target chunk write'); + if (this.postWriteFn) { + this.postWriteFn(chunk); + } } - } - callback(err); - }); + callback(err); + }); + } else { + // Avoid write after destroy errors + this.log('suppressing write after destroy error'); + callback(); + } } _final(callback) { this.log('Finalizing'); const lastChunk = (this.lastChunkFn && this.lastChunkFn()) || null; - // We can't 'end' stdout, so use a final write instead for that case - if (this.targetWritable === process.stdout) { - // we can't 'write' null, so don't do anything if there is no last chunk - if (lastChunk) { - this.targetWritable.write(lastChunk, 'utf-8', (err) => { + if (!this.targetWritable.destroyed) { + // We can't 'end' stdout, so use a final write instead for that case + if (this.targetWritable === process.stdout) { + // we can't 'write' null, so don't do anything if there is no last chunk + if (lastChunk) { + this.targetWritable.write(lastChunk, 'utf-8', (err) => { + if (!err) { + this.log('wrote last chunk to stdout'); + } else { + this.log('error writing last chunk to stdout'); + } + callback(err); + }); + } else { + this.log('no last chunk to write to stdout'); + callback(); + } + } else { + this.targetWritable.end(lastChunk, 'utf-8', (err) => { if (!err) { - this.log('wrote last chunk to stdout'); + this.log('wrote last chunk and ended target writable'); } else { - this.log('error writing last chunk to stdout'); + this.log('error ending target writable'); } callback(err); }); - } else { - this.log('no last chunk to write to stdout'); - callback(); - } } else { - this.targetWritable.end(lastChunk, 'utf-8', (err) => { - if (!err) { - this.log('wrote last chunk and
ended target writable'); - } else { - this.log('error ending target writable'); - } - callback(err); - }); + // Avoid write after destroy errors + this.log('suppressing write after destroy error'); + callback(); } } } diff --git a/package-lock.json b/package-lock.json index 29cb2d07..d97a72f6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -29,7 +29,7 @@ "uuid": "13.0.0" }, "engines": { - "node": "^20 || ^22" + "node": "^20 || ^22 || ^24" }, "peerDependencies": { "axios": "^1.13.1", diff --git a/package.json b/package.json index d354f0d5..d3cf52bd 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,7 @@ }, "license": "Apache-2.0", "engines": { - "node": "^20 || ^22" + "node": "^20 || ^22 || ^24" }, "dependencies": { "@ibm-cloud/cloudant": "0.12.11", diff --git a/test/ci_concurrent_backups.js b/test/ci_concurrent_backups.js index a52d72ac..85a75ad9 100644 --- a/test/ci_concurrent_backups.js +++ b/test/ci_concurrent_backups.js @@ -1,4 +1,4 @@ -// Copyright © 2018, 2023 IBM Corp. All rights reserved. +// Copyright © 2018, 2025 IBM Corp. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.
@@ -14,11 +14,12 @@ /* global describe it */ -const fs = require('fs'); +const assert = require('node:assert'); +const fs = require('node:fs'); const { once } = require('node:events'); -const readline = require('readline'); -const u = require('./citestutils.js'); const uuid = require('uuid').v4; +const u = require('./citestutils.js'); +const { Liner } = require('../includes/liner.js'); const params = { useApi: true }; @@ -28,29 +29,14 @@ describe(u.scenario('Concurrent database backups', params), function() { u.setTimeout(this, 900); const checkForEmptyBatches = async function(fileName) { - let foundEmptyBatch = false; - - const rd = readline.createInterface({ - input: fs.createReadStream(fileName), - output: fs.createWriteStream('/dev/null'), - terminal: false - }); - - rd.on('line', function(line) { - if (JSON.parse(line).length === 0) { - // Note: Empty batch arrays indicate that the running backup is - // incorrectly sharing a log file with another ongoing backup job. - foundEmptyBatch = true; - } - }); - - rd.on('close', function() { - if (foundEmptyBatch) { - return Promise.reject(new Error(`Log file '${fileName}' contains empty batches`)); - } else { - return Promise.resolve(); - } - }); + assert.ok(await fs.createReadStream(fileName) // backup file + .pipe(new Liner(true)) // split to lines + .map(linerLine => JSON.parse(linerLine.line)) // parse JSON + .filter(parsedJson => Array.isArray(parsedJson)) // we want batches so filter to arrays + // Note: Empty batch arrays indicate that the running backup is + // incorrectly sharing a log file with another ongoing backup job. 
+ .every(batch => batch.length > 0), + `Backup file ${fileName} contains empty batches.`); }; const backupPromise = async function() { diff --git a/test/liner.js b/test/liner.js index 387d49c3..00abdf32 100644 --- a/test/liner.js +++ b/test/liner.js @@ -16,9 +16,10 @@ const assert = require('node:assert'); const fs = require('node:fs'); +const { versions } = require('node:process'); +const { Readable, Writable } = require('node:stream'); const { pipeline } = require('node:stream/promises'); const { Liner } = require('../includes/liner.js'); -const { Writable } = require('node:stream'); describe('#unit liner', function() { // Use a liner to make the line objects @@ -55,4 +56,21 @@ describe('#unit liner', function() { await pipeline(inputLines, liner, destination); assert.deepStrictEqual(output, expected); }); + + it('should split on unicode separators if not sanitizing', async function() { + // This test will only split on \u2028 and \u2029 in Node.js >=24 + const nodeMajorVersion = parseInt(versions.node.split('.', 2)[0]); + const expectedLines = nodeMajorVersion >= 24 ? ['foo', 'bar', 'foo', 'bar', 'foo'] : ['foo', 'bar', 'foo\u2028bar\u2029foo']; + const input = 'foo\nbar\nfoo\u2028bar\u2029foo'; + const expected = expectedLines.map((e, i) => { return { lineNumber: i + 1, line: e }; }); + await pipeline(Readable.from(input), liner, destination); + assert.deepStrictEqual(output, expected); + }); + + it('should sanitize unicode separators when enabled', async function() { + const expected = ['foo', 'bar', 'foo\\u2028bar\\u2029foo'].map((e, i) => { return { lineNumber: i + 1, line: e }; }); + const input = 'foo\nbar\nfoo\u2028bar\u2029foo'; + await pipeline(Readable.from(input), new Liner(true), destination); + assert.deepStrictEqual(output, expected); + }); });