Skip to content

Commit 57ff7f7

Browse files
authored
ESQL: Fix Driver creating status with a live list of operators (#132260) (#132339)
Manual 8.18 backport of #132260
1 parent c492796 commit 57ff7f7

File tree

3 files changed

+60
-1
lines changed

3 files changed

+60
-1
lines changed

docs/changelog/132260.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 132260
2+
summary: FIx Driver creating status with a live list of operators
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 131564

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
537537
prev.cpuNanos() + extraCpuNanos,
538538
prev.iterations() + extraIterations,
539539
status,
540-
statusOfCompletedOperators,
540+
List.copyOf(statusOfCompletedOperators),
541541
activeOperators.stream().map(op -> new DriverStatus.OperatorStatus(op.toString(), op.status())).toList(),
542542
sleeps
543543
);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import static org.hamcrest.Matchers.either;
5151
import static org.hamcrest.Matchers.equalTo;
5252
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
53+
import static org.hamcrest.Matchers.not;
54+
import static org.hamcrest.Matchers.sameInstance;
5355

5456
public class DriverTests extends ESTestCase {
5557
/**
@@ -200,6 +202,57 @@ public void testProfileAndStatusTimeout() {
200202
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
201203
}
202204

205+
public void testUnchangedStatus() {
206+
DriverContext driverContext = driverContext();
207+
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
208+
List<Page> outPages = new ArrayList<>();
209+
210+
long startEpoch = randomNonNegativeLong();
211+
long startNanos = randomLong();
212+
long waitTime = randomLongBetween(10000, 100000);
213+
long tickTime = randomLongBetween(10000, 100000);
214+
215+
Driver driver = new Driver(
216+
"unset",
217+
startEpoch,
218+
startNanos,
219+
driverContext,
220+
() -> "unset",
221+
new CannedSourceOperator(inPages.iterator()),
222+
List.of(),
223+
new TestResultPageSinkOperator(outPages::add),
224+
TimeValue.timeValueNanos(tickTime),
225+
() -> {}
226+
);
227+
228+
NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
229+
230+
int iterationsPerTick = randomIntBetween(1, 10);
231+
232+
for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
233+
DriverStatus initialStatus = driver.status();
234+
long completedOperatorsHash = initialStatus.completedOperators().hashCode();
235+
long activeOperatorsHash = initialStatus.activeOperators().hashCode();
236+
long sleepsHash = initialStatus.sleeps().hashCode();
237+
238+
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
239+
240+
DriverStatus newStatus = driver.status();
241+
assertThat(newStatus, not(sameInstance(initialStatus)));
242+
assertThat(
243+
newStatus.completedOperators() != initialStatus.completedOperators()
244+
|| newStatus.completedOperators().hashCode() == completedOperatorsHash,
245+
equalTo(true)
246+
);
247+
assertThat(
248+
newStatus.activeOperators() != initialStatus.activeOperators()
249+
|| newStatus.activeOperators().hashCode() == activeOperatorsHash,
250+
equalTo(true)
251+
);
252+
assertThat(newStatus.sleeps() != initialStatus.sleeps() || newStatus.sleeps().hashCode() == sleepsHash, equalTo(true));
253+
}
254+
}
255+
203256
class NowSupplier implements LongSupplier {
204257
private final long startNanos;
205258
private final long waitTime;

0 commit comments

Comments
 (0)