Skip to content

Commit 89b4ddd

Browse files
authored
ESQL: Fix Driver creating status with a live list of operators (#132260) (#132338)
Manual 8.17 backport of #132260
1 parent 202aa92 commit 89b4ddd

File tree

3 files changed

+60
-1
lines changed

3 files changed

+60
-1
lines changed

docs/changelog/132260.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 132260
2+
summary: FIx Driver creating status with a live list of operators
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 131564

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
511511
prev.cpuNanos() + extraCpuNanos,
512512
prev.iterations() + extraIterations,
513513
status,
514-
statusOfCompletedOperators,
514+
List.copyOf(statusOfCompletedOperators),
515515
activeOperators.stream().map(op -> new DriverStatus.OperatorStatus(op.toString(), op.status())).toList(),
516516
sleeps
517517
);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.function.LongSupplier;
3939

4040
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.not;
42+
import static org.hamcrest.Matchers.sameInstance;
4143

4244
public class DriverTests extends ESTestCase {
4345
/**
@@ -188,6 +190,57 @@ public void testProfileAndStatusTimeout() {
188190
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
189191
}
190192

193+
public void testUnchangedStatus() {
194+
DriverContext driverContext = driverContext();
195+
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
196+
List<Page> outPages = new ArrayList<>();
197+
198+
long startEpoch = randomNonNegativeLong();
199+
long startNanos = randomLong();
200+
long waitTime = randomLongBetween(10000, 100000);
201+
long tickTime = randomLongBetween(10000, 100000);
202+
203+
Driver driver = new Driver(
204+
"unset",
205+
startEpoch,
206+
startNanos,
207+
driverContext,
208+
() -> "unset",
209+
new CannedSourceOperator(inPages.iterator()),
210+
List.of(),
211+
new TestResultPageSinkOperator(outPages::add),
212+
TimeValue.timeValueNanos(tickTime),
213+
() -> {}
214+
);
215+
216+
NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
217+
218+
int iterationsPerTick = randomIntBetween(1, 10);
219+
220+
for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
221+
DriverStatus initialStatus = driver.status();
222+
long completedOperatorsHash = initialStatus.completedOperators().hashCode();
223+
long activeOperatorsHash = initialStatus.activeOperators().hashCode();
224+
long sleepsHash = initialStatus.sleeps().hashCode();
225+
226+
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
227+
228+
DriverStatus newStatus = driver.status();
229+
assertThat(newStatus, not(sameInstance(initialStatus)));
230+
assertThat(
231+
newStatus.completedOperators() != initialStatus.completedOperators()
232+
|| newStatus.completedOperators().hashCode() == completedOperatorsHash,
233+
equalTo(true)
234+
);
235+
assertThat(
236+
newStatus.activeOperators() != initialStatus.activeOperators()
237+
|| newStatus.activeOperators().hashCode() == activeOperatorsHash,
238+
equalTo(true)
239+
);
240+
assertThat(newStatus.sleeps() != initialStatus.sleeps() || newStatus.sleeps().hashCode() == sleepsHash, equalTo(true));
241+
}
242+
}
243+
191244
class NowSupplier implements LongSupplier {
192245
private final long startNanos;
193246
private final long waitTime;

0 commit comments

Comments
 (0)