 * Copyright (c) 2015-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */

-var marklogic = require('../');
+const marklogic = require('../');

-var testconfig = require('../etc/test-config-qa.js');
+const testconfig = require('../etc/test-config-qa.js');

 const stream = require('stream');
 const { expect } = require('chai');
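+// Node's stream/promises module (Node 15+) provides a promise-based pipeline() that the test can await.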
+const { pipeline } = require('stream/promises');

-var memStore = {};
+const memStore = {};
+const dbWriter = marklogic.createDatabaseClient(testconfig.dmsdkrestWriterConnection);
+const inputJsonUris = [];
+const inputContents = [];

-var uriStream = new stream.Readable();
-var dbWriter = marklogic.createDatabaseClient(testconfig.dmsdkrestWriterConnection);
-let inputJsonUris = [];
-let inputContents = [];
+let uriStream = new stream.Readable();
+
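+// Total number of test documents written in before() and removed in after().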
+const TOTAL_DOCS = 1000;

 /*
  Based on example from
@@ -42,9 +45,9 @@ class MLQASnapshotTransform extends stream.Transform {
     // Filter what we need and push. We will verify only 900.json piped from ReadAll
     if (chunk.uri === this.docId) {
       // Push transformed content onto the stream with changed key names such as Matched ID and Matched Name
-      var currId = chunk.content.id;
-      var currName = chunk.content.name;
-      var retStr = 'Matched ID:' + currId + ', Matched Name:' + currName;
+      let currId = chunk.content.id;
+      let currName = chunk.content.name;
+      let retStr = 'Matched ID:' + currId + ', Matched Name:' + currName;
       this.push(retStr);
     }
     return setImmediate(callback);
@@ -68,20 +71,21 @@ class MLQAWritableStream extends stream.Writable {
   }

   _write(chunk, encoding, callback) {
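+    // Buffer.from() replaces the deprecated new Buffer(chunk, encoding) constructor.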
-    var buffer = (Buffer.isBuffer(chunk)) ?
+    let buffer = (Buffer.isBuffer(chunk)) ?
       chunk : // already a Buffer, use it
-      new Buffer(chunk, encoding);
+      Buffer.from(chunk, encoding);
     memStore[this.key] = Buffer.concat([memStore[this.key], buffer]);
     return setImmediate(callback);
   }
 }

 describe('Update doc and readAll with Snapshot', function () {
-  before(function (done) {
-    this.timeout(50000);
-    var jsonDocreadable = new stream.Readable({ objectMode: true });

-    for (let i = 0; i < 1000; i++) {
+  before(async function () {
+
+    const jsonDocreadable = new stream.Readable({ objectMode: true });
+
+    for (let i = 0; i < TOTAL_DOCS; i++) {
       const tempJson = {
         uri: '/data/dmsdk/Snap-update-then-readall/' + i + '.json',
         contentType: 'application/json',
@@ -93,70 +97,67 @@ describe('Update doc and readAll with Snapshot', function () {
       inputContents.push(tempJson.content);
     }
     jsonDocreadable.push(null);
-    dbWriter.documents.writeAll(jsonDocreadable, {
-      onCompletion: ((summary) => {
-        setTimeout(() => {
-          var i = 0; i++;
-        }, 1000);
-        summary.docsWrittenSuccessfully.should.be.greaterThanOrEqual(1000);
+
+    let summaryPromiseResolve;
+
+    // Use Promise.all to coordinate the writeAll operation with its onCompletion callback.
+    // The first promise runs writeAll; the second is resolved by the onCompletion callback with the summary object.
+    // This ensures both the write operation and its completion summary are available before proceeding.
+    const [result, summary] = await Promise.all([
+      dbWriter.documents.writeAll(jsonDocreadable, {
+        onCompletion: (summary) => {
+          summaryPromiseResolve(summary);
+        }
+      }),
+      new Promise(resolve => {
+        summaryPromiseResolve = resolve;
       })
-    }); // End of pipe to writeAll
-    // Use uriStream as the input to readAll()
+    ]);
+    expect(summary.docsWrittenSuccessfully).to.be.greaterThanOrEqual(1000);
+
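+    // Re-create uriStream as a PassThrough seeded with every written URI; readAll() consumes it in the test below.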
     uriStream = new stream.PassThrough({ objectMode: true });
     inputJsonUris.forEach(uri => uriStream.push(uri));
     uriStream.push(null);
-    // wait for DB to finish writing
-    setTimeout(() => {
-      done();
-    }, 10000);
   });

-  after((function (done) {
-    this.timeout(10000);
-
-    dbWriter.documents.remove(inputJsonUris)
-      .result(function (response) {
-        done();
-      })
-      .catch(err => done(err))
-      .catch(done);
-  }));
+  after(async function () {
+    await dbWriter.documents.remove(inputJsonUris).result();
+  });

   // This test updates an existing doc and then performs readAll
-  it('update a doc and readAll with snapshot', function (done) {
-    this.timeout(30000);
+  it('update a doc and readAll with snapshot', async function () {
+
     // Used in test that updates doc and then does readAll
     const UpdBeforeReadAllUriName = '/data/dmsdk/Snap-update-then-readall/900.json';

     const filteredSnapshot = new MLQASnapshotTransform(UpdBeforeReadAllUriName, { objectMode: true });

-    setTimeout(() => {
-      var i = 0; i++;
-    }, 3000);
     // Initiate a document change on doc id 900.
-    dbWriter.documents.write({
+    const writeResponse = await dbWriter.documents.write({
       uri: UpdBeforeReadAllUriName,
       collections: ['coll5', 'coll6'],
       contentType: 'application/json',
       quality: 250,
       properties: { prop1: 'bar', prop2: 1981 },
-      content: { id: 88, name: 'David' }
-    });
-    // Expected result
+      content: { id: 88, name: 'David' },
+    }).result();
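+    // Awaiting .result() above ensures the updated document is persisted before readAll takes its snapshot.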
+
+    // Expected transform output for the updated 900.json document.
     var exptdResult = 'Matched ID:88, Matched Name:David';
     var mlqawstream = new MLQAWritableStream('before');
-    // Have listeners before calling pipe.
-    setTimeout(() => {
-      var i = 0; i++;
-    }, 3000);
-    mlqawstream.on('finish', function () {
-      expect(memStore.before.toString()).to.equal(exptdResult);
-    });
-    dbWriter.documents.readAll(uriStream, {
-      inputkind: 'Array',
-      consistentSnapshot: true,
-      batch: 50
-    }).pipe(filteredSnapshot).pipe(mlqawstream); /* Add .pipe(process.stdout) to debug */
-    done();
+
+    // Use pipeline with await to read and verify; cleaner and easier to follow than nested pipes.
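+    // consistentSnapshot: true asks readAll to read the documents as of a single point in time;
+    // pipeline() wires readAll -> filter transform -> in-memory writable and rejects if any stage errors.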
+    await pipeline(
+      dbWriter.documents.readAll(uriStream, {
+        inputkind: 'Array',
+        consistentSnapshot: true,
+        batch: 50
+      }),
+      filteredSnapshot,
+      mlqawstream
+    );
+
+    // Confirm the transformed content was written to memStore by mlqawstream.
+    expect(memStore.before.toString()).to.equal(exptdResult);
   });
 });