121 changes: 61 additions & 60 deletions test-complete/nodejs-dmsdk-UpdAndRdAll.js
@@ -2,19 +2,22 @@
  * Copyright (c) 2015-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
  */

-var marklogic = require('../');
+const marklogic = require('../');

-var testconfig = require('../etc/test-config-qa.js');
+const testconfig = require('../etc/test-config-qa.js');

 const stream = require('stream');
 const { expect } = require('chai');
+const { pipeline } = require('stream/promises');

-var memStore = { };
+const memStore = { };
+const dbWriter = marklogic.createDatabaseClient(testconfig.dmsdkrestWriterConnection);
+const inputJsonUris = [];
+const inputContents = [];

-var uriStream = new stream.Readable();
-var dbWriter = marklogic.createDatabaseClient(testconfig.dmsdkrestWriterConnection);
-let inputJsonUris = [];
-let inputContents = [];
+let uriStream = new stream.Readable();
+
+const TOTAL_DOCS = 1000;

 /*
 Based on example from
@@ -42,9 +45,9 @@ class MLQASnapshotTransform extends stream.Transform {
         // Filter what we need and push. We will verify only 900.json piped from ReadAll
         if (chunk.uri === this.docId) {
             //Push transformed content onto the stream with changed key names such as Matched ID and Matched Name
-            var currId = chunk.content.id;
-            var currName = chunk.content.name;
-            var retStr = 'Matched ID:' + currId + ', Matched Name:' + currName;
+            let currId = chunk.content.id;
+            let currName = chunk.content.name;
+            let retStr = 'Matched ID:' + currId + ', Matched Name:' + currName;
             this.push(retStr);
         }
         return setImmediate(callback);
@@ -68,20 +71,21 @@ class MLQAWritableStream extends stream.Writable {
     }

     _write(chunk, encoding, callback) {
-        var buffer = (Buffer.isBuffer(chunk)) ?
+        let buffer = (Buffer.isBuffer(chunk)) ?
             chunk : // already is Buffer use it
-            new Buffer(chunk, encoding);
+            Buffer.from(chunk, encoding);
         memStore[this.key] = Buffer.concat([memStore[this.key], buffer]);
         return setImmediate(callback);
     }
 }

 describe('Update doc and readAll with Snapshot', function () {
-    before(function (done) {
-        this.timeout(50000);
-        var jsonDocreadable = new stream.Readable({ objectMode: true });
-
-        for (let i = 0; i < 1000; i++) {
+    before(async function () {
+
+        const jsonDocreadable = new stream.Readable({ objectMode: true });
+
+        for (let i = 0; i < TOTAL_DOCS; i++) {
             const tempJson = {
                 uri: '/data/dmsdk/Snap-update-then-readall/' + i + '.json',
                 contentType: 'application/json',
Expand All @@ -93,70 +97,67 @@ describe('Update doc and readAll with Snapshot', function () {
inputContents.push(tempJson.content);
}
jsonDocreadable.push(null);
dbWriter.documents.writeAll(jsonDocreadable, {
onCompletion: ((summary) => {
setTimeout(() => {
var i = 0; i++;
}, 1000);
summary.docsWrittenSuccessfully.should.be.greaterThanOrEqual(1000);

let summaryPromiseResolve;

// The following pattern uses Promise.all to coordinate the completion of the writeAll operation and its onCompletion callback.
// The first promise initiates the writeAll process, while the second promise is resolved by the onCompletion callback with the summary object.
// This ensures that both the write operation and its completion summary are available before proceeding.
const [result, summary] = await Promise.all([
dbWriter.documents.writeAll(jsonDocreadable, {
onCompletion: (summary) => {
summaryPromiseResolve(summary);
}
}),
new Promise(resolve => {
summaryPromiseResolve = resolve;
})
}); // End of pipe to writeAll
// Use uriStream as the input to readAll()
]);
expect(summary.docsWrittenSuccessfully).to.be.greaterThanOrEqual(1000);

uriStream = new stream.PassThrough({ objectMode: true });
inputJsonUris.forEach(uri => uriStream.push(uri));
uriStream.push(null);
// wait for DB to finish writing
setTimeout(() => {
done();
}, 10000);
});

after((function (done) {
this.timeout(10000);

dbWriter.documents.remove(inputJsonUris)
.result(function (response) {
done();
})
.catch(err => done(err))
.catch(done);
}));
after(async function () {
await dbWriter.documents.remove(inputJsonUris).result();
});

// This test updates an existing doc and then performs readAll
it('update a doc and readAll with snapshot', function (done) {
this.timeout(30000);
it('update a doc and readAll with snapshot', async function () {

// Used in test that updates doc and then does readAll
const UpdBeforeReadAllUriName = '/data/dmsdk/Snap-update-then-readall/900.json';

const filteredSnapshot = new MLQASnapshotTransform(UpdBeforeReadAllUriName, { objectMode: true });

setTimeout(() => {
var i = 0; i++;
}, 3000);
// Initiate a document change on doc id 900.
dbWriter.documents.write({
const writeResponse = await dbWriter.documents.write({
uri: UpdBeforeReadAllUriName,
collections: ['coll5', 'coll6'],
contentType: 'application/json',
quality: 250,
properties: { prop1: 'bar', prop2: 1981 },
content: { id: 88, name: 'David' }
});
// Expected result
content: { id: 88, name: 'David' },
}).result();

// Updated doc should be in db now.
var exptdResult = 'Matched ID:88, Matched Name:David';
var mlqawstream = new MLQAWritableStream('before');
// Have listeners before calling pipe.
setTimeout(() => {
var i = 0; i++;
}, 3000);
mlqawstream.on('finish', function () {
expect(memStore.before.toString()).to.equal(exptdResult);
});
dbWriter.documents.readAll(uriStream, {
inputkind: 'Array',
consistentSnapshot: true,
batch: 50
}).pipe(filteredSnapshot).pipe(mlqawstream);/* Add.pipe(process.stdout) to debug */
done();

// Use pipeline with await to read and confirm, much cleaner and understandable.
await pipeline(
dbWriter.documents.readAll(uriStream, {
inputkind: 'Array',
consistentSnapshot: true,
batch: 50
}),
filteredSnapshot,
mlqawstream
);

// confirm we wrote correct stream to memStore in mlqawstream
expect(memStore.before.toString()).to.equal(exptdResult);
});
});
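A note on the new Buffer(chunk, encoding) to Buffer.from(chunk, encoding) change in MLQAWritableStream._write: the Buffer() constructor has been deprecated since Node.js 6 (DEP0005) because its behavior varies by argument type and the numeric form returned uninitialized memory; Buffer.from is the safe replacement and produces identical output for string input. A quick standalone illustration:

const { Buffer } = require('buffer');

// Deprecated (DEP0005): const buf = new Buffer('abc', 'utf8');
// Preferred replacement, same result for string input:
const buf = Buffer.from('abc', 'utf8');
console.log(buf.toString('utf8')); // 'abc'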
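The Promise.all pattern in the new before() hook, which the inline comments describe, generalizes to any API that signals completion through a callback option rather than through its returned promise. A minimal sketch of the same bridge, factored into a helper; writeAllWithSummary and its parameters are illustrative names, not part of the MarkLogic client API:

// Bridges a writeAll-style call into a single awaitable result:
// resolves with [writeResult, completionSummary].
function writeAllWithSummary(docStream, writeAll) {
    let resolveSummary;
    const summaryPromise = new Promise((resolve) => {
        resolveSummary = resolve; // assigned synchronously, before writeAll runs
    });
    // Start the write; its onCompletion callback resolves the side promise.
    const writePromise = writeAll(docStream, {
        onCompletion: (summary) => resolveSummary(summary)
    });
    return Promise.all([writePromise, summaryPromise]);
}

// Usage, mirroring the before() hook above:
// const [result, summary] = await writeAllWithSummary(
//     jsonDocreadable,
//     (docs, options) => dbWriter.documents.writeAll(docs, options)
// );
// expect(summary.docsWrittenSuccessfully).to.be.greaterThanOrEqual(TOTAL_DOCS);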
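Likewise, the switch from chained .pipe() calls plus a 'finish' listener to await pipeline(...) from stream/promises means the test now waits for the whole chain to drain and surfaces an error from any stage as a rejected promise. A self-contained sketch of the same read-filter-collect shape using plain Node core streams; all names here are illustrative stand-ins, not the MarkLogic API:

const { pipeline } = require('stream/promises');
const { Readable, Transform, Writable } = require('stream');

async function demo() {
    // Source: object-mode stream of documents, like readAll() output.
    const source = Readable.from([
        { uri: '/a.json', content: { id: 1 } },
        { uri: '/b.json', content: { id: 2 } }
    ]);

    // Transform: keep only the document we care about, like MLQASnapshotTransform.
    const filter = new Transform({
        objectMode: true,
        transform(doc, _enc, callback) {
            // Passing undefined as the second argument pushes nothing downstream.
            callback(null, doc.uri === '/b.json' ? 'Matched ID:' + doc.content.id : undefined);
        }
    });

    // Sink: accumulate everything written, like MLQAWritableStream and memStore.
    let collected = '';
    const sink = new Writable({
        write(chunk, _enc, callback) {
            collected += chunk.toString();
            callback();
        }
    });

    // Resolves once sink finishes; rejects if any stage errors.
    await pipeline(source, filter, sink);
    return collected; // 'Matched ID:2'
}

demo().then(console.log);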