|
18 | 18 | import org.elasticsearch.action.support.HandledTransportAction; |
19 | 19 | import org.elasticsearch.client.internal.node.NodeClient; |
20 | 20 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
| 21 | +import org.elasticsearch.compute.operator.DriverStatus; |
21 | 22 | import org.elasticsearch.compute.operator.DriverTaskRunner; |
22 | 23 | import org.elasticsearch.injection.guice.Inject; |
23 | 24 | import org.elasticsearch.tasks.Task; |
|
28 | 29 | import org.elasticsearch.xpack.esql.action.EsqlGetQueryRequest; |
29 | 30 | import org.elasticsearch.xpack.esql.action.EsqlQueryAction; |
30 | 31 |
|
| 32 | +import java.util.Set; |
| 33 | +import java.util.TreeSet; |
| 34 | + |
31 | 35 | import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN; |
32 | 36 |
|
33 | 37 | public class TransportEsqlGetQueryAction extends HandledTransportAction<EsqlGetQueryRequest, EsqlGetQueryResponse> { |
@@ -85,14 +89,30 @@ public void onFailure(Exception e) { |
85 | 89 | ); |
86 | 90 | } |
87 | 91 |
|
88 | | - private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo task, ListTasksResponse response) { |
| 92 | + private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo main, ListTasksResponse sub) { |
| 93 | + String query = main.description(); |
| 94 | + String coordinatingNode = main.node(); |
| 95 | + |
| 96 | + // TODO include completed drivers in documentsFound and valuesLoaded |
| 97 | + long documentsFound = 0; |
| 98 | + long valuesLoaded = 0; |
| 99 | + Set<String> dataNodes = new TreeSet<>(); |
| 100 | + for (TaskInfo info : sub.getTasks()) { |
| 101 | + DriverStatus status = (DriverStatus) info.status(); |
| 102 | + documentsFound += status.documentsFound(); |
| 103 | + valuesLoaded += status.valuesLoaded(); |
| 104 | + dataNodes.add(info.node()); |
| 105 | + } |
| 106 | + |
89 | 107 | return new EsqlGetQueryResponse.DetailedQuery( |
90 | | - task.taskId(), |
91 | | - task.startTime(), |
92 | | - task.runningTimeNanos(), |
93 | | - task.description(), // Query |
94 | | - task.node(), // Coordinating node |
95 | | - response.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes |
| 108 | + main.taskId(), |
| 109 | + main.startTime(), |
| 110 | + main.runningTimeNanos(), |
| 111 | + documentsFound, |
| 112 | + valuesLoaded, |
| 113 | + query, |
| 114 | + coordinatingNode, |
| 115 | + sub.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes |
96 | 116 | ); |
97 | 117 | } |
98 | 118 | } |
0 commit comments