Skip to content

Commit dfd5de9

Browse files
committed
Abstract query execution. Scroll/scan example
1 parent def1cb7 commit dfd5de9

File tree

6 files changed

+175
-77
lines changed

6 files changed

+175
-77
lines changed

README.md

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ hits and documents responses.
55

66
Use case: pipe to and from levelup, pouchdb and other friends.
77

8-
The client that executes the requests is more or less abstracted at the moment.
9-
Examples and tests are using the official client.
8+
The client that executes the requests is wrapped in a closure.
9+
It is expected to provide the Elasticsearch reponse's body as a JSON.
10+
11+
See the examples and tests with the official Elasticsearch-js client.
1012

1113
# Examples:
1214

@@ -33,17 +35,18 @@ require('random-document-stream')(42).pipe(ws);
3335
var ReadableSearch = require('elasticsearch-streams').ReadableSearch;
3436
var client = new require('elasticsearch').Client();
3537
36-
var search = {
37-
index: 'myindex',
38-
from: 0,
39-
size: 12,
40-
body: {
41-
query: { match_all: {} }
42-
}
38+
var searchExec = function searchExec(from, callback) {
39+
client.search({
40+
index: 'myindex',
41+
from: from,
42+
size: 12,
43+
body: {
44+
query: { match_all: {} }
45+
}
46+
}, callback);
4347
};
44-
var queryExec = client.search.bind(client);
4548
46-
var rs = new ReadableSearch(queryExec, search);
49+
var rs = new ReadableSearch(searchExec);
4750
var ws = new require('stream').Writable({objectMode:true});
4851
ws._write = function(chunk, enc, next) {
4952
console.log('a hit', hit);
@@ -53,15 +56,45 @@ ws._write = function(chunk, enc, next) {
5356
rs.pipe(ws);
5457
```
5558

59+
## Stream scroll/scan results from Elasticsearch
60+
```
61+
var scrollExec = function scrollExec(from, callback) {
62+
if (this.scroll_id) {
63+
return client.scroll({
64+
scrollId : this.scroll_id,
65+
scroll : '30s'
66+
}, callback);
67+
}
68+
// get a scroll id first
69+
var self = this;
70+
client.search({
71+
index: 'myindex',
72+
scroll: '20s',
73+
size: '3',
74+
body: {
75+
query: { match_all: {} }
76+
}
77+
}, function(e, resp) {
78+
self.scroll_id = resp._scroll_id;
79+
callback(e, resp);
80+
});
81+
};
82+
rs = new ReadableSearch(scrollExec);
83+
```
84+
85+
## Stream IDs into Elasticsearch multi-get and get documents out.
86+
```
87+
# TODO a duplex stream
88+
```
89+
5690
## TODO
5791
### Short term
5892
* Document more
5993
* Multi-get as a duplex
60-
* Bulk document errors emitted as errors.
61-
* Search stream: access to total, aggregations etc
94+
* Bulk document errors should be emitted as errors
6295

6396
## Later
64-
Streaming http client or transport
97+
Streaming http client or elasticsearch-js streaming transport.
6598

6699
# LICENSE
67100
elasticsearch-streams is freely distributable under the terms of the MIT license.

lib/readable-search.js

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,14 @@ var Readable = require('stream').Readable;
1111
module.exports = ReadableHits;
1212

1313
/**
14-
* @param queryExec an executable query functions that takes 2 arguments: the query and its callback.
15-
* @param pushKeysFirst true to stream the name of the properties first and values second
16-
* by default false
14+
* @param queryExec an executable query functions that takes 2 arguments: the offset and a callback.
15+
* @param limit optional number of hits to stream
1716
*/
18-
function ReadableHits(queryExec, query, limit, emitSourceOnly) {
17+
function ReadableHits(queryExec, limit) {
1918
Readable.call(this, {objectMode:true});
2019
this.queryExec = queryExec;
21-
this.query = query;
22-
this.pageSize = query.size || 256;
23-
this.emitSourceOnly = !!emitSourceOnly;
2420
this.total = -1;
25-
this.from = query.form || 0;
21+
this.from = 0;
2622
this._next = true;
2723
if (limit > 0) {
2824
this.limit = limit;
@@ -48,18 +44,30 @@ ReadableHits.prototype._read = function() {//size) {
4844
};
4945

5046
ReadableHits.prototype._fetchNextPage = function() {//size) {
51-
this.from += this.pageSize;
5247
var self = this;
53-
this.queryExec(this.query, function(e, resp) {
48+
this.queryExec(this.from, function(e, resp) {
5449
self._current = 0;
55-
self._hits = resp.hits ? resp.hits.hits : resp.docs.docs;
56-
if (self.from + self.pageSize > self._hits.length) {
50+
if (e) {
51+
self.hits = [];
5752
self._next = false;
53+
self.emit('error', e);
54+
return this.push(null);
55+
}
56+
self.total = resp.hits.total;
57+
self._hits = resp.hits.hits;
58+
self.from += self._hits.length;
59+
if (self.from >= self.total) {
60+
self._next = false; // we got them all.
61+
}
62+
if (!self._hits.length) {
63+
// nothing here: end the stream.
64+
// self._next = false; // precaution but we should not really need this line.
65+
return self.push(null);
5866
}
5967
self._shift();
6068
});
6169
};
6270

63-
ReadableHits.prototype._shift = function() {//size) {
71+
ReadableHits.prototype._shift = function() {
6472
this.push(this._hits[this._current]);
6573
};

lib/writable-bulk.js

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,23 @@ var Writable = require('stream').Writable;
88
module.exports = WritableBulk;
99

1010
/**
11-
* @param bulkExec closure pass the bulk cmds as an array and a callback
11+
* @param bulkExec closure invoked with the bulk cmds as an array and a callback
1212
* @param defaults: { op: 'index or create, index by default'
13-
id: 'name of the property that is the id, by default _id',
13+
* id: 'name of the property that is the id, by default _id',
1414
* _index: 'name of the index or nothing',
1515
* _type: 'name of the type or nothing' }
1616
* nothing to emit an error on unknown document command
17+
* @param bulkSize number of bulk commands executed at once. 128 by default.
1718
*/
1819
function WritableBulk(bulkExec, defaults, bulkSize) {
1920
Writable.call(this, {objectMode:true});
2021
this.bulkExec = bulkExec;
22+
23+
if (!bulkSize && typeof defaults === 'number') {
24+
bulkSize = defaults;
25+
defaults = undefined;
26+
}
27+
2128
this.bulkSize = bulkSize || 128;
2229
this.defaults = defaults || {};
2330
this.defaults.id = this.defaults.id || '_id';
@@ -56,7 +63,9 @@ WritableBulk.prototype._write = function(chunk, enc, next) {
5663
}
5764
if (!this.expectingPayload) {
5865
if (!chunk.hasOwnProperty('delete')) {
59-
this.emit('error', new Error('Unexpected chunk, not index create update delete'));
66+
this.emit('error', new Error('Unexpected chunk, not an ' +
67+
'index/create/update/delete command and ' +
68+
'not a document to index either'));
6069
return next();
6170
}
6271
this.bulkCount++;

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "elasticsearch-streams",
33
"description": "Stream in and out of Elasticsearch",
4-
"version": "0.0.1",
4+
"version": "0.0.2",
55
"repository": {
66
"type": "git",
77
"url": "https://github.com/hmalphettes/elasticsearch-streamer.git"

test/test-search.js

Lines changed: 90 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,57 +3,105 @@ var expect = require('chai').expect;
33
var ReadableSearch = require('..').ReadableSearch;
44
var Writable = require('stream').Writable;
55
var client = new require('elasticsearch').Client();
6+
var random = require('random-document-stream');
67

78
describe('When searching', function() {
89
var rs;
910
before(function(done) {
10-
client.bulk({
11-
body: [
12-
{ index: { _index: 'myindex', _type: 'mytype', _id: 1 } },
13-
{ title: 'foo' },
14-
{ index: { _index: 'myindex', _type: 'mytype', _id: 2 } },
15-
{ title: 'bar' },
16-
{ index: { _index: 'myindex', _type: 'mytype', _id: 3 } },
17-
{ title: 'joe' }
18-
]
19-
}, function(e) {
11+
populateIndex(42, function(e) {
2012
if (e) { return done(e); }
21-
var params = {
22-
index: 'myindex',
23-
from: 0,
24-
size: 12,
25-
body: {
26-
query: { match_all: {} }
27-
}
13+
var queryExec = function(from, callback) {
14+
client.search({
15+
index: 'myindex',
16+
from: from,
17+
size: 2,
18+
body: {
19+
query: { match_all: {} }
20+
}
21+
}, callback);
2822
};
29-
var queryExec = function(params, callback) {
30-
client.search(params, callback);
31-
};
32-
rs = new ReadableSearch(queryExec, params);
23+
rs = new ReadableSearch(queryExec);
3324
done();
3425
});
3526
});
36-
it('Must find 3 records', function(done) {
37-
var hits = [];
38-
var err;
39-
var ws = new Writable({objectMode:true});
40-
ws._write = function(chunk, enc, next) {
41-
hits.push(chunk);
42-
expect(chunk._index).to.equal('myindex');
43-
expect(chunk._type).to.equal('mytype');
44-
expect(chunk._score).to.equal(1);
45-
expect(chunk._id).to.exist;
46-
expect(chunk._source.title).to.exist;
47-
next();
48-
};
49-
rs.on('error', function(e) {
50-
err = e;
51-
});
52-
rs.on('end', function() {
53-
if (err) { return done(err); }
54-
expect(hits.length).to.equal(3);
27+
it('Must find 42 records by searching them', function(done) {
28+
checkRecords(rs, 42, done);
29+
});
30+
});
31+
32+
describe('When scrolling', function() {
33+
var rs;
34+
before(function(done) {
35+
populateIndex(42, function(e) {
36+
if (e) { return done(e); }
37+
var queryExec = function queryExec(from, callback) {
38+
if (this.scroll_id) {
39+
return client.scroll({
40+
scrollId : this.scroll_id,
41+
scroll : '30s'
42+
}, callback);
43+
}
44+
// get a scroll id first
45+
var self = this;
46+
client.search({
47+
index: 'myindex',
48+
scroll: '20s',
49+
size: '3',
50+
body: {
51+
query: { match_all: {} }
52+
}
53+
}, function(e, resp) {
54+
self.scroll_id = resp._scroll_id;
55+
callback(e, resp);
56+
});
57+
};
58+
rs = new ReadableSearch(queryExec);
5559
done();
5660
});
57-
rs.pipe(ws);
5861
});
59-
});
62+
it('Must find the 42 records by scroll', function(done) {
63+
checkRecords(rs, 42, done);
64+
});
65+
});
66+
67+
function populateIndex(nb, done) {
68+
client.indices.delete({index:'myindex'}, function() {
69+
var cmds = [];
70+
var generator = random(0);
71+
for (var i = 0; i < nb; i++) {
72+
var rec = generator.makeJunk();
73+
cmds.push({ index: { _index: 'myindex', _type: 'mytype', _id: rec._id } });
74+
cmds.push(rec);
75+
}
76+
client.bulk({
77+
body: cmds
78+
}, function(e) {
79+
if (e) { return done(e); }
80+
client.indices.refresh({index: 'myindex'}, done);
81+
});
82+
});
83+
}
84+
85+
function checkRecords(rs, nb, done) {
86+
var hits = [];
87+
var err;
88+
var ws = new Writable({objectMode:true});
89+
ws._write = function(chunk, enc, next) {
90+
hits.push(chunk);
91+
expect(chunk._index).to.equal('myindex');
92+
expect(chunk._type).to.equal('mytype');
93+
expect(chunk._score).to.equal(1);
94+
expect(chunk._id).to.exist;
95+
expect(chunk._source.name).to.exist;
96+
next();
97+
};
98+
rs.on('error', function(e) {
99+
err = e;
100+
});
101+
rs.on('end', function() {
102+
if (err) { return done(err); }
103+
expect(hits.length).to.equal(nb);
104+
done();
105+
});
106+
rs.pipe(ws);
107+
}

test/test-write.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ describe('When writing', function() {
99
before(function(done) {
1010
var bulkExec = function(bulkCmds, callback) {
1111
client.bulk({
12-
index : 'myindex2',
13-
type : 'mytype2',
12+
index : 'myindex',
13+
type : 'mytype',
1414
body : bulkCmds
1515
}, callback);
1616
};
@@ -33,8 +33,8 @@ describe('When writing', function() {
3333
it('Must have indexed 42 docs', function(done) {
3434
client.indices.refresh({ index: 'myindex2' }, function() {
3535
client.count({
36-
index: 'myindex2',
37-
type: 'mytype2'
36+
index: 'myindex',
37+
type: 'mytype'
3838
}, function(e, res) {
3939
if (e) { return done(e); }
4040
expect(res.count).to.equal(42);

0 commit comments

Comments
 (0)