From 96dee5519c707896b4db9c45f8eced61fc7b500f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 21 Nov 2025 21:11:30 -0800 Subject: [PATCH] Wait for async actions in testSimpleFinishClose --- .../compute/test/OperatorTestCase.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java index a46dca4ae38cf..2766ede47f5fe 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java @@ -23,6 +23,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.AsyncOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.DriverRunner; @@ -247,11 +248,23 @@ public void testSimpleFinishClose() throws Exception { assert input.size() == 1 : "Expected single page, got: " + input; // eventually, when driverContext always returns a tracking factory, we can enable this assertion // assertThat(driverContext.blockFactory().breaker().getUsed(), greaterThan(0L)); - Page page = input.get(0); try (var operator = simple().get(driverContext)) { assert operator.needsInput(); - operator.addInput(page); + for (Page page : input) { + if (operator.needsInput()) { + operator.addInput(page); + } else { + page.releaseBlocks(); + } + } operator.finish(); + // for async operator, we need to wait for async actions to finish. + if (operator instanceof AsyncOperator || randomBoolean()) { + driverContext.finish(); + PlainActionFuture waitForAsync = new PlainActionFuture<>(); + driverContext.waitForAsyncActions(waitForAsync); + waitForAsync.actionGet(TimeValue.timeValueSeconds(30)); + } } }