From 5bb31af2f8c722b2e9bed7cf7720f16ea9200c6a Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Sun, 16 Nov 2025 22:09:39 -0800 Subject: [PATCH] Fix flaky rollup test by stopping jobs before index cleanup The test was flaky because rollup job coroutines continued running after test cleanup, causing a race condition where the config index was recreated with incorrect dynamic mappings. Root cause: - Tests create enabled rollups -> JobScheduler schedules them - RollupRunner.runJob() launches coroutines via launch {} - Test @After wipes indices - BUT coroutines are still running in background - Coroutines write metadata AFTER indices wiped - Index auto-creates with wrong dynamic mappings (long vs date) - Next test fails with mapping conflict on rollup_metadata.continuous.next_window_end_time Signed-off-by: bowenlan-amzn --- .../rollup/RollupRestTestCase.kt | 32 +++++++++++-------- .../resthandler/RollupRestAPITestCase.kt | 7 ++++ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index f996d9d7c..51effccbd 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -49,7 +49,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { @After @Suppress("UNCHECKED_CAST") - fun KillAllCancallableRunningTasks() { + fun killAllCancellableRunningTasks() { client().makeRequest("POST", "_tasks/_cancel?actions=*") waitFor { val response = client().makeRequest("GET", "_tasks") @@ -68,21 +68,27 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { } @Suppress("UNCHECKED_CAST") - fun waitForCancallableTasksToFinish() { - waitFor { - val response = client().makeRequest("GET", "_tasks") - val nodes = response.asMap()["nodes"] as Map - var hasCancallableRunningTasks = false - nodes.forEach { - val tasks = (it.value as Map)["tasks"] as Map - tasks.forEach { e -> - if ((e.value as Map)["cancellable"] as Boolean) { - hasCancallableRunningTasks = true - logger.info("cancellable task running: ${e.key}") + protected fun stopAllRollupJobs() { + try { + val response = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI?size=1000") + val rollupsList = response.asMap()["rollups"] as? List> ?: emptyList() + + rollupsList.forEach { rollupMap -> + val rollupObj = rollupMap["rollup"] as? Map ?: return@forEach + val id = rollupMap["_id"] as? String ?: return@forEach + val enabled = rollupObj["enabled"] as? Boolean ?: false + + if (enabled) { + try { + client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/$id/_stop") + logger.debug("Stopped rollup job during test cleanup: $id") + } catch (e: Exception) { + logger.debug("Failed to stop rollup $id during cleanup: ${e.message}") } } } - assertFalse(hasCancallableRunningTasks) + } catch (e: Exception) { + logger.warn("Error stopping rollup jobs during test cleanup", e) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RollupRestAPITestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RollupRestAPITestCase.kt index 26d6f6e78..a5a4cfb7a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RollupRestAPITestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RollupRestAPITestCase.kt @@ -11,6 +11,13 @@ import org.opensearch.indexmanagement.rollup.RollupRestTestCase abstract class RollupRestAPITestCase : RollupRestTestCase() { @After fun clearIndicesAfterEachTest() { + // Stop all rollup jobs first to prevent new executions and allow running coroutines to complete + stopAllRollupJobs() + + // Wait for in-flight job executions to complete their metadata writes + // This prevents race condition where coroutines recreate config index after wipeAllIndices() + Thread.sleep(2000) + // For API tests, flaky could happen if config index not deleted // metadata creation could cause the mapping to be auto set to // a wrong type, namely, [rollup_metadata.continuous.next_window_end_time] to long