Skip to content

Commit 2dc5aa2

Browse files
authored
Merge pull request #406 from wooldridge/issues/362-stream-end
Handle when multipart object stream is done
2 parents 812ee98 + 6af309e commit 2dc5aa2

File tree

1 file changed

+54
-46
lines changed

1 file changed

+54
-46
lines changed

lib/responder.js

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ BodyDispatcher.prototype.promise = function dispatchBodyPromise(
147147
contentType, response
148148
) {
149149
var operation = this.operation;
150-
150+
151151
operation.logger.debug('body promise');
152152
var collectObject = function collectPromiseBodyObject(data) {
153153
operation.data = operation.collectBodyObject(data);
@@ -166,7 +166,7 @@ BodyDispatcher.prototype.chunkedStream = function dispatchBodyChunkedStream(
166166
contentType, response
167167
) {
168168
var operation = this.operation;
169-
169+
170170
operation.logger.debug('body chunked stream');
171171

172172
response.pipe(operation.outputStream);
@@ -175,7 +175,7 @@ BodyDispatcher.prototype.objectStream = function dispatchBodyObjectStream(
175175
contentType, response
176176
) {
177177
var operation = this.operation;
178-
178+
179179
operation.logger.debug('body object stream');
180180

181181
var outputStream = operation.outputStream;
@@ -345,7 +345,7 @@ MultipartDispatcher.prototype.chunkedStream = function dispatchMultipartChunkedS
345345
boundary, response
346346
) {
347347
var operation = this.operation;
348-
348+
349349
var errorListener = operation.errorListener;
350350

351351
var outputStream = operation.outputStream;
@@ -414,7 +414,7 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
414414
boundary, response
415415
) {
416416
var operation = this.operation;
417-
417+
418418
var errorListener = operation.errorListener;
419419

420420
var rawHeaderQueue = new FifoQueue(5);
@@ -426,31 +426,13 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
426426
var hasParsed = false;
427427
var hasEnded = false;
428428

429-
var partTransform = function objectPartQueueTransform(
430-
isLast, data, objectQueue
431-
) {
429+
var partTransform = function objectPartQueueTransform(data) {
432430
parsedParts++;
431+
var objectQueue = queuedReader.getItemQueue();
433432
metadataBuffer = operation.queueDocument(
434433
(data.length === 0) ? null : data, rawHeaderQueue, metadataBuffer, objectQueue
435434
);
436-
437-
if (isLast) {
438-
if (metadataBuffer !== null) {
439-
operation.queueMetadata(metadataBuffer, objectQueue);
440-
metadataBuffer = null;
441-
}
442-
443-
rawHeaderQueue = null;
444-
queuedReader = null;
445-
parser = null;
446-
partHeadersListener = null;
447-
partListener = null;
448-
parseFinishListener = null;
449-
responseEndListener = null;
450-
partTransform = null;
451-
} else if (!hasEnded && parsedParts === parsingParts) {
452-
parser.emit('drain');
453-
}
435+
doneChecker();
454436
};
455437

456438
var queuedReader = new QueuedReader(
@@ -470,15 +452,44 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
470452
queuedReader.addReader(partReadStream);
471453
};
472454

473-
var parseFinishListener = function promiseParseFinishListener() {
455+
var parseFinishListener = function objectParseFinishListener() {
474456
hasParsed = true;
475-
if (queuedReader !== null) {
476-
queuedReader.queuedAll();
477-
}
457+
doneChecker();
478458
};
479459

480-
var responseEndListener = function promiseResponseEndListener() {
460+
var responseEndListener = function objectResponseEndListener() {
481461
hasEnded = true;
462+
doneChecker();
463+
};
464+
465+
/**
466+
* Check if HTTP response has ended, Dicer has finished parsing,
467+
* and the Queue is empty. If all are true, then we can call end()
468+
* on output stream.
469+
*/
470+
var doneChecker = function doneChecker() {
471+
472+
if (queuedReader.isQueueEmpty() && hasParsed && hasEnded) {
473+
if (metadataBuffer !== null) {
474+
var objectQueue = queuedReader.getItemQueue();
475+
operation.queueMetadata(metadataBuffer, objectQueue);
476+
metadataBuffer = null;
477+
}
478+
479+
rawHeaderQueue = null;
480+
queuedReader = null;
481+
parser = null;
482+
partHeadersListener = null;
483+
partListener = null;
484+
parseFinishListener = null;
485+
responseEndListener = null;
486+
partTransform = null;
487+
488+
operation.outputStream.end();
489+
} else if (!hasEnded && parsedParts === parsingParts) {
490+
parser.emit('drain');
491+
}
492+
482493
};
483494

484495
var parser = new Dicer({boundary: boundary});
@@ -537,13 +548,10 @@ function QueuedReader(options, logger, itemsTransform) {
537548

538549
self.logger.debug('concatenated item');
539550

540-
var isLast = (self.queueDone && self.readerQueue.length() === 0 &&
541-
self.writerQueue.length() === 0);
542-
543551
var itemQueue = self.itemQueue;
544552
var beforeLength = itemQueue.length();
545553

546-
self.itemsTransform(isLast, data, itemQueue);
554+
self.itemsTransform(data);
547555

548556
if (beforeLength < itemQueue.length()) {
549557
if (beforeLength === 0) {
@@ -569,6 +577,12 @@ QueuedReader.prototype.addReader = function queuedAddReader(reader) {
569577
this.logger.debug('queued item %d', readerQueue.getTotal());
570578
this.nextReader();
571579
};
580+
QueuedReader.prototype.getItemQueue = function getItemQueue() {
581+
return this.itemQueue;
582+
};
583+
QueuedReader.prototype.isQueueEmpty = function isQueueEmpty() {
584+
return (this.readerQueue.length() === 0 && this.writerQueue.length() === 0);
585+
};
572586
QueuedReader.prototype.nextReader = function queuedReaderNextReader() {
573587
if (!this.isReading) {
574588
return;
@@ -635,12 +649,6 @@ QueuedReader.prototype._read = function queuedReaderRead(/*size*/) {
635649
this.nextReader();
636650
}
637651
};
638-
QueuedReader.prototype.queuedAll = function queuedReaderAll() {
639-
if (!this.queueDone) {
640-
this.logger.debug('queued all items');
641-
this.queueDone = true;
642-
}
643-
};
644652

645653
function FifoQueue(min) {
646654
if (!(this instanceof FifoQueue)) {
@@ -799,13 +807,13 @@ function operationResultPromise(fullfilled, rejected) {
799807
switch (operation.outputMode) {
800808
case 'none':
801809
if (operation.startedResponse === true) {
802-
throw new Error('cannot create result promise after receiving response');
810+
throw new Error('cannot create result promise after receiving response');
803811
}
804812
break;
805813
case 'promise':
806-
throw new Error('already created result promise');
814+
throw new Error('already created result promise');
807815
default:
808-
throw new Error('cannot create result promise after creating stream');
816+
throw new Error('cannot create result promise after creating stream');
809817
}
810818
operation.outputMode = 'promise';
811819

@@ -847,7 +855,7 @@ function resolvedPromise(operation, resolve) {
847855
)+' data');
848856

849857
if (!hasData) {
850-
resolve.call(operation);
858+
resolve.call(operation);
851859
} else {
852860
resolve.call(operation, data);
853861
}
@@ -932,7 +940,7 @@ function operationResultStream() {
932940
if (error != null) {
933941
var i = 0;
934942
for (; i < error.length; i++) {
935-
outputStream.emit('error', error[i]);
943+
outputStream.emit('error', error[i]);
936944
}
937945
operation.error = undefined;
938946
}

0 commit comments

Comments
 (0)