diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java index ea8d278c4d202..5423e6b2a917f 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -163,6 +163,7 @@ public void testInvalidTimestamp() throws Exception { putGeoIpPipeline(); verifyUpdatedDatabase(); + awaitAllNodesDownloadedDatabases(); updateClusterSettings(Settings.builder().put("ingest.geoip.database_validity", TimeValue.timeValueMillis(1))); updateClusterSettings( @@ -215,6 +216,7 @@ public void testInvalidTimestamp() throws Exception { } } }); + awaitAllNodesDownloadedDatabases(); } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/92888") @@ -305,6 +307,7 @@ public void testGeoIpDatabasesDownloadNoGeoipProcessors() throws Exception { assertNotNull(task); assertNotNull(task.getState()); }); + awaitAllNodesDownloadedDatabases(); putNonGeoipPipeline(pipelineId); assertNotNull(getTask().getState()); // removing all geoip processors should not result in the task being stopped assertBusy(() -> { @@ -350,6 +353,7 @@ public void testDoNotDownloadDatabaseOnPipelineCreation() throws Exception { containsInAnyOrder("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb") ); }, 2, TimeUnit.MINUTES); + awaitAllNodesDownloadedDatabases(); // Remove the created index. assertAcked(indicesAdmin().prepareDelete(indexIdentifier).get()); @@ -411,6 +415,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { }, 20, TimeUnit.SECONDS); verifyUpdatedDatabase(); + awaitAllNodesDownloadedDatabases(); // Disable downloader: updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false)); @@ -453,6 +458,7 @@ public void testStartWithNoDatabases() throws Exception { // Enable downloader: updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)); verifyUpdatedDatabase(); + awaitAllNodesDownloadedDatabases(); } private void verifyUpdatedDatabase() throws Exception { @@ -473,6 +479,27 @@ private void verifyUpdatedDatabase() throws Exception { }); } + /** + * Waits until all ingest nodes report having downloaded the expected databases. This ensures that all ingest nodes are in a consistent + * state and prevents us from deleting databases before they've been downloaded on all nodes. + */ + private void awaitAllNodesDownloadedDatabases() throws Exception { + assertBusy(() -> { + GeoIpStatsAction.Response response = client().execute(GeoIpStatsAction.INSTANCE, new GeoIpStatsAction.Request()).actionGet(); + assertThat(response.getNodes(), not(empty())); + + for (GeoIpStatsAction.NodeResponse nodeResponse : response.getNodes()) { + if (nodeResponse.getNode().isIngestNode() == false) { + continue; + } + assertThat( + nodeResponse.getDatabases(), + containsInAnyOrder("GeoLite2-Country.mmdb", "GeoLite2-City.mmdb", "GeoLite2-ASN.mmdb", "MyCustomGeoLite2-City.mmdb") + ); + } + }); + } + private GeoIpTaskState getGeoIpTaskState() { PersistentTasksCustomMetadata.PersistentTask task = getTask(); assertNotNull(task);