@@ -72,115 +72,127 @@ public RestExportAction(Settings settings, Client client,
7272
7373 @ Override
7474 public void handleRequest (final RestRequest request , RestChannel channel ) {
75- final String index = request .param ("index" );
76- final String type = request .param ("type" );
77- final String [] indices = Strings .splitStringByCommaToArray (request .param ("index" , "_all" ));
78- final String [] types = Strings .splitStringByCommaToArray (request .param ("type" ));
79- final String desc = (index != null ? index : "_all" )
80- + (type != null ? "_" + type : "" );
8175 try {
76+ final String index = request .param ("index" );
77+ final String type = request .param ("type" );
78+ final String desc = (index != null ? index : "_all" )
79+ + (type != null ? "_" + type : "" );
8280 XContentBuilder builder = restContentBuilder (request )
8381 .startObject ()
8482 .field ("ok" , true )
8583 .endObject ();
86-
8784 ClusterHealthResponse healthResponse =
8885 client .admin ().cluster ().prepareHealth ().setWaitForYellowStatus ()
8986 .setTimeout ("30s" ).execute ().actionGet (30000 );
90-
9187 if (healthResponse .isTimedOut ()) {
9288 String msg = "cluster not healthy, cowardly refusing to continue with export" ;
9389 logger .error (msg );
9490 throw new IOException (msg );
9591 }
96-
9792 final String target = request .param ("target" , desc );
9893 String scheme = request .param ("scheme" , "targz" );
9994 for (String codec : StreamCodecService .getCodecs ()) {
10095 if (target .endsWith (codec )) {
10196 scheme = "tar" + codec ;
10297 }
10398 }
104-
10599 ConnectionService service = ConnectionService .getInstance ();
106100 ConnectionFactory factory = service .getConnectionFactory (scheme );
107101 final Connection <Session > connection = factory .getConnection (URI .create (scheme + ":" + target ));
108102 final Session session = connection .createSession ();
109103 session .open (Session .Mode .WRITE );
110-
111104 channel .sendResponse (new XContentRestResponse (request , OK , builder ));
112-
113105 EsExecutors .daemonThreadFactory (settings , "Knapsack export [" + desc + "]" )
114- .newThread (new Thread () {
115- @ Override
116- public void run () {
117- setName ("[Exporter Thread " + desc + "]" );
118- long millis = request .paramAsLong ("millis" , 30000L );
119- int size = request .paramAsInt ("size" , 1000 );
120-
121- final KnapsackStatus status = new KnapsackStatus (indices , types , target );
122- try {
123- logger .info ("starting export to {}" , target );
124- knapsackService .addExport (status );
125- // export settings & mappings
126- for (String s : indices ) {
127- Map <String , String > settings = getSettings (s );
128- for (String index : settings .keySet ()) {
129- session .write (new ElasticPacket (index , "_settings" , null , settings .get (index )));
130- Map <String , String > mappings = getMapping (index , ImmutableSet .copyOf (types ));
131- for (String type : mappings .keySet ()) {
132- session .write (new ElasticPacket (index , type , "_mapping" , mappings .get (type )));
133- }
134- }
135- }
106+ .newThread (new ExportThread (request , connection , session , target )).start ();
107+ } catch (Exception ex ) {
108+ try {
109+ logger .error (ex .getMessage (), ex );
110+ channel .sendResponse (new XContentThrowableRestResponse (request , ex ));
111+ } catch (Exception ex2 ) {
112+ logger .error (ex2 .getMessage (), ex2 );
113+ }
114+ }
115+ }
136116
137- // export document _source fields
138- SearchRequestBuilder searchRequest = client .prepareSearch ()
139- .setSize (size )
140- .setSearchType (SearchType .SCAN )
141- .setScroll (TimeValue .timeValueMillis (millis ));
142- if (indices != null && !"_all" .equals (index )) {
143- searchRequest .setIndices (indices );
144- }
145- if (types != null ) {
146- searchRequest .setTypes (types );
147- }
148- SearchResponse searchResponse = searchRequest .execute ().actionGet ();
149- while (searchResponse .getScrollId () != null ) {
150- searchResponse = client .prepareSearchScroll (searchResponse .getScrollId ())
151- .setScroll (TimeValue .timeValueMillis (millis ))
152- .execute ().actionGet ();
153- if (searchResponse .getHits ().getHits ().length == 0 ) {
154- break ;
155- }
156- for (SearchHit hit : searchResponse .getHits ()) {
157- ElasticPacket packet = new ElasticPacket (hit .getIndex (), hit .getType (), hit .getId (), hit .getSourceAsString ());
158- session .write (packet );
159- }
160- }
117+ private class ExportThread extends Thread {
118+
119+ private final RestRequest request ;
120+
121+ private final Connection <Session > connection ;
161122
162- session .close ();
163- connection .close ();
123+ private final Session session ;
164124
165- logger . info ( "export to {} completed" , target ) ;
125+ private final String target ;
166126
167- } catch (Exception e ) {
168- logger .error (e .getMessage (), e );
169- } finally {
170- try {
171- knapsackService .removeExport (status );
172- } catch (IOException e ) {
173- logger .error (e .getMessage (), e );
127+ private final String [] indices ;
128+
129+ private final String [] types ;
130+
131+ public ExportThread (RestRequest request , Connection <Session > connection ,
132+ Session session , String target ) {
133+ this .request = request ;
134+ this .connection = connection ;
135+ this .session = session ;
136+ this .target = target ;
137+ this .indices = Strings .splitStringByCommaToArray (request .param ("index" , "_all" ));
138+ this .types = request .param ("type" ) != null ?
139+ Strings .splitStringByCommaToArray (request .param ("type" )) : null ;
140+ }
141+
142+ @ Override
143+ public void run () {
144+ long millis = request .paramAsLong ("millis" , 30000L );
145+ int size = request .paramAsInt ("size" , 1000 );
146+ final KnapsackStatus status = new KnapsackStatus (indices , types , target );
147+ try {
148+ logger .info ("starting export to {}" , target );
149+ knapsackService .addExport (status );
150+ // export settings & mappings
151+ for (String s : indices ) {
152+ Map <String , String > settings = getSettings (s );
153+ for (String index : settings .keySet ()) {
154+ session .write (new ElasticPacket (index , "_settings" , null , settings .get (index )));
155+ Map <String , String > mappings = getMapping (index , ImmutableSet .copyOf (types ));
156+ for (String type : mappings .keySet ()) {
157+ session .write (new ElasticPacket (index , type , "_mapping" , mappings .get (type )));
174158 }
175159 }
176160 }
177- }).start ();
178-
179- } catch (IOException ex ) {
180- try {
181- channel .sendResponse (new XContentThrowableRestResponse (request , ex ));
182- } catch (Exception ex2 ) {
183- logger .error (ex2 .getMessage (), ex2 );
161+ // export document _source fields
162+ SearchRequestBuilder searchRequest = client .prepareSearch ()
163+ .setSize (size )
164+ .setSearchType (SearchType .SCAN )
165+ .setScroll (TimeValue .timeValueMillis (millis ));
166+ if (indices != null && !"_all" .equals (indices [0 ])) {
167+ searchRequest .setIndices (indices );
168+ }
169+ if (types != null ) {
170+ searchRequest .setTypes (types );
171+ }
172+ SearchResponse searchResponse = searchRequest .execute ().actionGet ();
173+ while (searchResponse .getScrollId () != null ) {
174+ searchResponse = client .prepareSearchScroll (searchResponse .getScrollId ())
175+ .setScroll (TimeValue .timeValueMillis (millis ))
176+ .execute ().actionGet ();
177+ if (searchResponse .getHits ().getHits ().length == 0 ) {
178+ break ;
179+ }
180+ for (SearchHit hit : searchResponse .getHits ()) {
181+ ElasticPacket packet = new ElasticPacket (hit .getIndex (), hit .getType (), hit .getId (), hit .getSourceAsString ());
182+ session .write (packet );
183+ }
184+ }
185+ session .close ();
186+ connection .close ();
187+ logger .info ("export to {} completed" , target );
188+ } catch (Exception e ) {
189+ logger .error (e .getMessage (), e );
190+ } finally {
191+ try {
192+ knapsackService .removeExport (status );
193+ } catch (IOException e ) {
194+ logger .error (e .getMessage (), e );
195+ }
184196 }
185197 }
186198 }
0 commit comments