Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {

@After
@Suppress("UNCHECKED_CAST")
fun KillAllCancallableRunningTasks() {
fun killAllCancellableRunningTasks() {
client().makeRequest("POST", "_tasks/_cancel?actions=*")
waitFor {
val response = client().makeRequest("GET", "_tasks")
Expand All @@ -68,21 +68,27 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
}

@Suppress("UNCHECKED_CAST")
fun waitForCancallableTasksToFinish() {
waitFor {
val response = client().makeRequest("GET", "_tasks")
val nodes = response.asMap()["nodes"] as Map<String, Any?>
var hasCancallableRunningTasks = false
nodes.forEach {
val tasks = (it.value as Map<String, Any?>)["tasks"] as Map<String, Any?>
tasks.forEach { e ->
if ((e.value as Map<String, Any?>)["cancellable"] as Boolean) {
hasCancallableRunningTasks = true
logger.info("cancellable task running: ${e.key}")
protected fun stopAllRollupJobs() {
try {
val response = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI?size=1000")
val rollupsList = response.asMap()["rollups"] as? List<Map<String, Any?>> ?: emptyList()

rollupsList.forEach { rollupMap ->
val rollupObj = rollupMap["rollup"] as? Map<String, Any?> ?: return@forEach
val id = rollupMap["_id"] as? String ?: return@forEach
val enabled = rollupObj["enabled"] as? Boolean ?: false

if (enabled) {
try {
client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/$id/_stop")
logger.debug("Stopped rollup job during test cleanup: $id")
} catch (e: Exception) {
logger.debug("Failed to stop rollup $id during cleanup: ${e.message}")
}
}
}
assertFalse(hasCancallableRunningTasks)
} catch (e: Exception) {
logger.warn("Error stopping rollup jobs during test cleanup", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import org.opensearch.indexmanagement.rollup.RollupRestTestCase
abstract class RollupRestAPITestCase : RollupRestTestCase() {
@After
fun clearIndicesAfterEachTest() {
// Stop all rollup jobs first to prevent new executions and allow running coroutines to complete
stopAllRollupJobs()

// Wait for in-flight job executions to complete their metadata writes
// This prevents race condition where coroutines recreate config index after wipeAllIndices()
Thread.sleep(2000)

// For API tests, flaky could happen if config index not deleted
// metadata creation could cause the mapping to be auto set to
// a wrong type, namely, [rollup_metadata.continuous.next_window_end_time] to long
Expand Down
Loading