From d8be77b78750f14438482b41e5b34f68bff847d4 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Fri, 21 Mar 2025 17:09:37 +0100 Subject: [PATCH 1/3] Document plan execution across nodes --- .../xpack/esql/plugin/ComputeService.java | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 36d6938644534..99f3c154a8304 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -68,7 +68,41 @@ import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; /** - * Computes the result of a {@link PhysicalPlan}. + * Once query is parsed and validated it is scheduled for execution by {@code org.elasticsearch.xpack.esql.plugin.ComputeService#execute} + * This method is responsible for splitting physical plan into coordinator and data node plans. + * + * Coordinator plan is immediately executed locally (using {@code org.elasticsearch.xpack.esql.plugin.ComputeService#runCompute}) + * and is prepared to collect and merge pages from data nodes into the final qquery result. + * + * Data node plan is passed to {@code org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler#startComputeOnDataNodes} + * that is responsible for + *
  • + * Determining list of nodes that contain shards referenced by the query with + * {@code org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender#searchShards} + *
  • + *
  • + * Each node in the list processed in + * {@code org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler#startComputeOnDataNodes} + * in order to + *
  • + * Open ExchangeSink on the target data node and link it with local ExchangeSource for the query + * using `internal:data/read/esql/open_exchange` transport request. + * {@see org.elasticsearch.compute.operator.exchange.ExchangeService#openExchange} + *
  • + *
  • + * Start data node plan execution on the target data node + * using `indices:data/read/esql/data` transport request + * {@see org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler#messageReceived} + * {@see org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler#runComputeOnDataNode} + *
  • + *
  • + * While coordinator plan executor is running it will read data from ExchangeSource that will poll pages + * from linked ExchangeSink on target data nodes or notify zem that data set is already completed + * (for example when running FROM * | LIMIT 10 type of query) or query is canceled + * using `internal:data/read/esql/exchange` transport requests. + * {@see org.elasticsearch.compute.operator.exchange.ExchangeService.ExchangeTransportAction#messageReceived} + *
  • + * */ public class ComputeService { public static final String DATA_ACTION_NAME = EsqlQueryAction.NAME + "/data"; From 0c0f5f5c8a23a14460084b7c77085659a464d0f8 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 24 Mar 2025 08:36:10 +0100 Subject: [PATCH 2/3] fix typos --- .../org/elasticsearch/xpack/esql/plugin/ComputeService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 99f3c154a8304..18cdb52bf6fb1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -72,7 +72,7 @@ * This method is responsible for splitting physical plan into coordinator and data node plans. * * Coordinator plan is immediately executed locally (using {@code org.elasticsearch.xpack.esql.plugin.ComputeService#runCompute}) - * and is prepared to collect and merge pages from data nodes into the final qquery result. + * and is prepared to collect and merge pages from data nodes into the final query result. * * Data node plan is passed to {@code org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler#startComputeOnDataNodes} * that is responsible for @@ -97,7 +97,7 @@ * *
  • * While coordinator plan executor is running it will read data from ExchangeSource that will poll pages - * from linked ExchangeSink on target data nodes or notify zem that data set is already completed + * from linked ExchangeSink on target data nodes or notify them that data set is already completed * (for example when running FROM * | LIMIT 10 type of query) or query is canceled * using `internal:data/read/esql/exchange` transport requests. * {@see org.elasticsearch.compute.operator.exchange.ExchangeService.ExchangeTransportAction#messageReceived} From f64a944e5f998ca2f33f995abc1b7203756728ae Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 24 Mar 2025 08:55:34 +0100 Subject: [PATCH 3/3] fix tags --- .../org/elasticsearch/xpack/esql/plugin/ComputeService.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 18cdb52bf6fb1..293be0eb3c2b0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -76,6 +76,7 @@ * * Data node plan is passed to {@code org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler#startComputeOnDataNodes} * that is responsible for + *
      *
    • * Determining list of nodes that contain shards referenced by the query with * {@code org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender#searchShards} @@ -84,6 +85,7 @@ * Each node in the list processed in * {@code org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler#startComputeOnDataNodes} * in order to + *
        *
      • * Open ExchangeSink on the target data node and link it with local ExchangeSource for the query * using `internal:data/read/esql/open_exchange` transport request. @@ -102,7 +104,9 @@ * using `internal:data/read/esql/exchange` transport requests. * {@see org.elasticsearch.compute.operator.exchange.ExchangeService.ExchangeTransportAction#messageReceived} *
      • + *
      *
    • + *
    */ public class ComputeService { public static final String DATA_ACTION_NAME = EsqlQueryAction.NAME + "/data";