@@ -63,6 +63,7 @@ WritableBulk.prototype._write = function(chunk, enc, next) {
63
63
}
64
64
if ( ! this . expectingPayload ) {
65
65
if ( ! chunk . hasOwnProperty ( 'delete' ) ) {
66
+ console . log ( 'humf' , chunk ) ;
66
67
this . emit ( 'error' , new Error ( 'Unexpected chunk, not an ' +
67
68
'index/create/update/delete command and ' +
68
69
'not a document to index either' ) ) ;
@@ -83,13 +84,19 @@ WritableBulk.prototype._flushBulk = function(callback) {
83
84
return setImmediate ( callback ) ;
84
85
}
85
86
var self = this ;
86
- this . bulkExec ( this . bulk , function ( e ) {
87
+ this . bulkExec ( this . bulk , function ( e , resp ) {
87
88
if ( e ) {
88
- // TODO: better than this?
89
- // - Introspect the response for individual errors
90
- // - Stream out the responses and correlate with the inputs?
91
89
self . emit ( 'error' , e ) ;
92
90
}
91
+ if ( resp . errors && resp . items ) {
92
+ for ( var i = 0 ; i < resp . items . length ; i ++ ) {
93
+ var bulkItemResp = resp . items [ i ] ;
94
+ var key = Object . keys ( bulkItemResp ) [ 0 ] ;
95
+ if ( bulkItemResp [ key ] . error ) {
96
+ self . emit ( 'error' , new Error ( bulkItemResp [ key ] . error ) ) ;
97
+ }
98
+ }
99
+ }
93
100
self . bulk = [ ] ;
94
101
self . bulkCount = 0 ;
95
102
self . expectingPayload = false ;
@@ -101,12 +108,12 @@ WritableBulk.prototype.end = function(data) {
101
108
var self = this ;
102
109
if ( ! data ) {
103
110
return this . _flushBulk ( function ( ) {
104
- self . emit ( 'end ' ) ;
111
+ self . emit ( 'finish ' ) ;
105
112
} ) ;
106
113
}
107
114
this . _write ( data , 'json' , function ( ) {
108
115
self . _flushBulk ( function ( ) {
109
- self . emit ( 'end ' ) ;
116
+ self . emit ( 'finish ' ) ;
110
117
} ) ;
111
118
} ) ;
112
119
} ;
0 commit comments