22* Copyright (c) 2015-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33*/
44
5- var marklogic = require ( '../' ) ;
5+ const marklogic = require ( '../' ) ;
66
7- var testconfig = require ( '../etc/test-config-qa.js' ) ;
7+ const testconfig = require ( '../etc/test-config-qa.js' ) ;
88
99const stream = require ( 'stream' ) ;
1010const { expect } = require ( 'chai' ) ;
11+ const { pipeline } = require ( 'stream/promises' ) ;
1112
12- var memStore = { } ;
13+ const memStore = { } ;
14+ const dbWriter = marklogic . createDatabaseClient ( testconfig . dmsdkrestWriterConnection ) ;
15+ const inputJsonUris = [ ] ;
16+ const inputContents = [ ] ;
1317
14- var uriStream = new stream . Readable ( ) ;
15- var dbWriter = marklogic . createDatabaseClient ( testconfig . dmsdkrestWriterConnection ) ;
16- let inputJsonUris = [ ] ;
17- let inputContents = [ ] ;
18+ let uriStream = new stream . Readable ( ) ;
19+
20+ const TOTAL_DOCS = 1000 ;
1821
1922/*
2023 Based on example from
@@ -42,9 +45,9 @@ class MLQASnapshotTransform extends stream.Transform {
4245 // Filter what we need and push. We will verify only 900.json piped from ReadAll
4346 if ( chunk . uri === this . docId ) {
4447 //Push transformed content onto the stream with changed key names such as Matched ID and Matched Name
45- var currId = chunk . content . id ;
46- var currName = chunk . content . name ;
47- var retStr = 'Matched ID:' + currId + ', Matched Name:' + currName ;
48+ let currId = chunk . content . id ;
49+ let currName = chunk . content . name ;
50+ let retStr = 'Matched ID:' + currId + ', Matched Name:' + currName ;
4851 this . push ( retStr ) ;
4952 }
5053 return setImmediate ( callback ) ;
@@ -68,20 +71,21 @@ class MLQAWritableStream extends stream.Writable {
6871 }
6972
7073 _write ( chunk , encoding , callback ) {
71- var buffer = ( Buffer . isBuffer ( chunk ) ) ?
74+ let buffer = ( Buffer . isBuffer ( chunk ) ) ?
7275 chunk : // already is Buffer use it
73- new Buffer ( chunk , encoding ) ;
76+ Buffer . from ( chunk , encoding ) ;
7477 memStore [ this . key ] = Buffer . concat ( [ memStore [ this . key ] , buffer ] ) ;
7578 return setImmediate ( callback ) ;
7679 }
7780}
7881
7982describe ( 'Update doc and readAll with Snapshot' , function ( ) {
80- before ( function ( done ) {
81- this . timeout ( 50000 ) ;
82- var jsonDocreadable = new stream . Readable ( { objectMode : true } ) ;
8383
84- for ( let i = 0 ; i < 1000 ; i ++ ) {
84+ before ( async function ( ) {
85+
86+ const jsonDocreadable = new stream . Readable ( { objectMode : true } ) ;
87+
88+ for ( let i = 0 ; i < TOTAL_DOCS ; i ++ ) {
8589 const tempJson = {
8690 uri : '/data/dmsdk/Snap-update-then-readall/' + i + '.json' ,
8791 contentType : 'application/json' ,
@@ -93,70 +97,65 @@ describe('Update doc and readAll with Snapshot', function () {
9397 inputContents . push ( tempJson . content ) ;
9498 }
9599 jsonDocreadable . push ( null ) ;
96- dbWriter . documents . writeAll ( jsonDocreadable , {
97- onCompletion : ( ( summary ) => {
98- setTimeout ( ( ) => {
99- var i = 0 ; i ++ ;
100- } , 1000 ) ;
101- summary . docsWrittenSuccessfully . should . be . greaterThanOrEqual ( 1000 ) ;
100+
101+ // trick (suggested by copilot, works):
102+ // use Promise.all and two promises: writeAll is first and onCompletion
103+ // callback resolves the second, sending the summary object.
104+ const [ result , summary ] = await Promise . all ( [
105+ dbWriter . documents . writeAll ( jsonDocreadable , {
106+ onCompletion : ( summary ) => {
107+ summaryPromiseResolve ( summary ) ;
108+ }
109+ } ) ,
110+ new Promise ( resolve => {
111+ summaryPromiseResolve = resolve ;
102112 } )
103- } ) ; // End of pipe to writeAll
104- // Use uriStream as the input to readAll()
113+ ] ) ;
114+ expect ( summary . docsWrittenSuccessfully ) . to . be . greaterThanOrEqual ( 1000 ) ;
115+
105116 uriStream = new stream . PassThrough ( { objectMode : true } ) ;
106117 inputJsonUris . forEach ( uri => uriStream . push ( uri ) ) ;
107118 uriStream . push ( null ) ;
108- // wait for DB to finish writing
109- setTimeout ( ( ) => {
110- done ( ) ;
111- } , 10000 ) ;
112119 } ) ;
113120
114- after ( ( function ( done ) {
115- this . timeout ( 10000 ) ;
116-
117- dbWriter . documents . remove ( inputJsonUris )
118- . result ( function ( response ) {
119- done ( ) ;
120- } )
121- . catch ( err => done ( err ) )
122- . catch ( done ) ;
123- } ) ) ;
121+ after ( async function ( ) {
122+ await dbWriter . documents . remove ( inputJsonUris ) . result ( ) ;
123+ } ) ;
124124
125125 // This test updates an existing doc and then performs readAll
126- it ( 'update a doc and readAll with snapshot' , function ( done ) {
127- this . timeout ( 30000 ) ;
126+ it ( 'update a doc and readAll with snapshot' , async function ( ) {
127+
128128 // Used in test that updates doc and then does readAll
129129 const UpdBeforeReadAllUriName = '/data/dmsdk/Snap-update-then-readall/900.json' ;
130130
131131 const filteredSnapshot = new MLQASnapshotTransform ( UpdBeforeReadAllUriName , { objectMode : true } ) ;
132132
133- setTimeout ( ( ) => {
134- var i = 0 ; i ++ ;
135- } , 3000 ) ;
136133 // Initiate a document change on doc id 900.
137- dbWriter . documents . write ( {
134+ const writeResponse = await dbWriter . documents . write ( {
138135 uri : UpdBeforeReadAllUriName ,
139136 collections : [ 'coll5' , 'coll6' ] ,
140137 contentType : 'application/json' ,
141138 quality : 250 ,
142139 properties : { prop1 : 'bar' , prop2 : 1981 } ,
143- content : { id : 88 , name : 'David' }
144- } ) ;
145- // Expected result
140+ content : { id : 88 , name : 'David' } ,
141+ } ) . result ( ) ;
142+
143+ // Updated doc should be in db now.
146144 var exptdResult = 'Matched ID:88, Matched Name:David' ;
147145 var mlqawstream = new MLQAWritableStream ( 'before' ) ;
148- // Have listeners before calling pipe.
149- setTimeout ( ( ) => {
150- var i = 0 ; i ++ ;
151- } , 3000 ) ;
152- mlqawstream . on ( 'finish' , function ( ) {
153- expect ( memStore . before . toString ( ) ) . to . equal ( exptdResult ) ;
154- } ) ;
155- dbWriter . documents . readAll ( uriStream , {
156- inputkind : 'Array' ,
157- consistentSnapshot : true ,
158- batch : 50
159- } ) . pipe ( filteredSnapshot ) . pipe ( mlqawstream ) ; /* Add.pipe(process.stdout) to debug */
160- done ( ) ;
146+
147+ // Use pipeline with await to read and confirm, much cleaner and understandable.
148+ await pipeline (
149+ dbWriter . documents . readAll ( uriStream , {
150+ inputkind : 'Array' ,
151+ consistentSnapshot : true ,
152+ batch : 50
153+ } ) ,
154+ filteredSnapshot ,
155+ mlqawstream
156+ ) ;
157+
158+ // confirm we wrote correct stream to memStore in mlqawstream
159+ expect ( memStore . before . toString ( ) ) . to . equal ( exptdResult ) ;
161160 } ) ;
162161} ) ;
0 commit comments