@@ -163,6 +163,7 @@ public void testInvalidTimestamp() throws Exception {
163163
164164 putGeoIpPipeline ();
165165 verifyUpdatedDatabase ();
166+ awaitAllNodesDownloadedDatabases ();
166167
167168 updateClusterSettings (Settings .builder ().put ("ingest.geoip.database_validity" , TimeValue .timeValueMillis (1 )));
168169 updateClusterSettings (
@@ -215,6 +216,7 @@ public void testInvalidTimestamp() throws Exception {
215216 }
216217 }
217218 });
219+ awaitAllNodesDownloadedDatabases ();
218220 }
219221
220222 @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/92888" )
@@ -305,6 +307,7 @@ public void testGeoIpDatabasesDownloadNoGeoipProcessors() throws Exception {
305307 assertNotNull (task );
306308 assertNotNull (task .getState ());
307309 });
310+ awaitAllNodesDownloadedDatabases ();
308311 putNonGeoipPipeline (pipelineId );
309312 assertNotNull (getTask ().getState ()); // removing all geoip processors should not result in the task being stopped
310313 assertBusy (() -> {
@@ -350,6 +353,7 @@ public void testDoNotDownloadDatabaseOnPipelineCreation() throws Exception {
350353 containsInAnyOrder ("GeoLite2-ASN.mmdb" , "GeoLite2-City.mmdb" , "GeoLite2-Country.mmdb" , "MyCustomGeoLite2-City.mmdb" )
351354 );
352355 }, 2 , TimeUnit .MINUTES );
356+ awaitAllNodesDownloadedDatabases ();
353357
354358 // Remove the created index.
355359 assertAcked (indicesAdmin ().prepareDelete (indexIdentifier ).get ());
@@ -411,6 +415,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
411415 }, 20 , TimeUnit .SECONDS );
412416
413417 verifyUpdatedDatabase ();
418+ awaitAllNodesDownloadedDatabases ();
414419
415420 // Disable downloader:
416421 updateClusterSettings (Settings .builder ().put (GeoIpDownloaderTaskExecutor .ENABLED_SETTING .getKey (), false ));
@@ -453,6 +458,7 @@ public void testStartWithNoDatabases() throws Exception {
453458 // Enable downloader:
454459 updateClusterSettings (Settings .builder ().put (GeoIpDownloaderTaskExecutor .ENABLED_SETTING .getKey (), true ));
455460 verifyUpdatedDatabase ();
461+ awaitAllNodesDownloadedDatabases ();
456462 }
457463
458464 private void verifyUpdatedDatabase () throws Exception {
@@ -473,6 +479,27 @@ private void verifyUpdatedDatabase() throws Exception {
473479 });
474480 }
475481
482+ /**
483+ * Waits until all ingest nodes report having downloaded the expected databases. This ensures that all ingest nodes are in a consistent
484+ * state and prevents us from deleting databases before they've been downloaded on all nodes.
485+ */
486+ private void awaitAllNodesDownloadedDatabases () throws Exception {
487+ assertBusy (() -> {
488+ GeoIpStatsAction .Response response = client ().execute (GeoIpStatsAction .INSTANCE , new GeoIpStatsAction .Request ()).actionGet ();
489+ assertThat (response .getNodes (), not (empty ()));
490+
491+ for (GeoIpStatsAction .NodeResponse nodeResponse : response .getNodes ()) {
492+ if (nodeResponse .getNode ().isIngestNode () == false ) {
493+ continue ;
494+ }
495+ assertThat (
496+ nodeResponse .getDatabases (),
497+ containsInAnyOrder ("GeoLite2-Country.mmdb" , "GeoLite2-City.mmdb" , "GeoLite2-ASN.mmdb" , "MyCustomGeoLite2-City.mmdb" )
498+ );
499+ }
500+ });
501+ }
502+
476503 private GeoIpTaskState getGeoIpTaskState () {
477504 PersistentTasksCustomMetadata .PersistentTask <PersistentTaskParams > task = getTask ();
478505 assertNotNull (task );
0 commit comments