1111import org .apache .logging .log4j .LogManager ;
1212import org .apache .logging .log4j .Logger ;
1313import org .elasticsearch .ResourceAlreadyExistsException ;
14+ import org .elasticsearch .ResourceNotFoundException ;
1415import org .elasticsearch .action .ActionListener ;
1516import org .elasticsearch .client .Client ;
17+ import org .elasticsearch .client .OriginSettingClient ;
1618import org .elasticsearch .cluster .ClusterChangedEvent ;
1719import org .elasticsearch .cluster .ClusterStateListener ;
1820import org .elasticsearch .cluster .service .ClusterService ;
1921import org .elasticsearch .common .settings .Setting ;
2022import org .elasticsearch .common .settings .Settings ;
23+ import org .elasticsearch .gateway .GatewayService ;
24+ import org .elasticsearch .ingest .IngestService ;
2125import org .elasticsearch .persistent .AllocatedPersistentTask ;
2226import org .elasticsearch .persistent .PersistentTaskState ;
2327import org .elasticsearch .persistent .PersistentTasksCustomMetadata ;
2933import java .util .Map ;
3034import java .util .concurrent .atomic .AtomicReference ;
3135
36+ import static org .elasticsearch .ingest .geoip .GeoIpDownloader .DATABASES_INDEX ;
3237import static org .elasticsearch .ingest .geoip .GeoIpDownloader .GEOIP_DOWNLOADER ;
3338
3439/**
@@ -54,15 +59,14 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
5459
5560 GeoIpDownloaderTaskExecutor (Client client , HttpClient httpClient , ClusterService clusterService , ThreadPool threadPool ) {
5661 super (GEOIP_DOWNLOADER , ThreadPool .Names .GENERIC );
57- this .client = client ;
62+ this .client = new OriginSettingClient ( client , IngestService . INGEST_ORIGIN ) ;
5863 this .httpClient = httpClient ;
5964 this .clusterService = clusterService ;
6065 this .threadPool = threadPool ;
6166 this .settings = clusterService .getSettings ();
6267 persistentTasksService = new PersistentTasksService (clusterService , threadPool , client );
63- if (ENABLED_SETTING .get (settings )) {
64- clusterService .addListener (this );
65- }
68+ clusterService .addListener (this );
69+
6670 clusterService .getClusterSettings ().addSettingsUpdateConsumer (ENABLED_SETTING , this ::setEnabled );
6771 }
6872
@@ -75,8 +79,8 @@ private void setEnabled(boolean enabled) {
7579 startTask (() -> {
7680 });
7781 } else {
78- persistentTasksService . sendRemoveRequest ( GEOIP_DOWNLOADER , ActionListener . wrap ( r -> {
79- }, e -> logger . error ( "failed to remove geoip task" , e )) );
82+ stopTask (() -> {
83+ });
8084 }
8185 }
8286
@@ -86,23 +90,33 @@ protected void nodeOperation(AllocatedPersistentTask task, GeoIpTaskParams param
8690 currentTask .set (downloader );
8791 GeoIpTaskState geoIpTaskState = state == null ? GeoIpTaskState .EMPTY : (GeoIpTaskState ) state ;
8892 downloader .setState (geoIpTaskState );
89- downloader .runDownloader ();
93+ if (ENABLED_SETTING .get (clusterService .state ().metadata ().settings (), settings )) {
94+ downloader .runDownloader ();
95+ }
9096 }
9197
9298 @ Override
9399 protected GeoIpDownloader createTask (long id , String type , String action , TaskId parentTaskId ,
94- PersistentTasksCustomMetadata .PersistentTask <GeoIpTaskParams > taskInProgress ,
95- Map <String , String > headers ) {
100+ PersistentTasksCustomMetadata .PersistentTask <GeoIpTaskParams > taskInProgress ,
101+ Map <String , String > headers ) {
96102 return new GeoIpDownloader (client , httpClient , clusterService , threadPool , settings , id , type , action ,
97103 getDescription (taskInProgress ), parentTaskId , headers );
98104 }
99105
100106 @ Override
101107 public void clusterChanged (ClusterChangedEvent event ) {
108+ if (event .state ().blocks ().hasGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK )){
109+ //wait for state recovered
110+ return ;
111+ }
102112 //bootstrap downloader after first cluster start
103113 clusterService .removeListener (this );
104- if (event .localNodeMaster () && ENABLED_SETTING .get (event .state ().getMetadata ().settings ())) {
105- startTask (() -> clusterService .addListener (this ));
114+ if (event .localNodeMaster ()) {
115+ if (ENABLED_SETTING .get (event .state ().getMetadata ().settings (), settings )) {
116+ startTask (() -> clusterService .addListener (this ));
117+ } else {
118+ stopTask (() -> clusterService .addListener (this ));
119+ }
106120 }
107121 }
108122
@@ -116,7 +130,24 @@ private void startTask(Runnable onFailure) {
116130 }));
117131 }
118132
119- public GeoIpDownloader getCurrentTask (){
133+ private void stopTask (Runnable onFailure ) {
134+ ActionListener <PersistentTasksCustomMetadata .PersistentTask <?>> listener = ActionListener .wrap (r -> {
135+ }, e -> {
136+ if (e instanceof ResourceNotFoundException == false ) {
137+ logger .error ("failed to remove geoip downloader task" , e );
138+ onFailure .run ();
139+ }
140+ });
141+ persistentTasksService .sendRemoveRequest (GEOIP_DOWNLOADER , ActionListener .runAfter (listener , () ->
142+ client .admin ().indices ().prepareDelete (DATABASES_INDEX ).execute (ActionListener .wrap (rr -> {
143+ }, e -> {
144+ if (e instanceof ResourceNotFoundException == false ) {
145+ logger .warn ("failed to remove " + DATABASES_INDEX , e );
146+ }
147+ }))));
148+ }
149+
150+ public GeoIpDownloader getCurrentTask () {
120151 return currentTask .get ();
121152 }
122153}
0 commit comments