|
6 | 6 | import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; |
7 | 7 | import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; |
8 | 8 | import org.elasticsearch.action.index.IndexRequest; |
9 | | -import org.elasticsearch.action.search.SearchOperationThreading; |
10 | 9 | import org.elasticsearch.action.search.SearchRequest; |
11 | 10 | import org.elasticsearch.action.search.SearchResponse; |
12 | 11 | import org.elasticsearch.action.search.SearchType; |
|
37 | 36 | import org.elasticsearch.rest.RestController; |
38 | 37 | import org.elasticsearch.rest.RestHandler; |
39 | 38 | import org.elasticsearch.rest.RestRequest; |
40 | | -import org.elasticsearch.rest.XContentRestResponse; |
41 | | -import org.elasticsearch.rest.XContentThrowableRestResponse; |
42 | 39 | import org.elasticsearch.rest.action.search.RestSearchAction; |
43 | 40 | import org.elasticsearch.search.SearchHit; |
44 | 41 |
|
45 | 42 | import org.xbib.elasticsearch.plugin.knapsack.KnapsackHelper; |
46 | 43 | import org.xbib.elasticsearch.plugin.knapsack.KnapsackPacket; |
47 | 44 | import org.xbib.elasticsearch.plugin.knapsack.KnapsackStatus; |
48 | | -import org.xbib.io.ObjectPacket; |
49 | | -import org.xbib.elasticsearch.support.client.bulk.BulkClient; |
| 45 | +import org.xbib.elasticsearch.rest.action.support.XContentRestResponse; |
| 46 | +import org.xbib.elasticsearch.rest.action.support.XContentThrowableRestResponse; |
| 47 | +import org.xbib.elasticsearch.support.client.bulk.BulkTransportClient; |
50 | 48 | import org.xbib.io.Connection; |
51 | 49 | import org.xbib.io.ConnectionFactory; |
52 | 50 | import org.xbib.io.ConnectionService; |
| 51 | +import org.xbib.io.ObjectPacket; |
53 | 52 | import org.xbib.io.Session; |
54 | 53 |
|
55 | 54 | import java.io.IOException; |
|
68 | 67 | import static org.elasticsearch.rest.RestRequest.Method.POST; |
69 | 68 | import static org.elasticsearch.rest.RestStatus.OK; |
70 | 69 | import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; |
71 | | -import static org.elasticsearch.rest.action.support.RestXContentBuilder.restContentBuilder; |
| 70 | +import static org.xbib.elasticsearch.rest.action.support.RestXContentBuilder.restContentBuilder; |
72 | 71 |
|
73 | 72 | /** |
74 | 73 | * The knapsack export action performs a scan/scroll action over a user defined query |
@@ -170,7 +169,7 @@ private class ExportThread extends Thread { |
170 | 169 |
|
171 | 170 | private final RestRequest request; |
172 | 171 |
|
173 | | - private BulkClient bulkClient; |
| 172 | + private BulkTransportClient bulkClient; |
174 | 173 |
|
175 | 174 | private Connection<Session<KnapsackPacket>> connection; |
176 | 175 |
|
@@ -217,7 +216,7 @@ public void run() { |
217 | 216 | try { |
218 | 217 | logger.info("start of export: {}", status); |
219 | 218 | knapsackHelper.addExport(status); |
220 | | - this.bulkClient = new BulkClient(); |
| 219 | + this.bulkClient = new BulkTransportClient(); |
221 | 220 | Map<String,Set<String>> indices = newHashMap(); |
222 | 221 | for (String s : Strings.commaDelimitedListToSet(request.param("index", "_all"))) { |
223 | 222 | indices.put(s, Strings.commaDelimitedListToSet(request.param("type"))); |
@@ -300,14 +299,6 @@ public void run() { |
300 | 299 | request.params().put("size", request.param("maxActionsPerBulkRequest", "1000")); |
301 | 300 | searchRequest = RestSearchAction.parseSearchRequest(request); |
302 | 301 | searchRequest.listenerThreaded(false); |
303 | | - SearchOperationThreading operationThreading = |
304 | | - SearchOperationThreading.fromString(request.param("operation_threading"), null); |
305 | | - if (operationThreading != null) { |
306 | | - if (operationThreading == SearchOperationThreading.NO_THREADS) { |
307 | | - operationThreading = SearchOperationThreading.SINGLE_THREAD; |
308 | | - } |
309 | | - searchRequest.operationThreading(operationThreading); |
310 | | - } |
311 | 302 | for (String index : indices.keySet()) { |
312 | 303 | searchRequest.searchType(SearchType.SCAN).scroll(timeout); |
313 | 304 | if (!"_all".equals(index)) { |
@@ -377,8 +368,10 @@ public void run() { |
377 | 368 | session.close(); |
378 | 369 | } |
379 | 370 | if (copy) { |
380 | | - bulkClient.refresh().shutdown(); |
381 | | - bulkClient = null; |
| 371 | + for (String index : indices.keySet()) { |
| 372 | + bulkClient.refresh(index); |
| 373 | + } |
| 374 | + bulkClient.shutdown(); |
382 | 375 | } |
383 | 376 | if (s3 && request.param("uri") != null) { |
384 | 377 | // s3://auth:host, s3://host |
|
0 commit comments