@@ -62,104 +62,127 @@ func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
6262
6363func TestBulkIndexer (t * testing.T ) {
6464 t .Run ("Basic" , func (t * testing.T ) {
65- var (
66- wg sync.WaitGroup
65+ tests := []struct {
66+ name string
67+ makeClient func (cfg elasticsearch.Config ) (esapi.Transport , error )
68+ }{
69+ {
70+ name : "Client" ,
71+ makeClient : func (cfg elasticsearch.Config ) (esapi.Transport , error ) {
72+ return elasticsearch .NewClient (cfg )
73+ },
74+ },
75+ {
76+ name : "TypedClient" ,
77+ makeClient : func (cfg elasticsearch.Config ) (esapi.Transport , error ) {
78+ return elasticsearch .NewTypedClient (cfg )
79+ },
80+ },
81+ }
82+ for _ , tt := range tests {
83+ tt := tt
6784
68- countReqs int
69- testfile string
70- numItems = 6
71- )
85+ t .Run (tt .name , func (t * testing.T ) {
86+ var (
87+ wg sync.WaitGroup
88+
89+ countReqs int
90+ testfile string
91+ numItems = 6
92+ )
93+
94+ es , _ := tt .makeClient (elasticsearch.Config {Transport : & mockTransport {
95+ RoundTripFunc : func (* http.Request ) (* http.Response , error ) {
96+ countReqs ++
97+ switch countReqs {
98+ case 1 :
99+ testfile = "testdata/bulk_response_1a.json"
100+ case 2 :
101+ testfile = "testdata/bulk_response_1b.json"
102+ case 3 :
103+ testfile = "testdata/bulk_response_1c.json"
104+ }
105+ bodyContent , _ := ioutil .ReadFile (testfile )
106+ return & http.Response {
107+ Body : ioutil .NopCloser (bytes .NewBuffer (bodyContent )),
108+ Header : http.Header {"X-Elastic-Product" : []string {"Elasticsearch" }},
109+ }, nil
110+ },
111+ }})
72112
73- es , _ := elasticsearch .NewClient (elasticsearch.Config {Transport : & mockTransport {
74- RoundTripFunc : func (* http.Request ) (* http.Response , error ) {
75- countReqs ++
76- switch countReqs {
77- case 1 :
78- testfile = "testdata/bulk_response_1a.json"
79- case 2 :
80- testfile = "testdata/bulk_response_1b.json"
81- case 3 :
82- testfile = "testdata/bulk_response_1c.json"
113+ cfg := BulkIndexerConfig {
114+ NumWorkers : 1 ,
115+ FlushBytes : 39 * 2 , // 38 bytes header + body, times 2 to match 2 responses per file in testdata
116+ FlushInterval : time .Hour , // Disable auto-flushing, because response doesn't match number of items
117+ Client : es }
118+ if os .Getenv ("DEBUG" ) != "" {
119+ cfg .DebugLogger = log .New (os .Stdout , "" , 0 )
83120 }
84- bodyContent , _ := ioutil .ReadFile (testfile )
85- return & http.Response {
86- Body : ioutil .NopCloser (bytes .NewBuffer (bodyContent )),
87- Header : http.Header {"X-Elastic-Product" : []string {"Elasticsearch" }},
88- }, nil
89- },
90- }})
91121
92- cfg := BulkIndexerConfig {
93- NumWorkers : 1 ,
94- FlushBytes : 39 * 2 , // 38 bytes header + body, times 2 to match 2 responses per file in testdata
95- FlushInterval : time .Hour , // Disable auto-flushing, because response doesn't match number of items
96- Client : es }
97- if os .Getenv ("DEBUG" ) != "" {
98- cfg .DebugLogger = log .New (os .Stdout , "" , 0 )
99- }
100-
101- bi , _ := NewBulkIndexer (cfg )
122+ bi , _ := NewBulkIndexer (cfg )
123+
124+ for i := 1 ; i <= numItems ; i ++ {
125+ wg .Add (1 )
126+ go func (i int ) {
127+ defer wg .Done ()
128+ err := bi .Add (context .Background (), BulkIndexerItem {
129+ Action : "foo" ,
130+ DocumentID : strconv .Itoa (i ),
131+ Body : strings .NewReader (fmt .Sprintf (`{"title":"foo-%d"}` , i )),
132+ })
133+ if err != nil {
134+ t .Errorf ("Unexpected error: %s" , err )
135+ return
136+ }
137+ }(i )
138+ }
139+ wg .Wait ()
102140
103- for i := 1 ; i <= numItems ; i ++ {
104- wg .Add (1 )
105- go func (i int ) {
106- defer wg .Done ()
107- err := bi .Add (context .Background (), BulkIndexerItem {
108- Action : "foo" ,
109- DocumentID : strconv .Itoa (i ),
110- Body : strings .NewReader (fmt .Sprintf (`{"title":"foo-%d"}` , i )),
111- })
112- if err != nil {
141+ if err := bi .Close (context .Background ()); err != nil {
113142 t .Errorf ("Unexpected error: %s" , err )
114- return
115143 }
116- }(i )
117- }
118- wg .Wait ()
119144
120- if err := bi .Close (context .Background ()); err != nil {
121- t .Errorf ("Unexpected error: %s" , err )
122- }
145+ stats := bi .Stats ()
123146
124- stats := bi .Stats ()
125-
126- // added = numitems
127- if stats .NumAdded != uint64 (numItems ) {
128- t .Errorf ("Unexpected NumAdded: want=%d, got=%d" , numItems , stats .NumAdded )
129- }
147+ // added = numitems
148+ if stats .NumAdded != uint64 (numItems ) {
149+ t .Errorf ("Unexpected NumAdded: want=%d, got=%d" , numItems , stats .NumAdded )
150+ }
130151
131- // flushed = numitems - 1x conflict + 1x not_found
132- if stats .NumFlushed != uint64 (numItems - 2 ) {
133- t .Errorf ("Unexpected NumFlushed: want=%d, got=%d" , numItems - 2 , stats .NumFlushed )
134- }
152+ // flushed = numitems - 1x conflict + 1x not_found
153+ if stats .NumFlushed != uint64 (numItems - 2 ) {
154+ t .Errorf ("Unexpected NumFlushed: want=%d, got=%d" , numItems - 2 , stats .NumFlushed )
155+ }
135156
136- // failed = 1x conflict + 1x not_found
137- if stats .NumFailed != 2 {
138- t .Errorf ("Unexpected NumFailed: want=%d, got=%d" , 2 , stats .NumFailed )
139- }
157+ // failed = 1x conflict + 1x not_found
158+ if stats .NumFailed != 2 {
159+ t .Errorf ("Unexpected NumFailed: want=%d, got=%d" , 2 , stats .NumFailed )
160+ }
140161
141- // indexed = 1x
142- if stats .NumIndexed != 1 {
143- t .Errorf ("Unexpected NumIndexed: want=%d, got=%d" , 1 , stats .NumIndexed )
144- }
162+ // indexed = 1x
163+ if stats .NumIndexed != 1 {
164+ t .Errorf ("Unexpected NumIndexed: want=%d, got=%d" , 1 , stats .NumIndexed )
165+ }
145166
146- // created = 1x
147- if stats .NumCreated != 1 {
148- t .Errorf ("Unexpected NumCreated: want=%d, got=%d" , 1 , stats .NumCreated )
149- }
167+ // created = 1x
168+ if stats .NumCreated != 1 {
169+ t .Errorf ("Unexpected NumCreated: want=%d, got=%d" , 1 , stats .NumCreated )
170+ }
150171
151- // deleted = 1x
152- if stats .NumDeleted != 1 {
153- t .Errorf ("Unexpected NumDeleted: want=%d, got=%d" , 1 , stats .NumDeleted )
154- }
172+ // deleted = 1x
173+ if stats .NumDeleted != 1 {
174+ t .Errorf ("Unexpected NumDeleted: want=%d, got=%d" , 1 , stats .NumDeleted )
175+ }
155176
156- if stats .NumUpdated != 1 {
157- t .Errorf ("Unexpected NumUpdated: want=%d, got=%d" , 1 , stats .NumUpdated )
158- }
177+ if stats .NumUpdated != 1 {
178+ t .Errorf ("Unexpected NumUpdated: want=%d, got=%d" , 1 , stats .NumUpdated )
179+ }
159180
160- // 3 items * 40 bytes, 2 workers, 1 request per worker
161- if stats .NumRequests != 3 {
162- t .Errorf ("Unexpected NumRequests: want=%d, got=%d" , 3 , stats .NumRequests )
181+ // 3 items * 40 bytes, 2 workers, 1 request per worker
182+ if stats .NumRequests != 3 {
183+ t .Errorf ("Unexpected NumRequests: want=%d, got=%d" , 3 , stats .NumRequests )
184+ }
185+ })
163186 }
164187 })
165188
0 commit comments