Skip to content

Commit c75a716

Browse files
committed
Support to parse the hit inside the ReadableStream
Sometimes it is too much a bother to setup a Transformer stream just to execute a simple transformation of the hits returned by Elasticsearch. Example: ``` var num = 1; new ReadableStream(bulkExec, function parseHit(hit) { return { id: hit._id, version: hit._version, hitNum: num++ }; }); ```
1 parent c86f505 commit c75a716

File tree

4 files changed

+9
-5
lines changed

4 files changed

+9
-5
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ var scrollExec = function scrollExec(from, callback) {
8989
client.search({
9090
index: 'myindex',
9191
scroll: '20s',
92-
size: '3',
92+
size: 42,
9393
body: {
9494
query: { match_all: {} }
9595
}

lib/readable-search.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ module.exports = ReadableHits;
1313
/**
1414
* @param queryExec an executable query functions that takes 2 arguments: the offset and a callback.
1515
*/
16-
function ReadableHits(queryExec) {
16+
function ReadableHits(queryExec, parseHit) {
1717
if (!(this instanceof ReadableHits)) {
1818
return new ReadableHits(queryExec);
1919
}
@@ -26,6 +26,7 @@ function ReadableHits(queryExec) {
2626
// current iteration through the page
2727
this._hits = [];
2828
this._current = 0;
29+
this.parseHit = parseHit || identity;
2930
}
3031

3132
ReadableHits.prototype = Object.create(Readable.prototype, {constructor: {value: ReadableHits}});
@@ -68,5 +69,9 @@ ReadableHits.prototype._fetchNextPage = function() {//size) {
6869
};
6970

7071
ReadableHits.prototype._shift = function() {
71-
this.push(this._hits[this._current]);
72+
this.push(this.parseHit(this._hits[this._current]));
7273
};
74+
75+
function identity(hit) {
76+
return hit;
77+
}

lib/writable-bulk.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ WritableBulk.prototype._write = function(chunk, enc, next) {
4444
}
4545
if (!this.expectingPayload) {
4646
if (!chunk.hasOwnProperty('delete')) {
47-
console.log('humf', chunk);
4847
this.emit('error', new Error('Unexpected chunk, not an ' +
4948
'index/create/update/delete command and ' +
5049
'not a document to index either'));

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "elasticsearch-streams",
33
"description": "Stream in and out of Elasticsearch",
4-
"version": "0.0.5",
4+
"version": "0.0.6",
55
"repository": {
66
"type": "git",
77
"url": "https://github.com/hmalphettes/elasticsearch-streamer.git"

0 commit comments

Comments
 (0)