diff --git a/test-complete/nodejs-dmsdk-UpdAndRdAll.js b/test-complete/nodejs-dmsdk-UpdAndRdAll.js index ecb4677a..c7dcb077 100644 --- a/test-complete/nodejs-dmsdk-UpdAndRdAll.js +++ b/test-complete/nodejs-dmsdk-UpdAndRdAll.js @@ -2,19 +2,22 @@ * Copyright (c) 2015-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. */ -var marklogic = require('../'); +const marklogic = require('../'); -var testconfig = require('../etc/test-config-qa.js'); +const testconfig = require('../etc/test-config-qa.js'); const stream = require('stream'); const { expect } = require('chai'); +const { pipeline } = require('stream/promises'); -var memStore = { }; +const memStore = { }; +const dbWriter = marklogic.createDatabaseClient(testconfig.dmsdkrestWriterConnection); +const inputJsonUris = []; +const inputContents = []; -var uriStream = new stream.Readable(); -var dbWriter = marklogic.createDatabaseClient(testconfig.dmsdkrestWriterConnection); -let inputJsonUris = []; -let inputContents = []; +let uriStream = new stream.Readable(); + +const TOTAL_DOCS = 1000; /* Based on example from @@ -42,9 +45,9 @@ class MLQASnapshotTransform extends stream.Transform { // Filter what we need and push. We will verify only 900.json piped from ReadAll if (chunk.uri === this.docId) { //Push transformed content onto the stream with changed key names such as Matched ID and Matched Name - var currId = chunk.content.id; - var currName = chunk.content.name; - var retStr = 'Matched ID:' + currId + ', Matched Name:' + currName; + let currId = chunk.content.id; + let currName = chunk.content.name; + let retStr = 'Matched ID:' + currId + ', Matched Name:' + currName; this.push(retStr); } return setImmediate(callback); @@ -68,20 +71,21 @@ class MLQAWritableStream extends stream.Writable { } _write(chunk, encoding, callback) { - var buffer = (Buffer.isBuffer(chunk)) ? + let buffer = (Buffer.isBuffer(chunk)) ? chunk : // already is Buffer use it - new Buffer(chunk, encoding); + Buffer.from(chunk, encoding); memStore[this.key] = Buffer.concat([memStore[this.key], buffer]); return setImmediate(callback); } } describe('Update doc and readAll with Snapshot', function () { - before(function (done) { - this.timeout(50000); - var jsonDocreadable = new stream.Readable({ objectMode: true }); - for (let i = 0; i < 1000; i++) { + before(async function () { + + const jsonDocreadable = new stream.Readable({ objectMode: true }); + + for (let i = 0; i < TOTAL_DOCS; i++) { const tempJson = { uri: '/data/dmsdk/Snap-update-then-readall/' + i + '.json', contentType: 'application/json', @@ -93,70 +97,67 @@ describe('Update doc and readAll with Snapshot', function () { inputContents.push(tempJson.content); } jsonDocreadable.push(null); - dbWriter.documents.writeAll(jsonDocreadable, { - onCompletion: ((summary) => { - setTimeout(() => { - var i = 0; i++; - }, 1000); - summary.docsWrittenSuccessfully.should.be.greaterThanOrEqual(1000); + + let summaryPromiseResolve; + + // The following pattern uses Promise.all to coordinate the completion of the writeAll operation and its onCompletion callback. + // The first promise initiates the writeAll process, while the second promise is resolved by the onCompletion callback with the summary object. + // This ensures that both the write operation and its completion summary are available before proceeding. + const [result, summary] = await Promise.all([ + dbWriter.documents.writeAll(jsonDocreadable, { + onCompletion: (summary) => { + summaryPromiseResolve(summary); + } + }), + new Promise(resolve => { + summaryPromiseResolve = resolve; }) - }); // End of pipe to writeAll - // Use uriStream as the input to readAll() + ]); + expect(summary.docsWrittenSuccessfully).to.be.greaterThanOrEqual(1000); + uriStream = new stream.PassThrough({ objectMode: true }); inputJsonUris.forEach(uri => uriStream.push(uri)); uriStream.push(null); - // wait for DB to finish writing - setTimeout(() => { - done(); - }, 10000); }); - after((function (done) { - this.timeout(10000); - - dbWriter.documents.remove(inputJsonUris) - .result(function (response) { - done(); - }) - .catch(err => done(err)) - .catch(done); - })); + after(async function () { + await dbWriter.documents.remove(inputJsonUris).result(); + }); // This test updates an existing doc and then performs readAll - it('update a doc and readAll with snapshot', function (done) { - this.timeout(30000); + it('update a doc and readAll with snapshot', async function () { + // Used in test that updates doc and then does readAll const UpdBeforeReadAllUriName = '/data/dmsdk/Snap-update-then-readall/900.json'; const filteredSnapshot = new MLQASnapshotTransform(UpdBeforeReadAllUriName, { objectMode: true }); - setTimeout(() => { - var i = 0; i++; - }, 3000); // Initiate a document change on doc id 900. - dbWriter.documents.write({ + const writeResponse = await dbWriter.documents.write({ uri: UpdBeforeReadAllUriName, collections: ['coll5', 'coll6'], contentType: 'application/json', quality: 250, properties: { prop1: 'bar', prop2: 1981 }, - content: { id: 88, name: 'David' } - }); - // Expected result + content: { id: 88, name: 'David' }, + }).result(); + + // Updated doc should be in db now. var exptdResult = 'Matched ID:88, Matched Name:David'; var mlqawstream = new MLQAWritableStream('before'); - // Have listeners before calling pipe. - setTimeout(() => { - var i = 0; i++; - }, 3000); - mlqawstream.on('finish', function () { - expect(memStore.before.toString()).to.equal(exptdResult); - }); - dbWriter.documents.readAll(uriStream, { - inputkind: 'Array', - consistentSnapshot: true, - batch: 50 - }).pipe(filteredSnapshot).pipe(mlqawstream);/* Add.pipe(process.stdout) to debug */ - done(); + + // Use pipeline with await to read and confirm, much cleaner and understandable. + await pipeline( + dbWriter.documents.readAll(uriStream, { + inputkind: 'Array', + consistentSnapshot: true, + batch: 50 + }), + filteredSnapshot, + mlqawstream + ); + + // confirm we wrote correct stream to memStore in mlqawstream + expect(memStore.before.toString()).to.equal(exptdResult); }); }); diff --git a/test-complete/nodejs-dmsdk-queryToTransformAll.js b/test-complete/nodejs-dmsdk-queryToTransformAll.js index 1ff17d77..a348f704 100644 --- a/test-complete/nodejs-dmsdk-queryToTransformAll.js +++ b/test-complete/nodejs-dmsdk-queryToTransformAll.js @@ -19,27 +19,23 @@ const q = marklogic.queryBuilder; const query = q.where(ctsQb.cts.directoryQuery('/test/dataMovement/requests/transformAll/')); describe('data movement transformAll - nodejs-dmsdk-queryToTransformAll', function () { - this.timeout(20000); - before(function (done) { - restAdminDB.config.transforms.write(transformName, 'javascript', fs.createReadStream(transformPath)) - .result(() => { - for (let i = 0; i < 100; i++) { - uris.push('/test/dataMovement/requests/transformAll/' + i + '.json'); - } - }) - .then(() => done()) - .catch(error => done(error)); + before(async function () { + await restAdminDB.config.transforms.write(transformName, 'javascript', + fs.createReadStream(transformPath)).result(); + for (let i = 0; i < 100; i++) { + uris.push('/test/dataMovement/requests/transformAll/' + i + '.json'); + } }); - beforeEach(function (done) { + beforeEach(async function () { let readable = new Stream.Readable({ objectMode: true }); transformStream = new Stream.PassThrough({ objectMode: true }); for (let i = 0; i < 100; i++) { const temp = { uri: '/test/dataMovement/requests/transformAll/' + i + '.json', contentType: 'application/json', - content: { ['key']: 'initialValue' } + content: { key: 'initialValue' } }; readable.push(temp); transformStream.push(temp.uri); @@ -47,244 +43,261 @@ describe('data movement transformAll - nodejs-dmsdk-queryToTransformAll', functi readable.push(null); transformStream.push(null); - dbWriter.documents.writeAll(readable, { - onCompletion: ((summary) => { - done(); - }) + return new Promise((resolve, reject) => { + dbWriter.documents.writeAll(readable, { + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); + } + }); }); - }); - afterEach((function (done) { - dbWriter.documents.remove(uris) - .result(function (response) { - done(); - }) - .catch(err => done(err)) - .catch(done); - })); - - it('should queryToTransformAll documents with onCompletion, transform, concurrentRequests and transformStrategy as ignore', - function (done) { + afterEach( async function () { + await dbWriter.documents.remove(uris).result(); + }); + it('should queryToTransformAll documents with onCompletion, transform, concurrentRequests and transformStrategy as ignore', async function () { + const summary = await new Promise((resolve, reject) => { dbWriter.documents.queryToTransformAll(query, { transform: [transformName, { newValue: 'transformedValue' }], concurrentRequests: { multipleOf: 'hosts', multiplier: 4 }, transformStrategy: 'ignore', onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('initialValue', done); - } catch (err) { - done(err); - } - }) + resolve(summary); + }), + onError: (error) => { + reject(error); + } }); }); - it('should transformAll documents with transformStrategy as ignore', function (done) { - - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - concurrentRequests: { multipleOf: 'hosts', multiplier: 4 }, - transformStrategy: 'ignore', - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('initialValue', done); - } catch (err) { - done(err); - } - }) - }); + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('initialValue'); }); - it('should work with query and onCompletion function', function (done) { - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (err) { - done(err); + it('should transformAll documents with transformStrategy as ignore', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + transformStrategy: 'ignore', + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } - }) + }); }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('initialValue'); }); - it('should work with query and onCompletion function and batchSize', function (done) { - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - batchSize: 10, - onBatchSuccess: (function (progress, documents) { - try { - progress.docsTransformedSuccessfully.should.be.greaterThanOrEqual(10); - progress.docsFailedToBeTransformed.should.be.equal(0); - progress.timeElapsed.should.be.greaterThanOrEqual(0); - } catch (err) { - done(err); + it('should work with query and onCompletion function', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } + }); + }); - }), - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (err) { - done(err); + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); + }); + + it('should work with query and onCompletion function and batchSize', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + batchSize: 10, + onBatchSuccess: ((progress) => { + try { + progress.docsFailedToBeTransformed.should.be.equal(0); + progress.timeElapsed.should.be.greaterThanOrEqual(0); + } catch (err) { + reject(err); + } + }), + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } }) }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); }); - it('should transformAll documents with onCompletion, concurrentRequests and transform options', function (done) { - - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - concurrentRequests: { multipleOf: 'hosts', multiplier: 4 }, - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (err) { - done(err); + it('should transformAll documents with onCompletion, concurrentRequests and transform options', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + concurrentRequests: { multipleOf: 'hosts', multiplier: 4 }, + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } - }) + }); }); - }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); - it('should transformAll documents with inputKind as array', function (done) { + await verifyDocs('transformedValue'); + }); + it('should transformAll documents with inputKind as array', async function () { transformStream = new Stream.Readable({ objectMode: true }); for (let i = 0; i + 10 <= uris.length; i = i + 10) { transformStream.push(uris.slice(i, i + 10)); } transformStream.push(null); - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - inputKind: 'aRRaY', - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (error) { - done(error); + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + inputKind: 'aRRaY', + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } - }) + }); }); - }); - it('should queryToTransformAll documents with onCompletion option', function (done) { + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); + }); - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - onCompletion: ((summary) => { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - }) + it('should queryToTransformAll documents with onCompletion option', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); + } + }); }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); }); - it('should work with batchSize less than 1', function (done) { - dbWriter.documents.queryToTransformAll(query, { - transform: [transformName, { newValue: 'transformedValue' }], - batchSize: 0, - onCompletion: ((summary) => { - try { - summary.docsTransformedSuccessfully.should.be.equal(100); - summary.docsFailedToBeTransformed.should.be.equal(0); - summary.timeElapsed.should.be.greaterThanOrEqual(0); - verifyDocs('transformedValue', done); - } catch (err) { - done(err); + it('should work with batchSize less than 1', async function () { + const summary = await new Promise((resolve, reject) => { + dbWriter.documents.queryToTransformAll(query, { + transform: [transformName, { newValue: 'transformedValue' }], + batchSize: 0, + onCompletion: ((summary) => { + resolve(summary); + }), + onError: (error) => { + reject(error); } - }) + }); }); + + summary.docsTransformedSuccessfully.should.be.equal(100); + summary.docsFailedToBeTransformed.should.be.equal(0); + summary.timeElapsed.should.be.greaterThanOrEqual(0); + + await verifyDocs('transformedValue'); }); - it('should throw error with no query', function (done) { + it('should throw error with no query', async function () { try { - dbWriter.documents.queryToTransformAll('invalid query', {}); + await dbWriter.documents.queryToTransformAll('invalid query', {}); } catch (err) { err.toString().should.equal('Error: Query needs to be a cts query.'); - done(); } }); - it('should throw error with null query', function (done) { + it('should throw error with null query', async function () { try { dbWriter.documents.queryToTransformAll(null, { transform: [transformName, { newValue: 'transformedValue' }], }); } catch (err) { err.toString().should.equal('Error: Query cannot be null or undefined.'); - done(); } }); - it('should throw error with onInitialTimestamp and wrong consistentSnapshot', function (done) { + it('should throw error with onInitialTimestamp and wrong consistentSnapshot', async function () { try { - dbWriter.documents.queryToTransformAll(query, { + await dbWriter.documents.queryToTransformAll(query, { transform: [transformName, { newValue: 'transformedValue' }], onInitialTimestamp: '1667222674', consistentSnapshot: false, }); } catch (err) { err.toString().should.equal('Error: consistentSnapshot needs to be true when onInitialTimestamp is provided.'); - done(); } }); - it('should throw error with consistentSnapshot another type', function (done) { + it('should throw error with consistentSnapshot another type', async function () { try { - dbWriter.documents.queryToTransformAll(query, { + await dbWriter.documents.queryToTransformAll(query, { transform: [transformName, { newValue: 'transformedValue' }], consistentSnapshot: 'true', }); } catch (err) { err.toString().should.equal('Error: consistentSnapshot needs to be a boolean or DatabaseClient.Timestamp object.'); - done(); } }); - it('should throw error with batchSize greater than 100000', function (done) { + it('should throw error with batchSize greater than 100000', async function () { try { - dbWriter.documents.queryToTransformAll(query, { + await dbWriter.documents.queryToTransformAll(query, { transform: [transformName, { newValue: 'transformedValue' }], batchSize: 1000000, }); } catch (err) { err.toString().should.equal('Error: batchSize cannot be greater than 100000'); - done(); } }); }); -function verifyDocs(value, done) { - dbWriter.documents.read(uris) - .result(function (documents) { - documents.length.should.equal(100); - for (let i = 0; i < documents.length; i++) { - documents[0].content.key.should.equal(value); - } - }) - .then(() => done()) - .catch(err => done(err)); +async function verifyDocs(value) { + const documents = await dbWriter.documents.read(uris).result(); + documents.length.should.equal(100); + for (let i = 0; i < documents.length; i++) { + documents[i].content.key.should.equal(value); + } } \ No newline at end of file