@@ -147,7 +147,7 @@ BodyDispatcher.prototype.promise = function dispatchBodyPromise(
147147 contentType , response
148148 ) {
149149 var operation = this . operation ;
150-
150+
151151 operation . logger . debug ( 'body promise' ) ;
152152 var collectObject = function collectPromiseBodyObject ( data ) {
153153 operation . data = operation . collectBodyObject ( data ) ;
@@ -166,7 +166,7 @@ BodyDispatcher.prototype.chunkedStream = function dispatchBodyChunkedStream(
166166 contentType , response
167167 ) {
168168 var operation = this . operation ;
169-
169+
170170 operation . logger . debug ( 'body chunked stream' ) ;
171171
172172 response . pipe ( operation . outputStream ) ;
@@ -175,7 +175,7 @@ BodyDispatcher.prototype.objectStream = function dispatchBodyObjectStream(
175175 contentType , response
176176 ) {
177177 var operation = this . operation ;
178-
178+
179179 operation . logger . debug ( 'body object stream' ) ;
180180
181181 var outputStream = operation . outputStream ;
@@ -345,7 +345,7 @@ MultipartDispatcher.prototype.chunkedStream = function dispatchMultipartChunkedS
345345 boundary , response
346346 ) {
347347 var operation = this . operation ;
348-
348+
349349 var errorListener = operation . errorListener ;
350350
351351 var outputStream = operation . outputStream ;
@@ -414,7 +414,7 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
414414 boundary , response
415415 ) {
416416 var operation = this . operation ;
417-
417+
418418 var errorListener = operation . errorListener ;
419419
420420 var rawHeaderQueue = new FifoQueue ( 5 ) ;
@@ -426,31 +426,13 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
426426 var hasParsed = false ;
427427 var hasEnded = false ;
428428
429- var partTransform = function objectPartQueueTransform (
430- isLast , data , objectQueue
431- ) {
429+ var partTransform = function objectPartQueueTransform ( data ) {
432430 parsedParts ++ ;
431+ var objectQueue = queuedReader . getItemQueue ( ) ;
433432 metadataBuffer = operation . queueDocument (
434433 ( data . length === 0 ) ? null : data , rawHeaderQueue , metadataBuffer , objectQueue
435434 ) ;
436-
437- if ( isLast ) {
438- if ( metadataBuffer !== null ) {
439- operation . queueMetadata ( metadataBuffer , objectQueue ) ;
440- metadataBuffer = null ;
441- }
442-
443- rawHeaderQueue = null ;
444- queuedReader = null ;
445- parser = null ;
446- partHeadersListener = null ;
447- partListener = null ;
448- parseFinishListener = null ;
449- responseEndListener = null ;
450- partTransform = null ;
451- } else if ( ! hasEnded && parsedParts === parsingParts ) {
452- parser . emit ( 'drain' ) ;
453- }
435+ doneChecker ( ) ;
454436 } ;
455437
456438 var queuedReader = new QueuedReader (
@@ -470,15 +452,44 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
470452 queuedReader . addReader ( partReadStream ) ;
471453 } ;
472454
473- var parseFinishListener = function promiseParseFinishListener ( ) {
455+ var parseFinishListener = function objectParseFinishListener ( ) {
474456 hasParsed = true ;
475- if ( queuedReader !== null ) {
476- queuedReader . queuedAll ( ) ;
477- }
457+ doneChecker ( ) ;
478458 } ;
479459
480- var responseEndListener = function promiseResponseEndListener ( ) {
460+ var responseEndListener = function objectResponseEndListener ( ) {
481461 hasEnded = true ;
462+ doneChecker ( ) ;
463+ } ;
464+
465+ /**
466+ * Check if HTTP response has ended, Dicer has finished parsing,
467+ * and the Queue is empty. If all are true, then we can call end()
468+ * on output stream.
469+ */
470+ var doneChecker = function doneChecker ( ) {
471+
472+ if ( queuedReader . isQueueEmpty ( ) && hasParsed && hasEnded ) {
473+ if ( metadataBuffer !== null ) {
474+ var objectQueue = queuedReader . getItemQueue ( ) ;
475+ operation . queueMetadata ( metadataBuffer , objectQueue ) ;
476+ metadataBuffer = null ;
477+ }
478+
479+ rawHeaderQueue = null ;
480+ queuedReader = null ;
481+ parser = null ;
482+ partHeadersListener = null ;
483+ partListener = null ;
484+ parseFinishListener = null ;
485+ responseEndListener = null ;
486+ partTransform = null ;
487+
488+ operation . outputStream . end ( ) ;
489+ } else if ( ! hasEnded && parsedParts === parsingParts ) {
490+ parser . emit ( 'drain' ) ;
491+ }
492+
482493 } ;
483494
484495 var parser = new Dicer ( { boundary : boundary } ) ;
@@ -537,13 +548,10 @@ function QueuedReader(options, logger, itemsTransform) {
537548
538549 self . logger . debug ( 'concatenated item' ) ;
539550
540- var isLast = ( self . queueDone && self . readerQueue . length ( ) === 0 &&
541- self . writerQueue . length ( ) === 0 ) ;
542-
543551 var itemQueue = self . itemQueue ;
544552 var beforeLength = itemQueue . length ( ) ;
545553
546- self . itemsTransform ( isLast , data , itemQueue ) ;
554+ self . itemsTransform ( data ) ;
547555
548556 if ( beforeLength < itemQueue . length ( ) ) {
549557 if ( beforeLength === 0 ) {
@@ -569,6 +577,12 @@ QueuedReader.prototype.addReader = function queuedAddReader(reader) {
569577 this . logger . debug ( 'queued item %d' , readerQueue . getTotal ( ) ) ;
570578 this . nextReader ( ) ;
571579} ;
580+ QueuedReader . prototype . getItemQueue = function getItemQueue ( ) {
581+ return this . itemQueue ;
582+ } ;
583+ QueuedReader . prototype . isQueueEmpty = function isQueueEmpty ( ) {
584+ return ( this . readerQueue . length ( ) === 0 && this . writerQueue . length ( ) === 0 ) ;
585+ } ;
572586QueuedReader . prototype . nextReader = function queuedReaderNextReader ( ) {
573587 if ( ! this . isReading ) {
574588 return ;
@@ -635,12 +649,6 @@ QueuedReader.prototype._read = function queuedReaderRead(/*size*/) {
635649 this . nextReader ( ) ;
636650 }
637651} ;
638- QueuedReader . prototype . queuedAll = function queuedReaderAll ( ) {
639- if ( ! this . queueDone ) {
640- this . logger . debug ( 'queued all items' ) ;
641- this . queueDone = true ;
642- }
643- } ;
644652
645653function FifoQueue ( min ) {
646654 if ( ! ( this instanceof FifoQueue ) ) {
@@ -799,13 +807,13 @@ function operationResultPromise(fullfilled, rejected) {
799807 switch ( operation . outputMode ) {
800808 case 'none' :
801809 if ( operation . startedResponse === true ) {
802- throw new Error ( 'cannot create result promise after receiving response' ) ;
810+ throw new Error ( 'cannot create result promise after receiving response' ) ;
803811 }
804812 break ;
805813 case 'promise' :
806- throw new Error ( 'already created result promise' ) ;
814+ throw new Error ( 'already created result promise' ) ;
807815 default :
808- throw new Error ( 'cannot create result promise after creating stream' ) ;
816+ throw new Error ( 'cannot create result promise after creating stream' ) ;
809817 }
810818 operation . outputMode = 'promise' ;
811819
@@ -847,7 +855,7 @@ function resolvedPromise(operation, resolve) {
847855 ) + ' data' ) ;
848856
849857 if ( ! hasData ) {
850- resolve . call ( operation ) ;
858+ resolve . call ( operation ) ;
851859 } else {
852860 resolve . call ( operation , data ) ;
853861 }
@@ -932,7 +940,7 @@ function operationResultStream() {
932940 if ( error != null ) {
933941 var i = 0 ;
934942 for ( ; i < error . length ; i ++ ) {
935- outputStream . emit ( 'error' , error [ i ] ) ;
943+ outputStream . emit ( 'error' , error [ i ] ) ;
936944 }
937945 operation . error = undefined ;
938946 }
0 commit comments