Skip to content

Commit 8329151

Browse files
committed
Add multi-get support as a transform stream
1 parent f868a39 commit 8329151

File tree

5 files changed

+140
-18
lines changed

5 files changed

+140
-18
lines changed

README.md

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var bulkExec = function(bulkCmds, callback) {
2727
var ws = new WritableBulk(bulkExec);
2828
2929
// stream 42 random records into ES
30-
require('random-document-stream')(42).pipe(ws);
30+
require('random-document-stream')(42).pipe(ws).on('finish', done);
3131
```
3232

3333
## Stream search results from Elasticsearch
@@ -53,7 +53,7 @@ ws._write = function(chunk, enc, next) {
5353
next();
5454
};
5555
56-
rs.pipe(ws);
56+
rs.pipe(ws).on('finish', done);
5757
```
5858

5959
If we want to start the stream at an offset and define a limit:
@@ -103,13 +103,37 @@ rs = new ReadableSearch(scrollExec);
103103

104104
## Stream IDs into Elasticsearch multi-get and get documents out.
105105
```
106-
# TODO a duplex stream
106+
var mgetExec = function(docs, callback) {
107+
client.mget({
108+
index: 'myindex',
109+
type: 'mytype',
110+
body: {
111+
docs: { ids: docs }
112+
}
113+
}, callback);
114+
};
115+
var ts = new PipableDocs(mgetExec, 4);
116+
117+
// Naive read stream of 12 ids that are numbers
118+
var rs = new Readable({objectMode: true});
119+
rs._read = function() {
120+
for (var i = 0; i < 12; i++) {
121+
rs.push(i);
122+
}
123+
rs.push(null);
124+
};
125+
126+
var ws = new (require('stream').Writable)({objectMode:true});
127+
ws._write = function(chunk, enc, next) {
128+
console.log(chunk._id + ' found: ' + chunk.found, chunk);
129+
next();
130+
};
131+
132+
rs.pipe(ts).pipe(ws).on('finish', onFinish);
107133
```
108134

109135
## TODO
110136
### Short term
111-
* Document more
112-
* Multi-get as a duplex
113137
* Bulk document errors should be emitted as errors
114138

115139
## Later

index.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
module.exports = {
2-
WritableBulk: require('./lib/writable-bulk'),
3-
ReadableSearch: require('./lib/readable-search')
2+
WritableBulk : require('./lib/writable-bulk'),
3+
ReadableSearch : require('./lib/readable-search'),
4+
PipableDocs : require('./lib/transform-mget')
45
};

lib/transform-mget.js

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Stream document IDs and transform them into Documents.
3+
*/
4+
'use strict';
5+
var Transform = require('stream').Transform;
6+
7+
module.exports = PipableDocs;
8+
9+
/**
10+
* @param mgetExec an executable query functions that takes 2 arguments:
11+
* a list of document IDs and a callback.
12+
* @param bulkSize number of bulk commands executed at once. 128 by default.
13+
*/
14+
function PipableDocs(mgetExec, corkSize) {
15+
Transform.call(this, {objectMode:true});
16+
this.mgetExec = mgetExec;
17+
this.corkSize = corkSize;
18+
19+
// current mget
20+
this._docIds = [];
21+
}
22+
23+
PipableDocs.prototype = Object.create(Transform.prototype, {constructor: {value: PipableDocs}});
24+
25+
/**
 * Buffer the incoming document ID; once `corkSize` IDs have accumulated,
 * execute the pending multi-get (via _flush) before asking for more input.
 */
PipableDocs.prototype._transform = function(id, enc, done) {
  this._docIds.push(id);
  var batchIsFull = this._docIds.length >= this.corkSize;
  if (batchIsFull) {
    this._flush(done);
  } else {
    done();
  }
};
32+
33+
/**
 * Execute a multi-get for the buffered IDs and push each returned document
 * downstream. Also invoked by the stream machinery at end-of-input, so the
 * final partial batch gets flushed. A query error is emitted as an 'error'
 * event and the stream carries on (best effort, matching the original).
 */
PipableDocs.prototype._flush = function(callback) {
  var self = this;
  var batch = this._docIds;
  if (batch.length === 0) {
    callback();
    return;
  }
  // Reset the buffer before the async call so new IDs accumulate separately.
  this._docIds = [];
  this.mgetExec(batch, function(err, resp) {
    if (err) {
      self.emit('error', err);
    } else {
      resp.docs.forEach(function(doc) {
        self.push(doc);
      });
    }
    callback();
  });
};

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "elasticsearch-streams",
33
"description": "Stream in and out of Elasticsearch",
4-
"version": "0.0.2",
4+
"version": "0.0.3",
55
"repository": {
66
"type": "git",
77
"url": "https://github.com/hmalphettes/elasticsearch-streamer.git"

test/test-search.js

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
'use strict';
22
var expect = require('chai').expect;
33
var ReadableSearch = require('..').ReadableSearch;
4+
var PipableDocs = require('..').PipableDocs;
45
var Writable = require('stream').Writable;
5-
var client = new require('elasticsearch').Client();
6+
var Readable = require('stream').Readable;
7+
var client = new require('elasticsearch').Client({log: 'warning'});
68
var random = require('random-document-stream');
79

810
describe('When searching', function() {
@@ -25,7 +27,7 @@ describe('When searching', function() {
2527
});
2628
});
2729
it('Must find 42 records by searching them', function(done) {
28-
checkRecords(rs, 42, done);
30+
checkRecords(rs, null, 42, done);
2931
});
3032
});
3133

@@ -60,18 +62,54 @@ describe('When scrolling', function() {
6062
});
6163
});
6264
it('Must find the 42 records by scroll', function(done) {
63-
checkRecords(rs, 42, done);
65+
checkRecords(rs, null, 42, done);
66+
});
67+
});
68+
69+
describe('When getting documents by id', function() {
70+
var ids;
71+
var ts;
72+
before(function(done) {
73+
ids = populateIndex(42, function(e) {
74+
if (e) { return done(e); }
75+
expect(ids.length).to.equal(42);
76+
var mgetExec = function(docs, callback) {
77+
client.mget({
78+
index: 'myindex',
79+
type: 'mytype',
80+
body: {
81+
docs: { ids: docs }
82+
}
83+
}, callback);
84+
};
85+
ts = new PipableDocs(mgetExec, 4);
86+
done();
87+
});
88+
});
89+
it('Must pipe', function(done) {
90+
var i = -1;
91+
var rs = new Readable({objectMode: true});
92+
rs._read = function() {
93+
i++;
94+
if (i >= ids.length) {
95+
return rs.push(null);
96+
}
97+
rs.push(ids[i]);
98+
};
99+
checkRecords(rs, ts, 42, done);
64100
});
65101
});
66102

67103
function populateIndex(nb, done) {
104+
var ids = [];
68105
client.indices.delete({index:'myindex'}, function() {
69106
var cmds = [];
70107
var generator = random(0);
71108
for (var i = 0; i < nb; i++) {
72109
var rec = generator.makeJunk();
73110
cmds.push({ index: { _index: 'myindex', _type: 'mytype', _id: rec._id } });
74111
cmds.push(rec);
112+
ids.push(rec._id);
75113
}
76114
client.bulk({
77115
body: cmds
@@ -80,28 +118,37 @@ function populateIndex(nb, done) {
80118
client.indices.refresh({index: 'myindex'}, done);
81119
});
82120
});
121+
return ids;
83122
}
84123

85-
function checkRecords(rs, nb, done) {
124+
function checkRecords(rs, ts, nb, done) {
86125
var hits = [];
87126
var err;
88127
var ws = new Writable({objectMode:true});
89128
ws._write = function(chunk, enc, next) {
90129
hits.push(chunk);
91130
expect(chunk._index).to.equal('myindex');
92131
expect(chunk._type).to.equal('mytype');
93-
expect(chunk._score).to.equal(1);
94132
expect(chunk._id).to.exist;
95133
expect(chunk._source.name).to.exist;
96134
next();
97135
};
98-
rs.on('error', function(e) {
136+
var errClosure = function(e) {
99137
err = e;
100-
});
101-
rs.on('end', function() {
138+
};
139+
rs.on('error', errClosure);
140+
ws.on('error', errClosure);
141+
if (ts) {
142+
ts.on('error', errClosure);
143+
}
144+
function onFinish() {
102145
if (err) { return done(err); }
103146
expect(hits.length).to.equal(nb);
104147
done();
105-
});
106-
rs.pipe(ws);
148+
}
149+
if (ts) {
150+
rs.pipe(ts).pipe(ws).on('finish', onFinish);
151+
} else {
152+
rs.pipe(ws).on('finish', onFinish);
153+
}
107154
}

0 commit comments

Comments
 (0)