Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ public class Driver implements Releasable, Describable {
private final String sessionId;

/**
* Description of the task this driver is running. This description should be
* short and meaningful as a grouping identifier. We use the phase of the
* query right now: "data", "node_reduce", "final".
* Description of the driver. This description should be short and meaningful as a grouping identifier.
* We use the phase of the query right now: "data", "node_reduce", "final".
*/
private final String taskDescription;
private final String shortDescription;

/**
* The wall clock time when this driver was created in milliseconds since epoch.
Expand Down Expand Up @@ -103,10 +102,8 @@ public class Driver implements Releasable, Describable {
/**
* Creates a new driver with a chain of operators.
* @param sessionId session Id
* @param taskDescription Description of the task this driver is running. This
* description should be short and meaningful as a grouping
* identifier. We use the phase of the query right now:
* "data", "node_reduce", "final".
* @param shortDescription Description of the driver. This description should be short and meaningful as a grouping identifier.
* We use the phase of the query right now: "data", "node_reduce", "final".
* @param driverContext the driver context
* @param source source operator
* @param intermediateOperators the chain of operators to execute
Expand All @@ -116,7 +113,7 @@ public class Driver implements Releasable, Describable {
*/
public Driver(
String sessionId,
String taskDescription,
String shortDescription,
long startTime,
long startNanos,
DriverContext driverContext,
Expand All @@ -128,7 +125,7 @@ public Driver(
Releasable releasable
) {
this.sessionId = sessionId;
this.taskDescription = taskDescription;
this.shortDescription = shortDescription;
this.startTime = startTime;
this.startNanos = startNanos;
this.driverContext = driverContext;
Expand All @@ -142,7 +139,7 @@ public Driver(
this.status = new AtomicReference<>(
new DriverStatus(
sessionId,
taskDescription,
shortDescription,
startTime,
System.currentTimeMillis(),
0,
Expand All @@ -155,37 +152,6 @@ public Driver(
);
}

/**
* Creates a new driver with a chain of operators.
* @param driverContext the driver context
* @param source source operator
* @param intermediateOperators the chain of operators to execute
* @param sink sink operator
* @param releasable a {@link Releasable} to invoked once the chain of operators has run to completion
*/
public Driver(
String taskDescription,
DriverContext driverContext,
SourceOperator source,
List<Operator> intermediateOperators,
SinkOperator sink,
Releasable releasable
) {
this(
"unset",
taskDescription,
System.currentTimeMillis(),
System.nanoTime(),
driverContext,
() -> null,
source,
intermediateOperators,
sink,
DEFAULT_STATUS_INTERVAL,
releasable
);
}

public DriverContext driverContext() {
return driverContext;
}
Expand Down Expand Up @@ -512,7 +478,7 @@ public DriverProfile profile() {
throw new IllegalStateException("can only get profile from finished driver");
}
return new DriverProfile(
status.taskDescription(),
status.description(),
status.started(),
status.lastUpdated(),
finishNanos - startNanos,
Expand Down Expand Up @@ -559,7 +525,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.

return new DriverStatus(
sessionId,
taskDescription,
shortDescription,
startTime,
now,
prev.cpuNanos() + extraCpuNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
* short and meaningful as a grouping identifier. We use the phase of the
* query right now: "data", "node_reduce", "final".
*/
private final String taskDescription;
private final String description;

/**
* Millis since epoch when the driver started.
Expand Down Expand Up @@ -69,7 +69,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
private final DriverSleeps sleeps;

public DriverProfile(
String taskDescription,
String description,
long startMillis,
long stopMillis,
long tookNanos,
Expand All @@ -78,7 +78,7 @@ public DriverProfile(
List<DriverStatus.OperatorStatus> operators,
DriverSleeps sleeps
) {
this.taskDescription = taskDescription;
this.description = description;
this.startMillis = startMillis;
this.stopMillis = stopMillis;
this.tookNanos = tookNanos;
Expand All @@ -89,7 +89,7 @@ public DriverProfile(
}

public DriverProfile(StreamInput in) throws IOException {
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90) ? in.readString() : "";
this.description = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90) ? in.readString() : "";
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
this.startMillis = in.readVLong();
this.stopMillis = in.readVLong();
Expand All @@ -113,7 +113,7 @@ public DriverProfile(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90)) {
out.writeString(taskDescription);
out.writeString(description);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeVLong(startMillis);
Expand All @@ -131,8 +131,8 @@ public void writeTo(StreamOutput out) throws IOException {
/**
* Description of the task this driver is running.
*/
public String taskDescription() {
return taskDescription;
public String description() {
return description;
}

/**
Expand Down Expand Up @@ -189,7 +189,7 @@ public DriverSleeps sleeps() {
@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(ChunkedToXContentHelper.startObject(), Iterators.single((b, p) -> {
b.field("task_description", taskDescription);
b.field("description", description);
b.timestampFieldsFromUnixEpochMillis("start_millis", "start", startMillis);
b.timestampFieldsFromUnixEpochMillis("stop_millis", "stop", stopMillis);
b.field("took_nanos", tookNanos);
Expand Down Expand Up @@ -218,7 +218,7 @@ public boolean equals(Object o) {
return false;
}
DriverProfile that = (DriverProfile) o;
return taskDescription.equals(that.taskDescription)
return description.equals(that.description)
&& startMillis == that.startMillis
&& stopMillis == that.stopMillis
&& tookNanos == that.tookNanos
Expand All @@ -230,7 +230,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
return Objects.hash(description, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class DriverStatus implements Task.Status {
/**
* Description of the task this driver is running.
*/
private final String taskDescription;
private final String description;

/**
* Milliseconds since epoch when this driver started.
Expand Down Expand Up @@ -88,7 +88,7 @@ public class DriverStatus implements Task.Status {

DriverStatus(
String sessionId,
String taskDescription,
String description,
long started,
long lastUpdated,
long cpuTime,
Expand All @@ -99,7 +99,7 @@ public class DriverStatus implements Task.Status {
DriverSleeps sleeps
) {
this.sessionId = sessionId;
this.taskDescription = taskDescription;
this.description = description;
this.started = started;
this.lastUpdated = lastUpdated;
this.cpuNanos = cpuTime;
Expand All @@ -112,7 +112,7 @@ public class DriverStatus implements Task.Status {

public DriverStatus(StreamInput in) throws IOException {
this.sessionId = in.readString();
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90) ? in.readString() : "";
this.description = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90) ? in.readString() : "";
this.started = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readLong() : 0;
this.lastUpdated = in.readLong();
this.cpuNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
Expand All @@ -131,7 +131,7 @@ public DriverStatus(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(sessionId);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90)) {
out.writeString(taskDescription);
out.writeString(description);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
out.writeLong(started);
Expand Down Expand Up @@ -166,8 +166,8 @@ public String sessionId() {
* short and meaningful as a grouping identifier. We use the phase of the
* query right now: "data", "node_reduce", "final".
*/
public String taskDescription() {
return taskDescription;
public String description() {
return description;
}

/**
Expand Down Expand Up @@ -232,7 +232,7 @@ public List<OperatorStatus> activeOperators() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("session_id", sessionId);
builder.field("task_description", taskDescription);
builder.field("description", description);
builder.field("started", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(started));
builder.field("last_updated", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lastUpdated));
builder.field("cpu_nanos", cpuNanos);
Expand Down Expand Up @@ -261,7 +261,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
DriverStatus that = (DriverStatus) o;
return sessionId.equals(that.sessionId)
&& taskDescription.equals(that.taskDescription)
&& description.equals(that.description)
&& started == that.started
&& lastUpdated == that.lastUpdated
&& cpuNanos == that.cpuNanos
Expand All @@ -276,7 +276,7 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(
sessionId,
taskDescription,
description,
started,
lastUpdated,
cpuNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.compute.test.BlockTestUtils;
import org.elasticsearch.compute.test.OperatorTestCase;
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
import org.elasticsearch.compute.test.TestDriverFactory;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
Expand Down Expand Up @@ -123,7 +124,7 @@ public void testQueryOperator() throws IOException {
}
});
DriverContext driverContext = driverContext();
drivers.add(new Driver("test", driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
drivers.add(TestDriverFactory.create(driverContext, factory.get(driverContext), List.of(), docCollector));
}
OperatorTestCase.runDriver(drivers);
Set<Integer> expectedDocIds = searchForDocIds(reader, query);
Expand Down Expand Up @@ -214,8 +215,7 @@ public String toString() {
driverContext
)
);
Driver driver = new Driver(
"test",
Driver driver = TestDriverFactory.create(
driverContext,
luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT).get(driverContext),
operators,
Expand All @@ -228,8 +228,7 @@ public String toString() {
actualCounts.put(BytesRef.deepCopyOf(spare), counts.getLong(i));
}
page.releaseBlocks();
}),
() -> {}
})
);
OperatorTestCase.runDriver(driver);
assertThat(actualCounts, equalTo(expectedCounts));
Expand All @@ -248,8 +247,7 @@ public void testLimitOperator() {
var results = new ArrayList<Long>();
DriverContext driverContext = driverContext();
try (
var driver = new Driver(
"test",
var driver = TestDriverFactory.create(
driverContext,
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
List.of((new LimitOperator.Factory(limit)).get(driverContext)),
Expand All @@ -258,8 +256,7 @@ public void testLimitOperator() {
for (int i = 0; i < page.getPositionCount(); i++) {
results.add(block.getLong(i));
}
}),
() -> {}
})
)
) {
OperatorTestCase.runDriver(driver);
Expand Down Expand Up @@ -336,8 +333,7 @@ public void testHashLookup() {
var actualValues = new ArrayList<>();
var actualPrimeOrds = new ArrayList<>();
try (
var driver = new Driver(
"test",
var driver = TestDriverFactory.create(
driverContext,
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
List.of(
Expand All @@ -354,8 +350,7 @@ public void testHashLookup() {
} finally {
page.releaseBlocks();
}
}),
() -> {}
})
)
) {
OperatorTestCase.runDriver(driver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.compute.test.BlockTestUtils;
import org.elasticsearch.compute.test.CannedSourceOperator;
import org.elasticsearch.compute.test.TestBlockFactory;
import org.elasticsearch.compute.test.TestDriverFactory;
import org.elasticsearch.compute.test.TestResultPageSinkOperator;
import org.hamcrest.Matcher;

Expand Down Expand Up @@ -110,13 +111,11 @@ public final void testIgnoresNulls() {
List<Page> origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance());

try (
Driver d = new Driver(
"test",
Driver d = TestDriverFactory.create(
driverContext,
new NullInsertingSourceOperator(new CannedSourceOperator(input.iterator()), blockFactory),
List.of(simple().get(driverContext)),
new TestResultPageSinkOperator(results::add),
() -> {}
new TestResultPageSinkOperator(results::add)
)
) {
runDriver(d);
Expand Down
Loading