Skip to content

Commit 93750d6

Browse files
committed
WriteableBulk accepts a stream of bulk cmds only
before it would be configurable to generate a default command when it was passed a document. It was mixing concern. An example transformer stream that generates bulk commands to index documents was added as `require('elasticsearch-streams').TransformToBulk
1 parent f0a699a commit 93750d6

File tree

5 files changed

+44
-32
lines changed

5 files changed

+44
-32
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ var bulkExec = function(bulkCmds, callback) {
2525
}, callback);
2626
};
2727
var ws = new WritableBulk(bulkExec);
28-
28+
var toBulk = new TransformToBulk(function getIndexTypeId(doc) { return { _id: doc.id }; });
2929
// stream 42 random records into ES
30-
require('random-document-stream')(42).pipe(ws).on('finish', done);
30+
require('random-document-stream')(42).pipe(toBulk).pipe(ws).on('finish', done);
3131
```
3232

3333
## Stream search results from Elasticsearch

index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
module.exports = {
2+
TransformToBulk: require('./lib/transform-to-bulk.js'),
23
WritableBulk : require('./lib/writable-bulk'),
34
ReadableSearch : require('./lib/readable-search'),
45
PipableDocs : require('./lib/transform-mget')

lib/transform-to-bulk.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Transform a stream of documents into a stream of bulk commands to index them.
3+
* This is more an example than anything else.
4+
*/
5+
'use strict';
6+
var Transform = require('stream').Transform;
7+
8+
module.exports = TransformToBulk;
9+
10+
/**
11+
* @param getIndexTypeId function that is passed a document and returns:
12+
* { _index: the_index?, _type: the_type?, _id: the_id? }
13+
*/
14+
function TransformToBulk(getIndexTypeId) {
15+
Transform.call(this, {objectMode:true});
16+
this.getIndexTypeId = getIndexTypeId;
17+
}
18+
19+
TransformToBulk.prototype = Object.create(Transform.prototype, {constructor: {value: TransformToBulk}});
20+
21+
TransformToBulk.prototype._transform = function(chunk, encoding, callback) {
22+
var params = this.getIndexTypeId(chunk);
23+
if (params) {
24+
this.push({ index: params });
25+
this.push(chunk);
26+
}
27+
callback();
28+
};

lib/writable-bulk.js

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,13 @@ module.exports = WritableBulk;
99

1010
/**
1111
* @param bulkExec closure invoked with the bulk cmds as an array and a callback
12-
* @param defaults: { op: 'index or create, index by default'
13-
* id: 'name of the property that is the id, by default _id',
14-
* _index: 'name of the index or nothing',
15-
* _type: 'name of the type or nothing' }
16-
* nothing to emit an error on unknown document command
17-
* @param bulkSize number of bulk commands executed at once. 128 by default.
12+
* @param highWaterMark number of bulk commands executed at once. 128 by default.
1813
*/
19-
function WritableBulk(bulkExec, defaults, bulkSize) {
14+
function WritableBulk(bulkExec, highWaterMark) {
2015
Writable.call(this, {objectMode:true});
2116
this.bulkExec = bulkExec;
2217

23-
if (!bulkSize && typeof defaults === 'number') {
24-
bulkSize = defaults;
25-
defaults = undefined;
26-
}
27-
28-
this.bulkSize = bulkSize || 128;
29-
this.defaults = defaults || {};
30-
this.defaults.id = this.defaults.id || '_id';
31-
this.defaults.op = this.defaults.op || 'index';
18+
this.highWaterMark = highWaterMark || 128;
3219

3320
this.bulk = [];
3421
this.bulkCount = 0;
@@ -44,15 +31,6 @@ WritableBulk.prototype._write = function(chunk, enc, next) {
4431
if (this.expectingPayload) {
4532
this.bulkCount++;
4633
this.expectingPayload = false;
47-
} else if (chunk.hasOwnProperty(this.defaults.id)) {
48-
var defaultCmd = {};
49-
defaultCmd[this.defaults.op] = {
50-
_index: this.defaults._index,
51-
_type: this.defaults._type,
52-
_id: chunk[this.defaults.id]
53-
};
54-
this.bulk.push(defaultCmd);
55-
this.bulkCount++;
5634
} else {
5735
var willExpectPayload = ['index', 'create', 'update'];
5836
for (var i = 0; i < willExpectPayload.length; i++) {
@@ -73,7 +51,7 @@ WritableBulk.prototype._write = function(chunk, enc, next) {
7351
}
7452
}
7553
this.bulk.push(chunk);
76-
if (this.bulkSize <= this.bulkCount) {
54+
if (this.highWaterMark <= this.bulkCount) {
7755
return this._flushBulk(next);
7856
}
7957
next();

test/test-write.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
'use strict';
22
var expect = require('chai').expect;
33
var WritableBulk = require('..').WritableBulk;
4+
var TransformToBulk = require('..').TransformToBulk;
45
var random = require('random-document-stream');
5-
var client = new require('elasticsearch').Client();
6+
var client = new require('elasticsearch').Client({log: 'warning'});
67

78
describe('When writing', function() {
89
var ws;
910
before(function(done) {
1011
var bulkExec = function(bulkCmds, callback) {
1112
client.bulk({
12-
index : 'myindex',
13+
index : 'myindex2',
1314
type : 'mytype',
1415
body : bulkCmds
1516
}, callback);
@@ -23,16 +24,20 @@ describe('When writing', function() {
2324
}).on('finish', function() {
2425
done(err);
2526
});
27+
28+
var transformToBulk = new TransformToBulk(function(doc) {
29+
return { _id: doc._id };
30+
});
2631
// drop the index then
2732
client.indices.delete({index: 'myindex2'}, function() {
2833
// stream 42 random docs into ES
29-
random(42).pipe(ws);
34+
random(42).pipe(transformToBulk).pipe(ws);
3035
});
3136
});
3237
it('Must have indexed 42 docs', function(done) {
3338
client.indices.refresh({ index: 'myindex2' }, function() {
3439
client.count({
35-
index: 'myindex',
40+
index: 'myindex2',
3641
type: 'mytype'
3742
}, function(e, res) {
3843
if (e) { return done(e); }

0 commit comments

Comments
 (0)