Skip to content

Commit 7720e63

Browse files
authored
ESQL: Fetch min transport version we're going to target when resolving fields (#135633)
Relates #131108 Fetch min transport version we're going to target when resolving fields.
1 parent a98c47b commit 7720e63

File tree

12 files changed

+429
-82
lines changed

12 files changed

+429
-82
lines changed

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public Map<String, Object> runtimeFields() {
288288
return this.runtimeFields;
289289
}
290290

291-
Long nowInMillis() {
291+
public Long nowInMillis() {
292292
return nowInMillis;
293293
}
294294

server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
/**
5555
* Dispatches child field-caps requests to old/new data nodes in the local cluster that have shards of the requesting indices.
5656
*/
57-
final class RequestDispatcher {
57+
public final class RequestDispatcher {
5858
static final Logger LOGGER = LogManager.getLogger(RequestDispatcher.class);
5959

6060
private final TransportService transportService;
@@ -74,7 +74,7 @@ final class RequestDispatcher {
7474
private final AtomicInteger executionRound = new AtomicInteger();
7575
private final Map<String, IndexSelector> indexSelectors;
7676

77-
RequestDispatcher(
77+
public RequestDispatcher(
7878
ClusterService clusterService,
7979
TransportService transportService,
8080
ProjectResolver projectResolver,
@@ -127,7 +127,7 @@ final class RequestDispatcher {
127127
}
128128
}
129129

130-
void execute() {
130+
public void execute() {
131131
executor.execute(new AbstractRunnable() {
132132
@Override
133133
public void onFailure(Exception e) {

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 18 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -129,40 +129,19 @@ public TransportFieldCapabilitiesAction(
129129

130130
@Override
131131
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
132-
executeRequest(
133-
task,
134-
request,
135-
(transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest(
136-
conn,
137-
REMOTE_TYPE.name(),
138-
fieldCapabilitiesRequest,
139-
TransportRequestOptions.EMPTY,
140-
responseHandler
141-
),
142-
listener
143-
);
132+
executeRequest(task, request, listener);
144133
}
145134

146-
public void executeRequest(
147-
Task task,
148-
FieldCapabilitiesRequest request,
149-
LinkedRequestExecutor linkedRequestExecutor,
150-
ActionListener<FieldCapabilitiesResponse> listener
151-
) {
135+
public void executeRequest(Task task, FieldCapabilitiesRequest request, ActionListener<FieldCapabilitiesResponse> listener) {
152136
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
153-
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, linkedRequestExecutor, l)));
137+
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l)));
154138
}
155139

156-
private void doExecuteForked(
157-
Task task,
158-
FieldCapabilitiesRequest request,
159-
LinkedRequestExecutor linkedRequestExecutor,
160-
ActionListener<FieldCapabilitiesResponse> listener
161-
) {
140+
private void doExecuteForked(Task task, FieldCapabilitiesRequest request, ActionListener<FieldCapabilitiesResponse> listener) {
162141
if (ccsCheckCompatibility) {
163142
checkCCSVersionCompatibility(request);
164143
}
165-
final Executor singleThreadedExecutor = buildSingleThreadedExecutor();
144+
final Executor singleThreadedExecutor = buildSingleThreadedExecutor(searchCoordinationExecutor, LOGGER);
166145
assert task instanceof CancellableTask;
167146
final CancellableTask fieldCapTask = (CancellableTask) task;
168147
// retrieve the initial timestamp in case the action is a cross cluster search
@@ -322,10 +301,11 @@ private void doExecuteForked(
322301
true,
323302
ActionListener.releaseAfter(remoteListener, refs.acquire())
324303
).delegateFailure(
325-
(responseListener, conn) -> linkedRequestExecutor.executeRemoteRequest(
326-
transportService,
304+
(responseListener, conn) -> transportService.sendRequest(
327305
conn,
306+
REMOTE_TYPE.name(),
328307
remoteRequest,
308+
TransportRequestOptions.EMPTY,
329309
new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor)
330310
)
331311
)
@@ -339,7 +319,7 @@ private void doExecuteForked(
339319
}
340320
}
341321

342-
private Executor buildSingleThreadedExecutor() {
322+
public static Executor buildSingleThreadedExecutor(Executor searchCoordinationExecutor, Logger logger) {
343323
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor);
344324
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
345325
@Override
@@ -362,16 +342,7 @@ public void onFailure(Exception e) {
362342
});
363343
}
364344

365-
public interface LinkedRequestExecutor {
366-
void executeRemoteRequest(
367-
TransportService transportService,
368-
Transport.Connection conn,
369-
FieldCapabilitiesRequest remoteRequest,
370-
ActionListenerResponseHandler<FieldCapabilitiesResponse> responseHandler
371-
);
372-
}
373-
374-
private static void checkIndexBlocks(ProjectState projectState, String[] concreteIndices) {
345+
public static void checkIndexBlocks(ProjectState projectState, String[] concreteIndices) {
375346
var blocks = projectState.blocks();
376347
var projectId = projectState.projectId();
377348
if (blocks.global(projectId).isEmpty() && blocks.indices(projectId).isEmpty()) {
@@ -421,7 +392,7 @@ private static void mergeIndexResponses(
421392
}
422393
}
423394

424-
private static FieldCapabilitiesRequest prepareRemoteRequest(
395+
public static FieldCapabilitiesRequest prepareRemoteRequest(
425396
String clusterAlias,
426397
FieldCapabilitiesRequest request,
427398
OriginalIndices originalIndices,
@@ -615,10 +586,10 @@ private static void innerMerge(
615586
* This collector can contain a failure for an index even if one of its shards was successful. When building the final
616587
* list, these failures will be skipped because they have no affect on the final response.
617588
*/
618-
private static final class FailureCollector {
589+
public static final class FailureCollector {
619590
private final Map<String, Exception> failuresByIndex = new HashMap<>();
620591

621-
List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
592+
public List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
622593
Map<Tuple<String, String>, FieldCapabilitiesFailure> indexFailures = new HashMap<>();
623594
for (Map.Entry<String, Exception> failure : failuresByIndex.entrySet()) {
624595
String index = failure.getKey();
@@ -647,15 +618,15 @@ List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
647618
return new ArrayList<>(indexFailures.values());
648619
}
649620

650-
void collect(String index, Exception e) {
621+
public void collect(String index, Exception e) {
651622
failuresByIndex.putIfAbsent(index, e);
652623
}
653624

654-
void clear() {
625+
public void clear() {
655626
failuresByIndex.clear();
656627
}
657628

658-
boolean isEmpty() {
629+
public boolean isEmpty() {
659630
return failuresByIndex.isEmpty();
660631
}
661632
}
@@ -718,8 +689,8 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
718689
}
719690
}
720691

721-
private static class ForkingOnFailureActionListener<Response> extends AbstractThreadedActionListener<Response> {
722-
ForkingOnFailureActionListener(Executor executor, boolean forceExecution, ActionListener<Response> delegate) {
692+
public static class ForkingOnFailureActionListener<Response> extends AbstractThreadedActionListener<Response> {
693+
public ForkingOnFailureActionListener(Executor executor, boolean forceExecution, ActionListener<Response> delegate) {
723694
super(executor, forceExecution, delegate);
724695
}
725696

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9189000,9185001
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
initial_9.2.0,9185000
1+
esql_resolve_fields_response_created,9185001
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
timestamp_range_telemetry,9188000
1+
esql_resolve_fields_response_created,9189000

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,6 @@ protected boolean fetchDenseVectorAggMetricDoubleIfFns() throws IOException {
130130

131131
protected boolean supportsNodeAssignment() throws IOException {
132132
if (supportsNodeAssignment == null) {
133-
for (NodeInfo i : allNodeToInfo().values()) {
134-
logger.error("NOCOMMIT {}", i);
135-
}
136133
supportsNodeAssignment = allNodeToInfo().values()
137134
.stream()
138135
.allMatch(i -> (i.roles.contains("index") && i.roles.contains("search")) || (i.roles.contains("data")));
@@ -542,10 +539,8 @@ private boolean syntheticSourceByDefault() {
542539
}
543540

544541
private Map<String, NodeInfo> expectedIndices() throws IOException {
545-
logger.error("ADFADF NOCOMMIT");
546542
Map<String, NodeInfo> result = new TreeMap<>();
547543
if (supportsNodeAssignment()) {
548-
logger.error("supports {}", allNodeToInfo());
549544
for (Map.Entry<String, NodeInfo> e : allNodeToInfo().entrySet()) {
550545
String name = indexMode + "_" + e.getKey();
551546
if (e.getValue().cluster != null) {
@@ -554,7 +549,6 @@ private Map<String, NodeInfo> expectedIndices() throws IOException {
554549
result.put(name, e.getValue());
555550
}
556551
} else {
557-
logger.error("one per {}", allNodeToInfo());
558552
for (Map.Entry<String, NodeInfo> e : allNodeToInfo().entrySet()) {
559553
String name = indexMode.toString();
560554
if (e.getValue().cluster != null) {

0 commit comments

Comments
 (0)