Skip to content

Commit 261da78

Browse files
authored
ESQL: Fix Driver creating status with a live list of operators (#132260) (#132340)
Manual 8.19 backport of #132260
1 parent 599b484 commit 261da78

File tree

3 files changed

+62
-1
lines changed

3 files changed

+62
-1
lines changed

docs/changelog/132260.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 132260
2+
summary: FIx Driver creating status with a live list of operators
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 131564

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
571571
prev.cpuNanos() + extraCpuNanos,
572572
prev.iterations() + extraIterations,
573573
status,
574-
statusOfCompletedOperators,
574+
List.copyOf(statusOfCompletedOperators),
575575
activeOperators.stream().map(op -> new OperatorStatus(op.toString(), op.status())).toList(),
576576
sleeps
577577
);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949

5050
import static org.hamcrest.Matchers.either;
5151
import static org.hamcrest.Matchers.equalTo;
52+
import static org.hamcrest.Matchers.not;
53+
import static org.hamcrest.Matchers.sameInstance;
5254

5355
public class DriverTests extends ESTestCase {
5456
/**
@@ -252,6 +254,59 @@ public void testProfileAndStatusInterval() {
252254
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
253255
}
254256

257+
public void testUnchangedStatus() {
258+
DriverContext driverContext = driverContext();
259+
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
260+
List<Page> outPages = new ArrayList<>();
261+
262+
long startEpoch = randomNonNegativeLong();
263+
long startNanos = randomLong();
264+
long waitTime = randomLongBetween(10000, 100000);
265+
long tickTime = randomLongBetween(10000, 100000);
266+
long statusInterval = randomLongBetween(1, 10);
267+
268+
Driver driver = new Driver(
269+
"unset",
270+
"test",
271+
startEpoch,
272+
startNanos,
273+
driverContext,
274+
() -> "unset",
275+
new CannedSourceOperator(inPages.iterator()),
276+
List.of(),
277+
new TestResultPageSinkOperator(outPages::add),
278+
TimeValue.timeValueNanos(statusInterval),
279+
() -> {}
280+
);
281+
282+
NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
283+
284+
int iterationsPerTick = randomIntBetween(1, 10);
285+
286+
for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
287+
DriverStatus initialStatus = driver.status();
288+
long completedOperatorsHash = initialStatus.completedOperators().hashCode();
289+
long activeOperatorsHash = initialStatus.activeOperators().hashCode();
290+
long sleepsHash = initialStatus.sleeps().hashCode();
291+
292+
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
293+
294+
DriverStatus newStatus = driver.status();
295+
assertThat(newStatus, not(sameInstance(initialStatus)));
296+
assertThat(
297+
newStatus.completedOperators() != initialStatus.completedOperators()
298+
|| newStatus.completedOperators().hashCode() == completedOperatorsHash,
299+
equalTo(true)
300+
);
301+
assertThat(
302+
newStatus.activeOperators() != initialStatus.activeOperators()
303+
|| newStatus.activeOperators().hashCode() == activeOperatorsHash,
304+
equalTo(true)
305+
);
306+
assertThat(newStatus.sleeps() != initialStatus.sleeps() || newStatus.sleeps().hashCode() == sleepsHash, equalTo(true));
307+
}
308+
}
309+
255310
class NowSupplier implements LongSupplier {
256311
private final long startNanos;
257312
private final long waitTime;

0 commit comments

Comments
 (0)