Skip to content

Commit 50506a1

Browse files
authored
Merge branch '9.0' into build/jdk24_rc_default
2 parents 997e64d + 0903810 commit 50506a1

File tree

8 files changed

+83
-155
lines changed

8 files changed

+83
-155
lines changed

docs/changelog/123290.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123290
2+
summary: Fix Driver status iterations and `cpuTime`
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 122967

gradle/verification-metadata.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@
6969
<sha256 value="10fe288fd7a2cdaf5175332b73529f9abf8fd54dcfff317d6967c0c35ffb133b" origin="Generated by Gradle"/>
7070
</artifact>
7171
</component>
72-
<component group="co.elastic.apm" name="elastic-apm-agent" version="1.52.0">
73-
<artifact name="elastic-apm-agent-1.52.0.jar">
74-
<sha256 value="ef6c8f75bd6181e717cdd172864441580708c7ee8543175621a3f404f4ba6429" origin="Generated by Gradle"/>
72+
<component group="co.elastic.apm" name="elastic-apm-agent" version="1.52.2">
73+
<artifact name="elastic-apm-agent-1.52.2.jar">
74+
<sha256 value="dee18355a06f66a425bd597d6447ccbe7e8b7a3c0667adb7b30da173e31044e0" origin="Generated by Gradle"/>
7575
</artifact>
7676
</component>
7777
<component group="co.elastic.logging" name="ecs-logging-core" version="1.2.0">

modules/apm/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ dependencies {
2020
implementation "io.opentelemetry:opentelemetry-api:${otelVersion}"
2121
implementation "io.opentelemetry:opentelemetry-context:${otelVersion}"
2222
implementation "io.opentelemetry:opentelemetry-semconv:${otelSemconvVersion}"
23-
runtimeOnly "co.elastic.apm:elastic-apm-agent:1.52.0"
23+
runtimeOnly "co.elastic.apm:elastic-apm-agent:1.52.2"
2424

2525
javaRestTestImplementation project(':modules:apm')
2626
javaRestTestImplementation project(':test:framework')

modules/ingest-geoip/src/main/plugin-metadata/entitlement-policy.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
org.elasticsearch.ingest.geoip:
2+
- outbound_network
23
- files:
34
- relative_path: "ingest-geoip"
45
relative_to: config

qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/FileSettingsRoleMappingUpgradeIT.java

Lines changed: 0 additions & 140 deletions
This file was deleted.

test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void testApmIntegration() throws Exception {
127127

128128
var completed = finished.await(30, TimeUnit.SECONDS);
129129
var remainingAssertions = Stream.concat(valueAssertions.keySet().stream(), histogramAssertions.keySet().stream())
130-
.collect(Collectors.joining());
130+
.collect(Collectors.joining(","));
131131
assertTrue("Timeout when waiting for assertions to complete. Remaining assertions to match: " + remainingAssertions, completed);
132132
}
133133

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,16 @@ public DriverContext driverContext() {
198198
SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
199199
updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running");
200200
long maxTimeNanos = maxTime.nanos();
201+
// Start time, used to stop the calculations after maxTime has passed.
201202
long startTime = nowSupplier.getAsLong();
203+
// The time of the next forced status update.
202204
long nextStatus = startTime + statusNanos;
203-
int iter = 0;
205+
// Total executed iterations this run, used to stop the calculations after maxIterations have passed.
206+
int totalIterationsThisRun = 0;
207+
// The iterations to be reported on the next status update.
208+
int iterationsSinceLastStatusUpdate = 0;
209+
// The time passed since the last status update.
210+
long lastStatusUpdateTime = startTime;
204211
while (true) {
205212
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
206213
try {
@@ -209,29 +216,33 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
209216
closeEarlyFinishedOperators();
210217
assert isFinished() : "not finished after early termination";
211218
}
212-
iter++;
219+
totalIterationsThisRun++;
220+
iterationsSinceLastStatusUpdate++;
221+
222+
long now = nowSupplier.getAsLong();
213223
if (isBlocked.listener().isDone() == false) {
214-
updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
224+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason());
215225
return isBlocked.listener();
216226
}
217227
if (isFinished()) {
218-
finishNanos = nowSupplier.getAsLong();
219-
updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done");
228+
finishNanos = now;
229+
updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done");
220230
driverContext.finish();
221231
Releasables.close(releasable, driverContext.getSnapshot());
222232
return Operator.NOT_BLOCKED.listener();
223233
}
224-
long now = nowSupplier.getAsLong();
225-
if (iter >= maxIterations) {
226-
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations");
234+
if (totalIterationsThisRun >= maxIterations) {
235+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations");
227236
return Operator.NOT_BLOCKED.listener();
228237
}
229238
if (now - startTime >= maxTimeNanos) {
230-
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time");
239+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time");
231240
return Operator.NOT_BLOCKED.listener();
232241
}
233242
if (now > nextStatus) {
234-
updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running");
243+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running");
244+
iterationsSinceLastStatusUpdate = 0;
245+
lastStatusUpdateTime = now;
235246
nextStatus = now + statusNanos;
236247
}
237248
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,56 @@ public void testProfileAndStatusTimeout() {
202202
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
203203
}
204204

205+
public void testProfileAndStatusInterval() {
206+
DriverContext driverContext = driverContext();
207+
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
208+
List<Page> outPages = new ArrayList<>();
209+
210+
long startEpoch = randomNonNegativeLong();
211+
long startNanos = randomLong();
212+
long waitTime = randomLongBetween(10000, 100000);
213+
long tickTime = randomLongBetween(10000, 100000);
214+
long statusInterval = randomLongBetween(1, 10);
215+
216+
Driver driver = new Driver(
217+
"unset",
218+
"test",
219+
startEpoch,
220+
startNanos,
221+
driverContext,
222+
() -> "unset",
223+
new CannedSourceOperator(inPages.iterator()),
224+
List.of(),
225+
new TestResultPageSinkOperator(outPages::add),
226+
TimeValue.timeValueNanos(statusInterval),
227+
() -> {}
228+
);
229+
230+
NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
231+
232+
int iterationsPerTick = randomIntBetween(1, 10);
233+
234+
for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
235+
logger.info("status {} {}", i, driver.status());
236+
assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
237+
assertThat(driver.status().started(), equalTo(startEpoch));
238+
assertThat(driver.status().iterations(), equalTo((long) i));
239+
assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
240+
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
241+
}
242+
243+
logger.info("status {}", driver.status());
244+
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
245+
assertThat(driver.status().started(), equalTo(startEpoch));
246+
assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
247+
assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));
248+
249+
logger.info("profile {}", driver.profile());
250+
assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
251+
assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
252+
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
253+
}
254+
205255
class NowSupplier implements LongSupplier {
206256
private final long startNanos;
207257
private final long waitTime;

0 commit comments

Comments
 (0)