Skip to content

Commit 8dc697a

Browse files
authored
ESQL: Fix Driver creating status with a live list of operators (#132260) (#132333)
`DriverStatus` is an immutable record created by the Driver. However, its components are not inherently immutable. This PR fixes a live collection used by the Driver, that was being directly put into the status, leading to `ConcurrentModificationException`s when reading it (Through the Task list API, for example). Some example errors: ``` java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1096) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1050) at org.elasticsearch.compute.operator.DriverStatus.toXContent(DriverStatus.java:134) at [email protected]/org.elasticsearch.xcontent.XContentBuilder.value(XContentBuilder.java:993) at [email protected]/org.elasticsearch.xcontent.XContentBuilder.field(XContentBuilder.java:978) at [email protected]/org.elasticsearch.tasks.TaskInfo.toXContent(TaskInfo.java:113) at [email protected]/org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup.toXContent(TaskGroup.java:63) at [email protected]/org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup.toXContent(TaskGroup.java:67) at [email protected]/org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse.lambda$groupedByParent$10(ListTasksResponse.java:183) ``` And: ``` java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1096) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1050) at org.elasticsearch.compute.operator.DriverStatus.documentsFound(DriverStatus.java:157) at org.elasticsearch.compute.operator.DriverStatus.toXContent(DriverStatus.java:129) at [email protected]/org.elasticsearch.xcontent.XContentBuilder.value(XContentBuilder.java:993) at [email protected]/org.elasticsearch.xcontent.XContentBuilder.field(XContentBuilder.java:978) at [email protected]/org.elasticsearch.tasks.TaskInfo.toXContent(TaskInfo.java:113) at org.elasticsearch.server@9.2.0org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup.toXContent(TaskGroup.java:63) at [email protected]/org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup.toXContent(TaskGroup.java:67) at [email protected]/org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse.lambda$groupedByParent$10(ListTasksResponse.java:183) ``` Also, this looks like the source of this issue, with another similar case: Fixes #131564
1 parent 043ddc3 commit 8dc697a

File tree

3 files changed

+50
-1
lines changed

3 files changed

+50
-1
lines changed

docs/changelog/132260.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 132260
2+
summary: FIx Driver creating status with a live list of operators
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 131564

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
556556
prev.cpuNanos() + extraCpuNanos,
557557
prev.iterations() + extraIterations,
558558
status,
559-
statusOfCompletedOperators,
559+
List.copyOf(statusOfCompletedOperators),
560560
activeOperators.stream().map(op -> new OperatorStatus(op.toString(), op.status())).toList(),
561561
sleeps
562562
);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949

5050
import static org.hamcrest.Matchers.either;
5151
import static org.hamcrest.Matchers.equalTo;
52+
import static org.hamcrest.Matchers.not;
53+
import static org.hamcrest.Matchers.sameInstance;
5254

5355
public class DriverTests extends ESTestCase {
5456
/**
@@ -204,6 +206,47 @@ public void testProfileAndStatusInterval() {
204206
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
205207
}
206208

209+
public void testUnchangedStatus() {
210+
DriverContext driverContext = driverContext();
211+
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
212+
List<Page> outPages = new ArrayList<>();
213+
214+
long startEpoch = randomNonNegativeLong();
215+
long startNanos = randomLong();
216+
long waitTime = randomLongBetween(10000, 100000);
217+
long tickTime = randomLongBetween(10000, 100000);
218+
long statusInterval = randomLongBetween(1, 10);
219+
220+
Driver driver = createDriver(startEpoch, startNanos, driverContext, inPages, outPages, TimeValue.timeValueNanos(statusInterval));
221+
222+
NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
223+
224+
int iterationsPerTick = randomIntBetween(1, 10);
225+
226+
for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
227+
DriverStatus initialStatus = driver.status();
228+
long completedOperatorsHash = initialStatus.completedOperators().hashCode();
229+
long activeOperatorsHash = initialStatus.activeOperators().hashCode();
230+
long sleepsHash = initialStatus.sleeps().hashCode();
231+
232+
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
233+
234+
DriverStatus newStatus = driver.status();
235+
assertThat(newStatus, not(sameInstance(initialStatus)));
236+
assertThat(
237+
newStatus.completedOperators() != initialStatus.completedOperators()
238+
|| newStatus.completedOperators().hashCode() == completedOperatorsHash,
239+
equalTo(true)
240+
);
241+
assertThat(
242+
newStatus.activeOperators() != initialStatus.activeOperators()
243+
|| newStatus.activeOperators().hashCode() == activeOperatorsHash,
244+
equalTo(true)
245+
);
246+
assertThat(newStatus.sleeps() != initialStatus.sleeps() || newStatus.sleeps().hashCode() == sleepsHash, equalTo(true));
247+
}
248+
}
249+
207250
private static Driver createDriver(
208251
long startEpoch,
209252
long startNanos,

0 commit comments

Comments
 (0)