Skip to content

Commit 6af309e

Browse files
committed
Handle when multipart object stream is done
From dispatchMultipartObjectStream, check that HTTP response has ended, Dicer has finished parsing, and the Queue is empty. Fixes #362
1 parent 812ee98 commit 6af309e

File tree

1 file changed

+54
-46
lines changed

1 file changed

+54
-46
lines changed

lib/responder.js

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ BodyDispatcher.prototype.promise = function dispatchBodyPromise(
147147
contentType, response
148148
) {
149149
var operation = this.operation;
150-
150+
151151
operation.logger.debug('body promise');
152152
var collectObject = function collectPromiseBodyObject(data) {
153153
operation.data = operation.collectBodyObject(data);
@@ -166,7 +166,7 @@ BodyDispatcher.prototype.chunkedStream = function dispatchBodyChunkedStream(
166166
contentType, response
167167
) {
168168
var operation = this.operation;
169-
169+
170170
operation.logger.debug('body chunked stream');
171171

172172
response.pipe(operation.outputStream);
@@ -175,7 +175,7 @@ BodyDispatcher.prototype.objectStream = function dispatchBodyObjectStream(
175175
contentType, response
176176
) {
177177
var operation = this.operation;
178-
178+
179179
operation.logger.debug('body object stream');
180180

181181
var outputStream = operation.outputStream;
@@ -345,7 +345,7 @@ MultipartDispatcher.prototype.chunkedStream = function dispatchMultipartChunkedS
345345
boundary, response
346346
) {
347347
var operation = this.operation;
348-
348+
349349
var errorListener = operation.errorListener;
350350

351351
var outputStream = operation.outputStream;
@@ -414,7 +414,7 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
414414
boundary, response
415415
) {
416416
var operation = this.operation;
417-
417+
418418
var errorListener = operation.errorListener;
419419

420420
var rawHeaderQueue = new FifoQueue(5);
@@ -426,31 +426,13 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
426426
var hasParsed = false;
427427
var hasEnded = false;
428428

429-
var partTransform = function objectPartQueueTransform(
430-
isLast, data, objectQueue
431-
) {
429+
var partTransform = function objectPartQueueTransform(data) {
432430
parsedParts++;
431+
var objectQueue = queuedReader.getItemQueue();
433432
metadataBuffer = operation.queueDocument(
434433
(data.length === 0) ? null : data, rawHeaderQueue, metadataBuffer, objectQueue
435434
);
436-
437-
if (isLast) {
438-
if (metadataBuffer !== null) {
439-
operation.queueMetadata(metadataBuffer, objectQueue);
440-
metadataBuffer = null;
441-
}
442-
443-
rawHeaderQueue = null;
444-
queuedReader = null;
445-
parser = null;
446-
partHeadersListener = null;
447-
partListener = null;
448-
parseFinishListener = null;
449-
responseEndListener = null;
450-
partTransform = null;
451-
} else if (!hasEnded && parsedParts === parsingParts) {
452-
parser.emit('drain');
453-
}
435+
doneChecker();
454436
};
455437

456438
var queuedReader = new QueuedReader(
@@ -470,15 +452,44 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
470452
queuedReader.addReader(partReadStream);
471453
};
472454

473-
var parseFinishListener = function promiseParseFinishListener() {
455+
var parseFinishListener = function objectParseFinishListener() {
474456
hasParsed = true;
475-
if (queuedReader !== null) {
476-
queuedReader.queuedAll();
477-
}
457+
doneChecker();
478458
};
479459

480-
var responseEndListener = function promiseResponseEndListener() {
460+
var responseEndListener = function objectResponseEndListener() {
481461
hasEnded = true;
462+
doneChecker();
463+
};
464+
465+
/**
466+
* Check if HTTP response has ended, Dicer has finished parsing,
467+
* and the Queue is empty. If all are true, then we can call end()
468+
* on output stream.
469+
*/
470+
var doneChecker = function doneChecker() {
471+
472+
if (queuedReader.isQueueEmpty() && hasParsed && hasEnded) {
473+
if (metadataBuffer !== null) {
474+
var objectQueue = queuedReader.getItemQueue();
475+
operation.queueMetadata(metadataBuffer, objectQueue);
476+
metadataBuffer = null;
477+
}
478+
479+
rawHeaderQueue = null;
480+
queuedReader = null;
481+
parser = null;
482+
partHeadersListener = null;
483+
partListener = null;
484+
parseFinishListener = null;
485+
responseEndListener = null;
486+
partTransform = null;
487+
488+
operation.outputStream.end();
489+
} else if (!hasEnded && parsedParts === parsingParts) {
490+
parser.emit('drain');
491+
}
492+
482493
};
483494

484495
var parser = new Dicer({boundary: boundary});
@@ -537,13 +548,10 @@ function QueuedReader(options, logger, itemsTransform) {
537548

538549
self.logger.debug('concatenated item');
539550

540-
var isLast = (self.queueDone && self.readerQueue.length() === 0 &&
541-
self.writerQueue.length() === 0);
542-
543551
var itemQueue = self.itemQueue;
544552
var beforeLength = itemQueue.length();
545553

546-
self.itemsTransform(isLast, data, itemQueue);
554+
self.itemsTransform(data);
547555

548556
if (beforeLength < itemQueue.length()) {
549557
if (beforeLength === 0) {
@@ -569,6 +577,12 @@ QueuedReader.prototype.addReader = function queuedAddReader(reader) {
569577
this.logger.debug('queued item %d', readerQueue.getTotal());
570578
this.nextReader();
571579
};
580+
QueuedReader.prototype.getItemQueue = function getItemQueue() {
581+
return this.itemQueue;
582+
};
583+
QueuedReader.prototype.isQueueEmpty = function isQueueEmpty() {
584+
return (this.readerQueue.length() === 0 && this.writerQueue.length() === 0);
585+
};
572586
QueuedReader.prototype.nextReader = function queuedReaderNextReader() {
573587
if (!this.isReading) {
574588
return;
@@ -635,12 +649,6 @@ QueuedReader.prototype._read = function queuedReaderRead(/*size*/) {
635649
this.nextReader();
636650
}
637651
};
638-
QueuedReader.prototype.queuedAll = function queuedReaderAll() {
639-
if (!this.queueDone) {
640-
this.logger.debug('queued all items');
641-
this.queueDone = true;
642-
}
643-
};
644652

645653
function FifoQueue(min) {
646654
if (!(this instanceof FifoQueue)) {
@@ -799,13 +807,13 @@ function operationResultPromise(fullfilled, rejected) {
799807
switch (operation.outputMode) {
800808
case 'none':
801809
if (operation.startedResponse === true) {
802-
throw new Error('cannot create result promise after receiving response');
810+
throw new Error('cannot create result promise after receiving response');
803811
}
804812
break;
805813
case 'promise':
806-
throw new Error('already created result promise');
814+
throw new Error('already created result promise');
807815
default:
808-
throw new Error('cannot create result promise after creating stream');
816+
throw new Error('cannot create result promise after creating stream');
809817
}
810818
operation.outputMode = 'promise';
811819

@@ -847,7 +855,7 @@ function resolvedPromise(operation, resolve) {
847855
)+' data');
848856

849857
if (!hasData) {
850-
resolve.call(operation);
858+
resolve.call(operation);
851859
} else {
852860
resolve.call(operation, data);
853861
}
@@ -932,7 +940,7 @@ function operationResultStream() {
932940
if (error != null) {
933941
var i = 0;
934942
for (; i < error.length; i++) {
935-
outputStream.emit('error', error[i]);
943+
outputStream.emit('error', error[i]);
936944
}
937945
operation.error = undefined;
938946
}

0 commit comments

Comments
 (0)