|
23 | 23 | import org.elasticsearch.compute.data.Block; |
24 | 24 | import org.elasticsearch.compute.data.BlockFactory; |
25 | 25 | import org.elasticsearch.compute.data.Page; |
| 26 | +import org.elasticsearch.compute.operator.AsyncOperator; |
26 | 27 | import org.elasticsearch.compute.operator.Driver; |
27 | 28 | import org.elasticsearch.compute.operator.DriverContext; |
28 | 29 | import org.elasticsearch.compute.operator.DriverRunner; |
@@ -247,11 +248,23 @@ public void testSimpleFinishClose() throws Exception { |
247 | 248 | assert input.size() == 1 : "Expected single page, got: " + input; |
248 | 249 | // eventually, when driverContext always returns a tracking factory, we can enable this assertion |
249 | 250 | // assertThat(driverContext.blockFactory().breaker().getUsed(), greaterThan(0L)); |
250 | | - Page page = input.get(0); |
251 | 251 | try (var operator = simple().get(driverContext)) { |
252 | 252 | assert operator.needsInput(); |
253 | | - operator.addInput(page); |
| 253 | + for (Page page : input) { |
| 254 | + if (operator.needsInput()) { |
| 255 | + operator.addInput(page); |
| 256 | + } else { |
| 257 | + page.releaseBlocks(); |
| 258 | + } |
| 259 | + } |
254 | 260 | operator.finish(); |
| 261 | + // for async operator, we need to wait for async actions to finish. |
| 262 | + if (operator instanceof AsyncOperator<?> || randomBoolean()) { |
| 263 | + driverContext.finish(); |
| 264 | + PlainActionFuture<Void> waitForAsync = new PlainActionFuture<>(); |
| 265 | + driverContext.waitForAsyncActions(waitForAsync); |
| 266 | + waitForAsync.actionGet(TimeValue.timeValueSeconds(30)); |
| 267 | + } |
255 | 268 | } |
256 | 269 | } |
257 | 270 |
|
|
0 commit comments