1414import org .elasticsearch .ExceptionsHelper ;
1515import org .elasticsearch .ResourceAlreadyExistsException ;
1616import org .elasticsearch .action .ActionListener ;
17+ import org .elasticsearch .action .LatchedActionListener ;
1718import org .elasticsearch .action .bulk .BulkItemResponse ;
1819import org .elasticsearch .action .bulk .BulkRequest ;
1920import org .elasticsearch .action .bulk .BulkResponse ;
2223import org .elasticsearch .action .index .IndexRequest ;
2324import org .elasticsearch .action .search .SearchRequest ;
2425import org .elasticsearch .action .search .SearchResponse ;
26+ import org .elasticsearch .action .support .SubscribableListener ;
27+ import org .elasticsearch .action .support .master .AcknowledgedResponse ;
2528import org .elasticsearch .common .Strings ;
2629import org .elasticsearch .common .settings .MockSecureSettings ;
2730import org .elasticsearch .common .settings .Settings ;
2831import org .elasticsearch .common .util .CollectionUtils ;
2932import org .elasticsearch .core .TimeValue ;
3033import org .elasticsearch .ingest .EnterpriseGeoIpTask ;
3134import org .elasticsearch .ingest .geoip .direct .DatabaseConfiguration ;
35+ import org .elasticsearch .ingest .geoip .direct .DeleteDatabaseConfigurationAction ;
3236import org .elasticsearch .ingest .geoip .direct .PutDatabaseConfigurationAction ;
3337import org .elasticsearch .persistent .PersistentTasksService ;
3438import org .elasticsearch .plugins .Plugin ;
3842import org .elasticsearch .test .junit .annotations .TestLogging ;
3943import org .elasticsearch .transport .RemoteTransportException ;
4044import org .elasticsearch .xcontent .XContentType ;
45+ import org .junit .After ;
4146import org .junit .ClassRule ;
4247
4348import java .io .IOException ;
4449import java .util .Collection ;
4550import java .util .List ;
4651import java .util .Map ;
52+ import java .util .concurrent .CountDownLatch ;
53+ import java .util .concurrent .TimeUnit ;
4754
4855import static org .elasticsearch .ingest .EnterpriseGeoIpTask .ENTERPRISE_GEOIP_DOWNLOADER ;
4956import static org .elasticsearch .ingest .geoip .EnterpriseGeoIpDownloaderTaskExecutor .IPINFO_TOKEN_SETTING ;
@@ -54,6 +61,8 @@ public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {
5461
5562 private static final String MAXMIND_DATABASE_TYPE = "GeoIP2-City" ;
5663 private static final String IPINFO_DATABASE_TYPE = "asn" ;
64+ private static final String MAXMIND_CONFIGURATION = "test-1" ;
65+ private static final String IPINFO_CONFIGURATION = "test-2" ;
5766
5867 @ ClassRule
5968 public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture (
@@ -118,6 +127,7 @@ public void testEnterpriseDownloaderTask() throws Exception {
118127 * We know that the databases index has been populated (because we waited around, :wink:), but we don't know for sure that
119128 * the databases have been pulled down and made available on all nodes. So we run these ingest-and-check steps in assertBusy blocks.
120129 */
130+
121131 assertBusy (() -> {
122132 logger .info ("Ingesting a test document" );
123133 String documentId = ingestDocument (indexName , geoipPipelineName , sourceField , "89.160.20.128" );
@@ -140,6 +150,41 @@ public void testEnterpriseDownloaderTask() throws Exception {
140150 });
141151 }
142152
153+ @ After
154+ public void cleanup () throws InterruptedException {
155+ /*
156+ * This method cleans up the database configurations that the test created. This allows the test to be run repeatedly.
157+ */
158+ CountDownLatch latch = new CountDownLatch (1 );
159+ LatchedActionListener <AcknowledgedResponse > listener = new LatchedActionListener <>(ActionListener .noop (), latch );
160+ SubscribableListener .<AcknowledgedResponse >newForked (
161+ l -> admin ().cluster ()
162+ .execute (
163+ DeleteDatabaseConfigurationAction .INSTANCE ,
164+ new DeleteDatabaseConfigurationAction .Request (
165+ TimeValue .MAX_VALUE ,
166+ TimeValue .timeValueSeconds (10 ),
167+ MAXMIND_CONFIGURATION
168+ ),
169+ l
170+ )
171+ )
172+ .<AcknowledgedResponse >andThen (
173+ l -> admin ().cluster ()
174+ .execute (
175+ DeleteDatabaseConfigurationAction .INSTANCE ,
176+ new DeleteDatabaseConfigurationAction .Request (
177+ TimeValue .MAX_VALUE ,
178+ TimeValue .timeValueSeconds (10 ),
179+ IPINFO_CONFIGURATION
180+ ),
181+ l
182+ )
183+ )
184+ .addListener (listener );
185+ latch .await (10 , TimeUnit .SECONDS );
186+ }
187+
143188 private void startEnterpriseGeoIpDownloaderTask () {
144189 PersistentTasksService persistentTasksService = internalCluster ().getInstance (PersistentTasksService .class );
145190 persistentTasksService .sendStartRequest (
@@ -163,7 +208,7 @@ private void configureMaxmindDatabase(String databaseType) {
163208 new PutDatabaseConfigurationAction .Request (
164209 TimeValue .MAX_VALUE ,
165210 TimeValue .MAX_VALUE ,
166- new DatabaseConfiguration ("test-1" , databaseType , new DatabaseConfiguration .Maxmind ("test_account" ))
211+ new DatabaseConfiguration (MAXMIND_CONFIGURATION , databaseType , new DatabaseConfiguration .Maxmind ("test_account" ))
167212 )
168213 )
169214 .actionGet ();
@@ -176,7 +221,7 @@ private void configureIpinfoDatabase(String databaseType) {
176221 new PutDatabaseConfigurationAction .Request (
177222 TimeValue .MAX_VALUE ,
178223 TimeValue .MAX_VALUE ,
179- new DatabaseConfiguration ("test-2" , databaseType , new DatabaseConfiguration .Ipinfo ())
224+ new DatabaseConfiguration (IPINFO_CONFIGURATION , databaseType , new DatabaseConfiguration .Ipinfo ())
180225 )
181226 )
182227 .actionGet ();
0 commit comments