Skip to content

Commit acdc79e

Browse files
authored
Merge branch 'main' into arpad-es-10338
2 parents 42aa195 + b7868ef commit acdc79e

File tree

7 files changed

+129
-76
lines changed

7 files changed

+129
-76
lines changed

docs/changelog/123290.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123290
2+
summary: Fix Driver status iterations and `cpuTime`
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 122967

modules/ingest-geoip/src/main/plugin-metadata/entitlement-policy.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
org.elasticsearch.ingest.geoip:
2+
- outbound_network
23
- files:
34
- relative_path: "ingest-geoip"
45
relative_to: config

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java

Lines changed: 0 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,17 @@
99

1010
package org.elasticsearch.action.admin.cluster.snapshots.status;
1111

12-
import org.elasticsearch.ElasticsearchParseException;
1312
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
14-
import org.elasticsearch.cluster.metadata.IndexMetadata;
1513
import org.elasticsearch.common.io.stream.StreamInput;
1614
import org.elasticsearch.common.io.stream.StreamOutput;
17-
import org.elasticsearch.common.xcontent.XContentParserUtils;
18-
import org.elasticsearch.index.Index;
1915
import org.elasticsearch.index.shard.ShardId;
2016
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
21-
import org.elasticsearch.xcontent.ConstructingObjectParser;
22-
import org.elasticsearch.xcontent.ObjectParser;
23-
import org.elasticsearch.xcontent.ParseField;
2417
import org.elasticsearch.xcontent.ToXContentFragment;
2518
import org.elasticsearch.xcontent.XContentBuilder;
26-
import org.elasticsearch.xcontent.XContentParser;
2719

2820
import java.io.IOException;
2921
import java.util.Objects;
3022

31-
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
32-
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
33-
3423
public class SnapshotIndexShardStatus extends BroadcastShardResponse implements ToXContentFragment {
3524

3625
private final SnapshotIndexShardStage stage;
@@ -149,59 +138,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
149138
return builder;
150139
}
151140

152-
static final ObjectParser.NamedObjectParser<SnapshotIndexShardStatus, String> PARSER;
153-
static {
154-
ConstructingObjectParser<SnapshotIndexShardStatus, ShardId> innerParser = new ConstructingObjectParser<>(
155-
"snapshot_index_shard_status",
156-
true,
157-
(Object[] parsedObjects, ShardId shard) -> {
158-
int i = 0;
159-
String rawStage = (String) parsedObjects[i++];
160-
String nodeId = (String) parsedObjects[i++];
161-
String failure = (String) parsedObjects[i++];
162-
SnapshotStats stats = (SnapshotStats) parsedObjects[i];
163-
164-
SnapshotIndexShardStage stage;
165-
try {
166-
stage = SnapshotIndexShardStage.valueOf(rawStage);
167-
} catch (IllegalArgumentException iae) {
168-
throw new ElasticsearchParseException(
169-
"failed to parse snapshot index shard status [{}][{}], unknown stage [{}]",
170-
shard.getIndex().getName(),
171-
shard.getId(),
172-
rawStage
173-
);
174-
}
175-
return new SnapshotIndexShardStatus(shard, stage, stats, nodeId, failure);
176-
}
177-
);
178-
innerParser.declareString(constructorArg(), new ParseField(Fields.STAGE));
179-
innerParser.declareString(optionalConstructorArg(), new ParseField(Fields.NODE));
180-
innerParser.declareString(optionalConstructorArg(), new ParseField(Fields.REASON));
181-
innerParser.declareObject(constructorArg(), (p, c) -> SnapshotStats.fromXContent(p), new ParseField(SnapshotStats.Fields.STATS));
182-
PARSER = (p, indexId, shardName) -> {
183-
// Combine the index name in the context with the shard name passed in for the named object parser
184-
// into a ShardId to pass as context for the inner parser.
185-
int shard;
186-
try {
187-
shard = Integer.parseInt(shardName);
188-
} catch (NumberFormatException nfe) {
189-
throw new ElasticsearchParseException(
190-
"failed to parse snapshot index shard status [{}], expected numeric shard id but got [{}]",
191-
indexId,
192-
shardName
193-
);
194-
}
195-
ShardId shardId = new ShardId(new Index(indexId, IndexMetadata.INDEX_UUID_NA_VALUE), shard);
196-
return innerParser.parse(p, shardId);
197-
};
198-
}
199-
200-
public static SnapshotIndexShardStatus fromXContent(XContentParser parser, String indexId) throws IOException {
201-
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
202-
return PARSER.parse(parser, indexId, parser.currentName());
203-
}
204-
205141
@Override
206142
public boolean equals(Object o) {
207143
if (this == o) {

server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatusTests.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,23 @@
99

1010
package org.elasticsearch.action.admin.cluster.snapshots.status;
1111

12+
import org.elasticsearch.ElasticsearchParseException;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
1314
import org.elasticsearch.common.xcontent.XContentParserUtils;
1415
import org.elasticsearch.index.Index;
1516
import org.elasticsearch.index.shard.ShardId;
1617
import org.elasticsearch.test.AbstractXContentTestCase;
18+
import org.elasticsearch.xcontent.ConstructingObjectParser;
19+
import org.elasticsearch.xcontent.ObjectParser;
20+
import org.elasticsearch.xcontent.ParseField;
1721
import org.elasticsearch.xcontent.XContentParser;
1822

1923
import java.io.IOException;
2024
import java.util.function.Predicate;
2125

26+
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
27+
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
28+
2229
public class SnapshotIndexShardStatusTests extends AbstractXContentTestCase<SnapshotIndexShardStatus> {
2330

2431
@Override
@@ -48,7 +55,7 @@ protected Predicate<String> getRandomFieldsExcludeFilter() {
4855
protected SnapshotIndexShardStatus doParseInstance(XContentParser parser) throws IOException {
4956
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
5057
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
51-
SnapshotIndexShardStatus status = SnapshotIndexShardStatus.fromXContent(parser, parser.currentName());
58+
SnapshotIndexShardStatus status = SnapshotIndexShardStatusTests.fromXContent(parser, parser.currentName());
5259
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
5360
return status;
5461
}
@@ -57,4 +64,58 @@ protected SnapshotIndexShardStatus doParseInstance(XContentParser parser) throws
5764
protected boolean supportsUnknownFields() {
5865
return true;
5966
}
67+
68+
static final ObjectParser.NamedObjectParser<SnapshotIndexShardStatus, String> PARSER;
69+
70+
static {
71+
ConstructingObjectParser<SnapshotIndexShardStatus, ShardId> innerParser = new ConstructingObjectParser<>(
72+
"snapshot_index_shard_status",
73+
true,
74+
(Object[] parsedObjects, ShardId shard) -> {
75+
int i = 0;
76+
String rawStage = (String) parsedObjects[i++];
77+
String nodeId = (String) parsedObjects[i++];
78+
String failure = (String) parsedObjects[i++];
79+
SnapshotStats stats = (SnapshotStats) parsedObjects[i];
80+
81+
SnapshotIndexShardStage stage;
82+
try {
83+
stage = SnapshotIndexShardStage.valueOf(rawStage);
84+
} catch (IllegalArgumentException iae) {
85+
throw new ElasticsearchParseException(
86+
"failed to parse snapshot index shard status [{}][{}], unknown stage [{}]",
87+
shard.getIndex().getName(),
88+
shard.getId(),
89+
rawStage
90+
);
91+
}
92+
return new SnapshotIndexShardStatus(shard, stage, stats, nodeId, failure);
93+
}
94+
);
95+
innerParser.declareString(constructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.STAGE));
96+
innerParser.declareString(optionalConstructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.NODE));
97+
innerParser.declareString(optionalConstructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.REASON));
98+
innerParser.declareObject(constructorArg(), (p, c) -> SnapshotStats.fromXContent(p), new ParseField(SnapshotStats.Fields.STATS));
99+
PARSER = (p, indexId, shardName) -> {
100+
// Combine the index name in the context with the shard name passed in for the named object parser
101+
// into a ShardId to pass as context for the inner parser.
102+
int shard;
103+
try {
104+
shard = Integer.parseInt(shardName);
105+
} catch (NumberFormatException nfe) {
106+
throw new ElasticsearchParseException(
107+
"failed to parse snapshot index shard status [{}], expected numeric shard id but got [{}]",
108+
indexId,
109+
shardName
110+
);
111+
}
112+
ShardId shardId = new ShardId(new Index(indexId, IndexMetadata.INDEX_UUID_NA_VALUE), shard);
113+
return innerParser.parse(p, shardId);
114+
};
115+
}
116+
117+
public static SnapshotIndexShardStatus fromXContent(XContentParser parser, String indexId) throws IOException {
118+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
119+
return PARSER.parse(parser, indexId, parser.currentName());
120+
}
60121
}

server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatusTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class SnapshotIndexStatusTests extends AbstractXContentTestCase<SnapshotI
6060
innerParser.declareObject(constructorArg(), (p, c) -> SnapshotStats.fromXContent(p), new ParseField(SnapshotStats.Fields.STATS));
6161
innerParser.declareNamedObjects(
6262
constructorArg(),
63-
SnapshotIndexShardStatus.PARSER,
63+
SnapshotIndexShardStatusTests.PARSER,
6464
new ParseField(SnapshotIndexStatus.Fields.SHARDS)
6565
);
6666
PARSER = ((p, c, name) -> innerParser.apply(p, name));

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,16 @@ public DriverContext driverContext() {
171171
SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
172172
updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running");
173173
long maxTimeNanos = maxTime.nanos();
174+
// Start time, used to stop the calculations after maxTime has passed.
174175
long startTime = nowSupplier.getAsLong();
176+
// The time of the next forced status update.
175177
long nextStatus = startTime + statusNanos;
176-
int iter = 0;
178+
// Total executed iterations this run, used to stop the calculations after maxIterations have passed.
179+
int totalIterationsThisRun = 0;
180+
// The iterations to be reported on the next status update.
181+
int iterationsSinceLastStatusUpdate = 0;
182+
// The time passed since the last status update.
183+
long lastStatusUpdateTime = startTime;
177184
while (true) {
178185
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
179186
try {
@@ -182,29 +189,33 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
182189
closeEarlyFinishedOperators();
183190
assert isFinished() : "not finished after early termination";
184191
}
185-
iter++;
192+
totalIterationsThisRun++;
193+
iterationsSinceLastStatusUpdate++;
194+
195+
long now = nowSupplier.getAsLong();
186196
if (isBlocked.listener().isDone() == false) {
187-
updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
197+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason());
188198
return isBlocked.listener();
189199
}
190200
if (isFinished()) {
191-
finishNanos = nowSupplier.getAsLong();
192-
updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done");
201+
finishNanos = now;
202+
updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done");
193203
driverContext.finish();
194204
Releasables.close(releasable, driverContext.getSnapshot());
195205
return Operator.NOT_BLOCKED.listener();
196206
}
197-
long now = nowSupplier.getAsLong();
198-
if (iter >= maxIterations) {
199-
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations");
207+
if (totalIterationsThisRun >= maxIterations) {
208+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations");
200209
return Operator.NOT_BLOCKED.listener();
201210
}
202211
if (now - startTime >= maxTimeNanos) {
203-
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time");
212+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time");
204213
return Operator.NOT_BLOCKED.listener();
205214
}
206215
if (now > nextStatus) {
207-
updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running");
216+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running");
217+
iterationsSinceLastStatusUpdate = 0;
218+
lastStatusUpdateTime = now;
208219
nextStatus = now + statusNanos;
209220
}
210221
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,44 @@ public void testProfileAndStatusTimeout() {
167167
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
168168
}
169169

170+
public void testProfileAndStatusInterval() {
171+
DriverContext driverContext = driverContext();
172+
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
173+
List<Page> outPages = new ArrayList<>();
174+
175+
long startEpoch = randomNonNegativeLong();
176+
long startNanos = randomLong();
177+
long waitTime = randomLongBetween(10000, 100000);
178+
long tickTime = randomLongBetween(10000, 100000);
179+
long statusInterval = randomLongBetween(1, 10);
180+
181+
Driver driver = createDriver(startEpoch, startNanos, driverContext, inPages, outPages, TimeValue.timeValueNanos(statusInterval));
182+
183+
NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
184+
185+
int iterationsPerTick = randomIntBetween(1, 10);
186+
187+
for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
188+
logger.info("status {} {}", i, driver.status());
189+
assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
190+
assertThat(driver.status().started(), equalTo(startEpoch));
191+
assertThat(driver.status().iterations(), equalTo((long) i));
192+
assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
193+
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
194+
}
195+
196+
logger.info("status {}", driver.status());
197+
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
198+
assertThat(driver.status().started(), equalTo(startEpoch));
199+
assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
200+
assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));
201+
202+
logger.info("profile {}", driver.profile());
203+
assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
204+
assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
205+
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
206+
}
207+
170208
private static Driver createDriver(
171209
long startEpoch,
172210
long startNanos,

0 commit comments

Comments
 (0)