@@ -2260,6 +2260,7 @@ Documents.prototype.suggest = function suggestDocuments() {
22602260 * defaultMetadata and transform.
22612261 * @method documents#writeAll
22622262 * @since 2.8.0
2263+ * @param {stream } [stream] - stream containing the input.
22632264 * @param {object } [options] - Configures the write operation.
22642265 * @param {function(progress, documents) } [onBatchSuccess] - A callback that can notify other systems about batches
22652266 * written successfully.
@@ -2275,18 +2276,17 @@ Documents.prototype.suggest = function suggestDocuments() {
22752276 * @returns {stream.Writable } - a stream.Writable in object mode that receives document descriptor input from the
22762277 * application for writing to the database.
22772278 */
2278- Documents . prototype . writeAll = function writeAllDocuments ( options ) {
2279+ Documents . prototype . writeAll = function writeAllDocuments ( stream , options ) {
22792280 return writeAllDocumentsImpl . call (
2280- this , options
2281+ this , stream , options
22812282 ) ;
22822283} ;
2283- function writeAllDocumentsImpl ( jobOptions ) {
2284+ function writeAllDocumentsImpl ( inputStream , jobOptions ) {
22842285
22852286 let path = '/v1/internal/forestinfo' ;
22862287 let connectionParams = this . client . getConnectionParams ( ) ;
22872288 let requestOptions = mlutil . copyProperties ( connectionParams ) ;
22882289
2289- let inputStream = new stream . PassThrough ( { objectMode : true } ) ;
22902290 requestOptions . method = 'GET' ;
22912291 requestOptions . headers = {
22922292 'Accept' : 'application/json' ,
@@ -2345,7 +2345,7 @@ function writeAllDocumentsImpl(jobOptions) {
23452345
23462346function finishWriter ( jobState ) {
23472347 jobState . requesterCount -- ;
2348- if ( jobState . requesterCount == = 0 ) {
2348+ if ( jobState . requesterCount < = 0 ) {
23492349 if ( jobState . jobOptions . onCompletion ) {
23502350 const summary = {
23512351 docsWrittenSuccessfully : jobState . docsWrittenSuccessfully ,
@@ -2380,6 +2380,7 @@ function onWriteAllInit(output) {
23802380 }
23812381 }
23822382 jobState . startTime = Date . now ( ) ;
2383+ jobState . requesterCount = 0 ;
23832384 for ( let i = 0 ; i < maxRequesters ; i ++ ) {
23842385 jobState . requesterCount ++ ;
23852386 onWriteAllDocs ( jobState , i ) ;
@@ -2765,6 +2766,7 @@ function urisOutputTransform(headers, data) {
27652766 * concurrentRequests, categories, transform, onInitialTimestamp, outputStreamType and consistentSnapshot.
27662767 * @method documents#readAll
27672768 * @since 2.9.0
2769+ * @param {stream } [stream] - stream containing the input uris.
27682770 * @param {object } [options] - Configures the readAll operation.
27692771 * @param {function(summary) } [onCompletion] - A callback that receives a summary of the results.
27702772 * @param {function(progress, uris, error) } [onBatchError] - A callback that responds to any error while reading a
@@ -2791,13 +2793,14 @@ function urisOutputTransform(headers, data) {
27912793 * object mode (for arrays of strings) and returns document descriptors with the content and/or document uri as output to
27922794 * the application in object mode.
27932795 */
2794- Documents . prototype . readAll = function readAllDocuments ( options ) {
2796+ Documents . prototype . readAll = function readAllDocuments ( stream , options ) {
2797+
27952798 return readAllDocumentsImpl . call (
2796- this , options
2799+ this , stream , options
27972800 ) ;
27982801} ;
27992802
2800- function readAllDocumentsImpl ( jobOptions ) {
2803+ function readAllDocumentsImpl ( inputStream , jobOptions ) {
28012804
28022805 let path = '/v1/internal/forestinfo' ;
28032806 let connectionParams = this . client . getConnectionParams ( ) ;
@@ -2829,8 +2832,7 @@ function readAllDocumentsImpl(jobOptions) {
28292832 throw new Error ( 'Invalid value for inputKind. Value must be array or string.' ) ;
28302833 }
28312834 }
2832-
2833- jobState . inputStream = new stream . PassThrough ( { objectMode : true } ) ;
2835+ jobState . inputStream = inputStream ?inputStream :new stream . PassThrough ( { objectMode : true } ) ;
28342836
28352837 if ( jobState . jobOptions . outputStreamType ) {
28362838 readableObjectMode = jobState . jobOptions . outputStreamType . toString ( ) . toLowerCase ( ) ;
@@ -3242,7 +3244,7 @@ function queryToReadAllDocumentsImpl(query,jobOptions){
32423244 if ( ! pipelined ) {
32433245 pipelined = true ;
32443246
3245- const readAllStream = readAllDocumentsImpl . call ( docInstance , readAllJobOptions ) ;
3247+ const readAllStream = readAllDocumentsImpl . call ( docInstance , null , readAllJobOptions ) ;
32463248 readAllStream . on ( 'error' , function ( err ) {
32473249 queryToReadAllResultStream . emit ( err ) ;
32483250 } ) ;
@@ -3353,7 +3355,8 @@ function copyConstraints(suggestConstraints, searchConstraints) {
33533355 * The options include onCompletion, batchSize, onBatchError, inputKind, concurrentRequests, transform, onBatchSuccess
33543356 * and transformStrategy.
33553357 * @method documents#transformAll
3356- * @since 2.10.0
3358+ * @since 3.0.0
3359+ * @param {stream } [stream] - stream containing the input.
33573360 * @param {object } [options] - Configures the transformAll operation.
33583361 * @param {function(summary) } [onCompletion] - A callback that receives a summary of the results.
33593362 * @param {function(progress, uris, error) } [onBatchError] - A callback that responds to any error while transforming a
@@ -3375,13 +3378,13 @@ function copyConstraints(suggestConstraints, searchConstraints) {
33753378 * @returns {stream.Writable } - returns a writable stream that takes the document uris of the database documents to be
33763379 * transformed.
33773380 */
3378- Documents . prototype . transformAll = function transformAllDocuments ( options ) {
3381+ Documents . prototype . transformAll = function transformAllDocuments ( stream , options ) {
33793382 return transformAllDocumentsImpl . call (
3380- this , options
3383+ this , stream , options
33813384 ) ;
33823385} ;
33833386
3384- function transformAllDocumentsImpl ( jobOptions ) {
3387+ function transformAllDocumentsImpl ( inputStream , jobOptions ) {
33853388 let path = '/v1/internal/forestinfo' ;
33863389 let connectionParams = this . client . getConnectionParams ( ) ;
33873390 let requestOptions = mlutil . copyProperties ( connectionParams ) ;
@@ -3403,7 +3406,7 @@ function transformAllDocumentsImpl(jobOptions){
34033406 docsFailedToBeTransformed : 0 ,
34043407 jobOptions : ( jobOptions ) ? mlutil . copyProperties ( jobOptions ) :{ }
34053408 } ;
3406-
3409+ jobState . stream = inputStream ? inputStream : new stream . PassThrough ( { objectMode : true } ) ;
34073410 if ( ! jobState . jobOptions . transform ) {
34083411 throw new Error ( 'transform name needed while using transformAll api' ) ;
34093412 }
@@ -3416,12 +3419,9 @@ function transformAllDocumentsImpl(jobOptions){
34163419 if ( inputKindValue === 'array' && jobState . jobOptions . batchSize ) {
34173420 throw new Error ( 'batchSize not expected when inputKind is array.' ) ;
34183421 }
3419- jobState . stream = ( inputKindValue === 'string' ) ? new stream . PassThrough ( ) :
3420- new stream . PassThrough ( { objectMode : true } ) ;
34213422 jobState . inputKindValue = inputKindValue ;
34223423 }
34233424 else {
3424- jobState . stream = new stream . PassThrough ( { objectMode : true } ) ;
34253425 jobState . inputKindValue = 'string' ;
34263426 }
34273427
@@ -3674,7 +3674,7 @@ function finishTransformAll(jobState) {
36743674 * The queryToTransformAll is a convenience function that combines the queryAll and transformAll operations.
36753675 * The transform used to configure the documents must be installed on the server beforehand.
36763676 * @method documents#queryToTransformAll
3677- * @since 2.10 .0
3677+ * @since 3.0 .0
36783678 * @param {ctsQuery } query - A query built by the ctsQueryBuilder.
36793679 * @param {object } [options] - Configures the queryToTransformAll operation.
36803680 * @param {boolean|DatabaseClient.timestamp } [consistentSnapshot] - Controls whether to get an immutable view of the result set.
@@ -3765,7 +3765,7 @@ function queryToTransformAllDocumentsImpl(query, jobOptions){
37653765 queryAllStream . on ( 'data' , function ( item ) {
37663766 if ( ! pipelined ) {
37673767 pipelined = true ;
3768- const transformAllStream = transformAllDocumentsImpl . call ( docInstance , transformAllJobOptions ) ;
3768+ const transformAllStream = transformAllDocumentsImpl . call ( docInstance , null , transformAllJobOptions ) ;
37693769
37703770 queryAllStream . on ( 'error' , function ( err ) {
37713771 transformAllStream . emit ( err ) ;
@@ -3779,7 +3779,8 @@ function queryToTransformAllDocumentsImpl(query, jobOptions){
37793779/**
37803780 * The removeAllUris function deletes a set of documents from the database.
37813781 * @method documents#removeAllUris
3782- * @since 2.10.0
3782+ * @since 3.0.0
3783+ * @param {stream } [stream] - stream containing the input uris.
37833784 * @param {object } [options] - Configures the removeAllUris operation.
37843785 * @param {function(summary) } [onCompletion] - A callback that receives a summary of the results.
37853786 * @param {function(progress, uris, error) } [onBatchError] - A callback that responds to any error while transforming a
@@ -3795,13 +3796,13 @@ function queryToTransformAllDocumentsImpl(query, jobOptions){
37953796 * @returns {stream.Writable } - a stream.Writable that receives document URI input from the application in string mode
37963797 * or (for arrays of strings) object mode.
37973798 */
3798- Documents . prototype . removeAllUris = function removeAllUrisDocuments ( options ) {
3799+ Documents . prototype . removeAllUris = function removeAllUrisDocuments ( stream , options ) {
37993800 return removeAllUrisDocumentsImpl . call (
3800- this , options
3801+ this , stream , options
38013802 ) ;
38023803} ;
38033804
3804- function removeAllUrisDocumentsImpl ( jobOptions ) {
3805+ function removeAllUrisDocumentsImpl ( inputStream , jobOptions ) {
38053806 let path = '/v1/internal/forestinfo' ;
38063807 let connectionParams = this . client . getConnectionParams ( ) ;
38073808 let requestOptions = mlutil . copyProperties ( connectionParams ) ;
@@ -3823,7 +3824,7 @@ function removeAllUrisDocumentsImpl(jobOptions){
38233824 docsFailedToBeRemoved : 0 ,
38243825 jobOptions : ( jobOptions ) ? mlutil . copyProperties ( jobOptions ) :{ }
38253826 } ;
3826-
3827+ jobState . stream = inputStream ? inputStream : new stream . PassThrough ( { objectMode : true } ) ;
38273828 if ( jobState . jobOptions . inputKind ) {
38283829 const inputKindValue = jobState . jobOptions . inputKind . toString ( ) . toLowerCase ( ) ;
38293830 if ( ! new Set ( [ 'string' , 'array' ] ) . has ( inputKindValue ) ) {
@@ -3832,12 +3833,9 @@ function removeAllUrisDocumentsImpl(jobOptions){
38323833 if ( inputKindValue === 'array' && jobState . jobOptions . batchSize ) {
38333834 throw new Error ( 'batchSize not expected when inputKind is array.' ) ;
38343835 }
3835- jobState . stream = ( inputKindValue === 'string' ) ? new stream . PassThrough ( ) :
3836- new stream . PassThrough ( { objectMode : true } ) ;
38373836 jobState . inputKindValue = inputKindValue ;
38383837 }
38393838 else {
3840- jobState . stream = new stream . PassThrough ( { objectMode : true } ) ;
38413839 jobState . inputKindValue = 'string' ;
38423840 }
38433841
@@ -4049,7 +4047,7 @@ function removeDocs(jobState, removeBatchArray, writerId){
40494047/**
40504048 * The queryToRemoveAll is a convenience function that combines the queryAll and removeAllUris operations.
40514049 * @method documents#queryToRemoveAll
4052- * @since 2.10 .0
4050+ * @since 3.0 .0
40534051 * @param {ctsQuery } query - A query built by the ctsQueryBuilder.
40544052 * @param {object } [options] - Configures the queryToRemoveAll operation.
40554053 * @param {boolean|DatabaseClient.timestamp } [consistentSnapshot] - Controls whether to get an immutable view of the result set.
@@ -4129,7 +4127,7 @@ function queryToRemoveAllDocumentsImpl(query, jobOptions){
41294127 queryAllStream . on ( 'data' , function ( item ) {
41304128 if ( ! pipelined ) {
41314129 pipelined = true ;
4132- const removeAllStream = removeAllUrisDocumentsImpl . call ( docInstance , removeAllJobOptions ) ;
4130+ const removeAllStream = removeAllUrisDocumentsImpl . call ( docInstance , null , removeAllJobOptions ) ;
41334131
41344132 queryAllStream . on ( 'error' , function ( err ) {
41354133 removeAllStream . emit ( err ) ;
0 commit comments