Skip to content

Commit 80a77e0

Browse files
committed
Initial commit
0 parents  commit 80a77e0

File tree

9 files changed

+429
-0
lines changed

9 files changed

+429
-0
lines changed

.gitignore

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
node_modules
2+
.DS_Store
3+
*~
4+
coverage
5+
Thumbs.db
6+
.bak
7+
.tmp
8+
9+
lib-cov
10+
*.seed
11+
*.log
12+
*.dat
13+
*.out
14+
*.pid
15+
*.gz
16+
17+
pids
18+
logs
19+
results
20+
21+
npm-debug.log
22+
23+
test_migration_remote
24+
pouch__all_dbs__

.jshintrc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
{
2+
"proto": true,
3+
"browser": true,
4+
"curly": true,
5+
"devel": true,
6+
"eqeqeq": true,
7+
"eqnull": true,
8+
"evil": false,
9+
"immed": false,
10+
"jquery": true,
11+
"latedef": false,
12+
"laxcomma": true,
13+
"newcap": true,
14+
"node": true,
15+
"noempty": true,
16+
"nonew": true,
17+
"predef":
18+
[
19+
"after",
20+
"afterEach",
21+
"before",
22+
"beforeEach",
23+
"describe",
24+
"it",
25+
"unescape",
26+
"setImmediate"
27+
],
28+
"smarttabs": true,
29+
"trailing": false,
30+
"undef": true,
31+
"unused": true,
32+
"strict": false,
33+
"expr": true
34+
}

README.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Elasticsearch streams
2+
3+
Expose a Writable stream for bulk commands and a Readable stream from
4+
hits and documents responses.
5+
6+
Use case: pipe to and from levelup, pouchdb and other friends.
7+
8+
The streams are agnostic of the Elasticsearch client: they only need a function that executes the request.
9+
10+
Examples:
11+
12+
## Stream random records into Elasticsearch
13+
```
14+
var WritableBulk = require('elasticsearch-streams').WritableBulk;
var client = new (require('elasticsearch').Client)();
15+
16+
var bulkExec = function(bulkCmds, callback) {
17+
client.bulk({
18+
index : 'myindex',
19+
type : 'mytype',
20+
body : bulkCmds
21+
}, callback);
22+
};
23+
var ws = new WritableBulk(bulkExec);
24+
require('random-document-stream')(42).pipe(ws);
25+
```
26+
27+
## Stream search results into Elasticsearch
28+
```
29+
var ReadableSearch = require('elasticsearch-streams').ReadableSearch;
30+
var client = new (require('elasticsearch').Client)();
31+
32+
var search = {
33+
index: 'myindex',
34+
from: 0,
35+
size: 12,
36+
body: {
37+
query: { match_all: {} }
38+
}
39+
};
40+
var queryExec = client.search.bind(client);
41+
42+
var rs = new ReadableSearch(queryExec, search);
43+
rs.pipe(ws);
44+
```
45+
46+
## TODO
47+
### Short term
48+
* Document more
49+
* Handle errors correctly
50+
51+
### Later
52+
Streaming http client
53+
54+
# LICENSE
55+
elasticsearch-streams is freely distributable under the terms of the MIT license.
56+
57+
Copyright (c) 2014 Sutoiku, Inc.
58+
59+
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
60+
documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
61+
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
62+
persons to whom the Software is furnished to do so, subject to the following conditions:
63+
64+
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
65+
Software.
66+
67+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
68+
WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
69+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
70+
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

index.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// Public entry point of the package: exposes the stream constructors.
// NOTE(review): the only readable implementation visible in this commit is
// lib/readable-hits.js (which exports ReadableHits) — confirm that
// ./lib/readable-search exists, otherwise this require throws at load time.
module.exports = {
2+
WritableBulk: require('./lib/writable-bulk'),
3+
ReadableSearch: require('./lib/readable-search')
4+
};

lib/readable-hits.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Expose an elasticsearch query that returns hits or docs as a stream of hits or docs.
3+
*
4+
* Expect the query to be a JSON object where the from property defines the offset
5+
* and the size property defines the page size.
6+
* Expect the client to return a parsed JSON.
7+
*/
8+
'use strict';
9+
var Readable = require('stream').Readable;
10+
11+
module.exports = ReadableHits;
12+
13+
/**
 * Readable object-mode stream over the hits (or docs) returned by an
 * elasticsearch query, fetched one page at a time.
 *
 * @param queryExec function(query, callback) that executes the query; the
 *                  callback receives (error, parsedJsonResponse).
 * @param query the query parameters: `from` is the offset of the first hit
 *              and `size` the page size. It is copied for every request and
 *              never modified.
 * @param limit kept as `this.limit` when greater than 0; it is not applied to
 *              the stream yet.
 * @param emitSourceOnly kept as a boolean in `this.emitSourceOnly`; it is not
 *              applied to the stream yet.
 */
function ReadableHits(queryExec, query, limit, emitSourceOnly) {
  Readable.call(this, {objectMode:true});
  this.queryExec = queryExec;
  this.query = query;
  this.pageSize = query.size || 256;
  this.emitSourceOnly = !!emitSourceOnly;
  // total number of hits reported by the server, -1 until known
  this.total = -1;
  // offset of the next page to request
  this.from = query.from || 0;
  // false once no further page must be requested
  this._next = true;
  if (limit > 0) {
    this.limit = limit;
  }

  // current iteration through the page
  this._hits = [];
  this._current = 0;
}

ReadableHits.prototype = Object.create(Readable.prototype, {constructor: {value: ReadableHits}});

/**
 * Push the next hit of the current page; when the page is exhausted either
 * request the next one or end the stream.
 */
ReadableHits.prototype._read = function() {
  if (this._current < this._hits.length) {
    return this._shift();
  }
  if (!this._next) {
    return this.push(null);
  }
  this._fetchNextPage();
};

/**
 * Request the page starting at `this.from` and push its first hit.
 * An error reported by queryExec is emitted as 'error' and ends the stream.
 */
ReadableHits.prototype._fetchNextPage = function() {
  var self = this;
  // Work on a shallow copy so that the caller's query object is left untouched.
  var params = {};
  for (var key in this.query) {
    if (Object.prototype.hasOwnProperty.call(this.query, key)) {
      params[key] = this.query[key];
    }
  }
  // Do not add a `from` parameter to a first request that did not have one.
  if (this.from || this.query.from !== undefined) {
    params.from = this.from;
  }
  this.queryExec(params, function(e, resp) {
    if (e) {
      self._next = false;
      self.emit('error', e);
      return self.push(null);
    }
    var hasHits = !!(resp && resp.hits);
    var page = [];
    if (hasHits) {
      page = resp.hits.hits || [];
    } else if (resp && resp.docs) {
      // accept both an array of docs and the nested { docs: { docs: [] } } form
      page = Array.isArray(resp.docs) ? resp.docs : (resp.docs.docs || []);
    }
    self._hits = page;
    self._current = 0;
    if (!page.length) {
      // nothing more to read, whatever the reported total is
      self._next = false;
      return self.push(null);
    }
    // advance by what was actually received, not by the requested page size
    self.from += page.length;
    var total = hasHits ? resp.hits.total : undefined;
    if (typeof total === 'number') {
      self.total = total;
      self._next = self.from < total;
    } else {
      // Unknown total: keep paging a search until an empty page comes back.
      // A docs response is not paginated, so it is read only once.
      self._next = hasHits;
    }
    self._shift();
  });
};

/**
 * Push the current hit downstream and move to the next one.
 */
ReadableHits.prototype._shift = function() {
  this.push(this._hits[this._current++]);
};

lib/writable-bulk.js

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/**
2+
* Expose a writable stream that executes the chunks written to it as a set of bulk requests.
3+
*/
4+
'use strict';
5+
6+
var Writable = require('stream').Writable;
7+
8+
module.exports = WritableBulk;
9+
10+
/**
 * Writable object-mode stream that groups the chunks written to it into
 * elasticsearch bulk requests.
 *
 * Accepted chunks:
 * - a bulk action (`index`, `create`, `update` or `delete`); `index`, `create`
 *   and `update` must be followed by one chunk holding their payload;
 * - a document that owns the property named by `defaults.id`: the action line
 *   is generated for it from `defaults`.
 * Any other chunk is dropped and an 'error' is emitted.
 *
 * 'end' is emitted once the stream has finished and the last pending bulk has
 * been executed.
 *
 * @param bulkExec function(bulkCmds, callback) that executes an array of bulk
 *                 commands and calls back with an optional error.
 * @param defaults optional { op: 'index or create, index by default',
 *                 id: 'name of the property that is the id, by default _id',
 *                 _index: 'name of the index or nothing',
 *                 _type: 'name of the type or nothing' }
 * @param bulkSize number of commands sent per bulk request, 128 by default
 */
function WritableBulk(bulkExec, defaults, bulkSize) {
  Writable.call(this, {objectMode:true});
  this.bulkExec = bulkExec;
  this.bulkSize = bulkSize || 128;
  // Copy the settings instead of writing the fallbacks into the caller's object.
  var settings = defaults || {};
  this.defaults = {
    op: settings.op || 'index',
    id: settings.id || '_id',
    _index: settings._index,
    _type: settings._type
  };

  this.bulk = [];
  this.bulkCount = 0;
  this.expectingPayload = false;

  // Runtimes that never call _final still get their last bulk flushed here;
  // where _final already ran this flush is a no-op. 'end' follows either way.
  var self = this;
  this.once('finish', function() {
    self._flushBulk(function() {
      self.emit('end');
    });
  });
}

WritableBulk.prototype = Object.create(Writable.prototype, {constructor: {value: WritableBulk}});

/**
 * @param chunk a piece of a bulk request as json.
 */
WritableBulk.prototype._write = function(chunk, enc, next) {
  var hasOwn = Object.prototype.hasOwnProperty;
  if (this.expectingPayload) {
    // payload of the action written just before: the command is now complete
    this.bulkCount++;
    this.expectingPayload = false;
  } else if (hasOwn.call(chunk, this.defaults.id)) {
    // plain document: generate its action line from the defaults
    var defaultCmd = {};
    defaultCmd[this.defaults.op] = {
      _index: this.defaults._index,
      _type: this.defaults._type,
      _id: chunk[this.defaults.id]
    };
    this.bulk.push(defaultCmd);
    this.bulkCount++;
  } else {
    var willExpectPayload = ['index', 'create', 'update'];
    for (var i = 0; i < willExpectPayload.length; i++) {
      if (hasOwn.call(chunk, willExpectPayload[i])) {
        this.expectingPayload = willExpectPayload[i];
        break;
      }
    }
    if (!this.expectingPayload) {
      if (!hasOwn.call(chunk, 'delete')) {
        this.emit('error', new Error('Unexpected chunk, not index create update delete'));
        return next();
      }
      // delete has no payload: the command is complete
      this.bulkCount++;
    }
  }
  this.bulk.push(chunk);
  if (this.bulkSize <= this.bulkCount) {
    return this._flushBulk(next);
  }
  next();
};

/**
 * Called by the stream once every buffered chunk has been written and before
 * 'finish' is emitted: execute the commands still pending.
 */
WritableBulk.prototype._final = function(callback) {
  this._flushBulk(callback);
};

/**
 * Execute the pending commands as one bulk request.
 * An error reported by bulkExec is emitted as 'error'; the callback is called
 * in every case.
 */
WritableBulk.prototype._flushBulk = function(callback) {
  if (!this.bulkCount) {
    return setImmediate(callback);
  }
  var self = this;
  // Detach the buffer before executing so the same commands are never sent twice.
  var cmds = this.bulk;
  this.bulk = [];
  this.bulkCount = 0;
  this.expectingPayload = false;
  this.bulkExec(cmds, function(e) {
    if (e) {
      // TODO: better than this?
      // - Introspect the response for individual errors
      // - Stream out the responses and correlate with the inputs?
      self.emit('error', e);
    }
    callback();
  });
};

package.json

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"name": "elasticsearch-streams",
3+
"description": "Stream in and out of Elasticsearch",
4+
"version": "0.0.1",
5+
"repository": {
6+
"type": "git",
7+
"url": "https://github.com/hmalphettes/elasticsearch-streamer.git"
8+
},
9+
"main": "index.js",
10+
"scripts": {
11+
"test": "jshint lib/*.js test/*.js && mocha"
12+
},
13+
"author": "Hugues Malphettes",
14+
"license": "MIT",
15+
"keywords": [
16+
"elasticsearch",
17+
"stream"
18+
],
19+
"dependencies": {
20+
},
21+
"devDependencies": {
22+
"chai": "*",
23+
"elasticsearch": "*"
24+
}
25+
}

test/test-search.js

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
'use strict';
var expect = require('chai').expect;
var ReadableHits = require('../lib/readable-hits');
var Writable = require('stream').Writable;
// `new require('elasticsearch').Client()` applies `new` to require(...) and
// then calls Client as a plain function; build the client explicitly instead.
var elasticsearch = require('elasticsearch');
var client = new elasticsearch.Client();

// Integration test: expects an elasticsearch node reachable with the client defaults.
describe('When searching', function() {
  var rs;
  before(function(done) {
    client.bulk({
      // make the documents visible to the search executed right after
      refresh: true,
      body: [
        { index: { _index: 'myindex', _type: 'mytype', _id: 1 } },
        { title: 'foo' },
        { index: { _index: 'myindex', _type: 'mytype', _id: 2 } },
        { title: 'bar' },
        { index: { _index: 'myindex', _type: 'mytype', _id: 3 } },
        { title: 'joe' }
      ]
    }, function(e) {
      if (e) { return done(e); }
      var params = {
        index: 'myindex',
        from: 0,
        size: 12,
        body: {
          query: { match_all: {} }
        }
      };
      var queryExec = function(params, callback) {
        client.search(params, callback);
      };
      rs = new ReadableHits(queryExec, params);
      done();
    });
  });
  it('Must find 3 records', function(done) {
    var hits = [];
    var err;
    var ws = new Writable({objectMode:true});
    ws._write = function(chunk, enc, next) {
      hits.push(chunk);
      expect(chunk._index).to.equal('myindex');
      expect(chunk._type).to.equal('mytype');
      expect(chunk._score).to.equal(1);
      expect(chunk._id).to.exist;
      expect(chunk._source.title).to.exist;
      next();
    };
    // remember the error and report it once the stream has ended
    rs.on('error', function(e) {
      err = e;
    });
    rs.on('end', function() {
      if (err) { return done(err); }
      expect(hits.length).to.equal(3);
      done();
    });
    rs.pipe(ws);
  });
});

0 commit comments

Comments
 (0)