Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.junit.Before;
Expand Down Expand Up @@ -730,6 +731,35 @@ public void testOneSubQuery() {
assertTrue(e.getMessage().contains("Fork requires at least two branches"));
}

public void testProfile() {
var query = """
FROM test
| FORK
( WHERE content:"fox" | SORT id )
( WHERE content:"dog" | SORT id )
| SORT _fork, id
| KEEP _fork, id, content
""";

EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();

request.pragmas(randomPragmas());
request.query(query);
request.profile(true);

try (var resp = run(request)) {
EsqlQueryResponse.Profile profile = resp.profile();
assertNotNull(profile);

List<String> descriptions = profile.drivers().stream().map(DriverProfile::description).sorted().toList();

assertEquals(
List.of("data", "data", "main.final", "node_reduce", "node_reduce", "subplan-0.final", "subplan-1.final"),
descriptions
);
}
}

private void createAndPopulateIndex() {
var indexName = "test";
var client = client().admin().indices();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void execute(

// we have no sub plans, so we can just execute the given plan
if (subplans == null || subplans.size() == 0) {
executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, listener, null);
executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, null, listener, null);
return;
}

Expand All @@ -220,7 +220,7 @@ public void execute(
var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId));
var computeContext = new ComputeContext(
mainSessionId,
"single",
"main.final",
LOCAL_CLUSTER,
List.of(),
configuration,
Expand All @@ -244,22 +244,33 @@ public void execute(
) {
runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());

for (PhysicalPlan subplan : subplans) {
for (int i = 0; i < subplans.size(); i++) {
var subplan = subplans.get(i);
var childSessionId = newChildSession(sessionId);
ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize());
// funnel sub plan pages into the main plan exchange source
mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
var subPlanListener = localListener.acquireCompute();

executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> {
exchangeSink.addCompletionListener(
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
);
subPlanListener.onResponse(result.completionInfo());
}, e -> {
exchangeService.finishSinkHandler(childSessionId, e);
subPlanListener.onFailure(e);
}), () -> exchangeSink.createExchangeSink(() -> {}));
executePlan(
childSessionId,
rootTask,
subplan,
configuration,
foldContext,
execInfo,
"subplan-" + i,
ActionListener.wrap(result -> {
exchangeSink.addCompletionListener(
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
);
subPlanListener.onResponse(result.completionInfo());
}, e -> {
exchangeService.finishSinkHandler(childSessionId, e);
subPlanListener.onFailure(e);
}),
() -> exchangeSink.createExchangeSink(() -> {})
);
}
}
}
Expand All @@ -272,6 +283,7 @@ public void executePlan(
Configuration configuration,
FoldContext foldContext,
EsqlExecutionInfo execInfo,
String profileQualifier,
ActionListener<Result> listener,
Supplier<ExchangeSink> exchangeSinkSupplier
) {
Expand Down Expand Up @@ -309,7 +321,7 @@ public void executePlan(
}
var computeContext = new ComputeContext(
newChildSession(sessionId),
"single",
profileDescription(profileQualifier, "single"),
LOCAL_CLUSTER,
List.of(),
configuration,
Expand Down Expand Up @@ -395,7 +407,7 @@ public void executePlan(
rootTask,
new ComputeContext(
sessionId,
"final",
profileDescription(profileQualifier, "final"),
LOCAL_CLUSTER,
List.of(),
configuration,
Expand Down Expand Up @@ -611,6 +623,10 @@ String newChildSession(String session) {
return session + "/" + childSessionIdGenerator.incrementAndGet();
}

String profileDescription(String qualifier, String label) {
return qualifier == null ? label : qualifier + "." + label;
}

Runnable cancelQueryOnFailure(CancellableTask task) {
return new RunOnce(() -> {
LOGGER.debug("cancelling ESQL task {} on failure", task);
Expand Down