Skip to content

Commit c0afa2e

Browse files
committed
Write Stream emits close - Fixes #1
For a write sream to support flushing on finish, it must listen to itself, flush then emit `close`. USers of such a stream must listen to `close` to know when the data has actually been flushed. Reference: nodejs/node-v0.x-archive#5315 (comment)
1 parent c75a716 commit c0afa2e

File tree

4 files changed

+19
-17
lines changed

4 files changed

+19
-17
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ var toBulk = new TransformToBulk(function getIndexTypeId(doc) { return { _id: do
3030
require('random-document-stream')(42).pipe(toBulk).pipe(ws).on('finish', done);
3131
```
3232

33+
NOTE: One must listen to the `close` event emitted by the write stream to know
34+
when all the data has been written and flushed to Elasticsearch.
35+
36+
Listening to `finish` does not mean much really as we are in this situation:
37+
https://github.com/joyent/node/issues/5315#issuecomment-16670354
38+
3339
## Stream search results from Elasticsearch
3440
```
3541
var ReadableSearch = require('elasticsearch-streams').ReadableSearch;
@@ -53,7 +59,7 @@ ws._write = function(chunk, enc, next) {
5359
next();
5460
};
5561
56-
rs.pipe(ws).on('finish', done);
62+
rs.pipe(ws).on('close', done);
5763
```
5864

5965
If we want to start the stream at an offset and define a limit:

lib/writable-bulk.js

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@ function WritableBulk(bulkExec, highWaterMark) {
2323
this.bulk = [];
2424
this.bulkCount = 0;
2525
this.expectingPayload = false;
26+
27+
// when end is called we still need to flush but we must not overwrite end ourself.
28+
// now we need to tell everyone to listen to the close event to know when we are done.
29+
// Not great. See: https://github.com/joyent/node/issues/5315#issuecomment-16670354
30+
this.on('finish', function() {
31+
this._flushBulk(function() {
32+
this.emit('close');
33+
}.bind(this));
34+
}.bind(this));
2635
}
2736

2837
WritableBulk.prototype = Object.create(Writable.prototype, {constructor: {value: WritableBulk}});
@@ -83,17 +92,3 @@ WritableBulk.prototype._flushBulk = function(callback) {
8392
callback();
8493
});
8594
};
86-
87-
WritableBulk.prototype.end = function(data) {
88-
var self = this;
89-
if (!data) {
90-
return this._flushBulk(function() {
91-
self.emit('finish');
92-
});
93-
}
94-
this._write(data, 'json', function() {
95-
self._flushBulk(function() {
96-
self.emit('finish');
97-
});
98-
});
99-
};

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
},
2121
"devDependencies": {
2222
"chai": "*",
23-
"elasticsearch": "*"
23+
"elasticsearch": "*",
24+
"random-document-stream": "*"
2425
}
2526
}

test/test-write.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ describe('When writing', function() {
2121
ws = new WritableBulk(bulkExec);
2222
ws.on('error', function(e) {
2323
err = e;
24-
}).on('finish', function() {
24+
}).on('close', function() {
2525
done(err);
2626
});
2727

0 commit comments

Comments
 (0)