Skip to content

Commit 31758a8

Browse files
committed
Mirror upstream elastic#137247 as single snapshot commit for AI review
BASE=60406a6315bb9b1fc847e614175899a9161b2e82 HEAD=5aab46d5a38808333f5f4a432ca3057a015f9162 Branch=main
1 parent 60406a6 commit 31758a8

File tree

20 files changed

+1196
-158
lines changed

20 files changed

+1196
-158
lines changed

server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ResolvedIndices;
1414
import org.elasticsearch.client.internal.Client;
15+
import org.elasticsearch.client.internal.RemoteClusterClient;
1516
import org.elasticsearch.cluster.metadata.DataStream;
1617
import org.elasticsearch.cluster.metadata.IndexMetadata;
1718
import org.elasticsearch.cluster.routing.allocation.DataTier;
@@ -36,11 +37,13 @@
3637
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
3738
import org.elasticsearch.search.builder.PointInTimeBuilder;
3839
import org.elasticsearch.transport.RemoteClusterAware;
40+
import org.elasticsearch.transport.RemoteClusterService;
3941
import org.elasticsearch.xcontent.XContentParser;
4042
import org.elasticsearch.xcontent.XContentParserConfiguration;
4143

4244
import java.util.ArrayList;
4345
import java.util.Collections;
46+
import java.util.HashMap;
4447
import java.util.HashSet;
4548
import java.util.List;
4649
import java.util.Map;
@@ -52,6 +55,8 @@
5255
import java.util.function.Predicate;
5356
import java.util.stream.Collectors;
5457

58+
import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
59+
5560
/**
5661
* Context object used to rewrite {@link QueryBuilder} instances into simplified version.
5762
*/
@@ -72,6 +77,7 @@ public class QueryRewriteContext {
7277
protected final Client client;
7378
protected final LongSupplier nowInMillis;
7479
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>();
80+
private final Map<String, List<BiConsumer<RemoteClusterClient, ActionListener<?>>>> remoteAsyncActions = new HashMap<>();
7581
protected boolean allowUnmappedFields;
7682
protected boolean mapUnmappedFieldAsString;
7783
protected Predicate<String> allowedFields;
@@ -346,22 +352,35 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
346352
asyncActions.add(asyncAction);
347353
}
348354

355+
public void registerRemoteAsyncAction(String clusterAlias, BiConsumer<RemoteClusterClient, ActionListener<?>> asyncAction) {
356+
List<BiConsumer<RemoteClusterClient, ActionListener<?>>> asyncActions = remoteAsyncActions.computeIfAbsent(
357+
clusterAlias,
358+
k -> new ArrayList<>()
359+
);
360+
asyncActions.add(asyncAction);
361+
}
362+
349363
/**
350364
* Returns <code>true</code> if there are any registered async actions.
351365
*/
352366
public boolean hasAsyncActions() {
353-
return asyncActions.isEmpty() == false;
367+
return asyncActions.isEmpty() == false || remoteAsyncActions.isEmpty() == false;
354368
}
355369

356370
/**
357371
* Executes all registered async actions and notifies the listener once it's done. The value that is passed to the listener is always
358372
* <code>null</code>. The list of registered actions is cleared once this method returns.
359373
*/
360374
public void executeAsyncActions(ActionListener<Void> listener) {
361-
if (asyncActions.isEmpty()) {
375+
if (asyncActions.isEmpty() && remoteAsyncActions.isEmpty()) {
362376
listener.onResponse(null);
363377
} else {
364-
CountDown countDown = new CountDown(asyncActions.size());
378+
int actionCount = asyncActions.size();
379+
for (var remoteAsyncActionList : remoteAsyncActions.values()) {
380+
actionCount += remoteAsyncActionList.size();
381+
}
382+
383+
CountDown countDown = new CountDown(actionCount);
365384
ActionListener<?> internalListener = new ActionListener<>() {
366385
@Override
367386
public void onResponse(Object o) {
@@ -377,12 +396,28 @@ public void onFailure(Exception e) {
377396
}
378397
}
379398
};
399+
380400
// make a copy to prevent concurrent modification exception
381401
List<BiConsumer<Client, ActionListener<?>>> biConsumers = new ArrayList<>(asyncActions);
382402
asyncActions.clear();
383403
for (BiConsumer<Client, ActionListener<?>> action : biConsumers) {
384404
action.accept(client, internalListener);
385405
}
406+
407+
for (var entry : remoteAsyncActions.entrySet()) {
408+
String clusterAlias = entry.getKey();
409+
List<BiConsumer<RemoteClusterClient, ActionListener<?>>> remoteBiConsumers = entry.getValue();
410+
411+
RemoteClusterClient remoteClient = client.getRemoteClusterClient(
412+
clusterAlias,
413+
DIRECT_EXECUTOR_SERVICE,
414+
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
415+
);
416+
for (BiConsumer<RemoteClusterClient, ActionListener<?>> action : remoteBiConsumers) {
417+
action.accept(remoteClient, internalListener);
418+
}
419+
}
420+
remoteAsyncActions.clear();
386421
}
387422
}
388423

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.inference.action;
9+
10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.ActionResponse;
13+
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.RemoteClusterActionType;
15+
import org.elasticsearch.cluster.metadata.InferenceFieldMetadata;
16+
import org.elasticsearch.common.io.stream.StreamInput;
17+
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.core.Nullable;
19+
import org.elasticsearch.inference.InferenceResults;
20+
21+
import java.io.IOException;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Objects;
26+
27+
public class GetInferenceFieldsAction extends ActionType<GetInferenceFieldsAction.Response> {
28+
public static final GetInferenceFieldsAction INSTANCE = new GetInferenceFieldsAction();
29+
public static final RemoteClusterActionType<Response> REMOTE_TYPE = new RemoteClusterActionType<>(INSTANCE.name(), Response::new);
30+
31+
public static final String NAME = "cluster:monitor/xpack/inference_fields/get";
32+
33+
public GetInferenceFieldsAction() {
34+
super(NAME);
35+
}
36+
37+
public static class Request extends ActionRequest {
38+
private final List<String> indices;
39+
private final List<String> fields;
40+
private final boolean resolveWildcards;
41+
private final boolean useDefaultFields;
42+
private final String query;
43+
44+
public Request(
45+
List<String> indices,
46+
List<String> fields,
47+
boolean resolveWildcards,
48+
boolean useDefaultFields,
49+
@Nullable String query
50+
) {
51+
this.indices = indices;
52+
this.fields = fields;
53+
this.resolveWildcards = resolveWildcards;
54+
this.useDefaultFields = useDefaultFields;
55+
this.query = query;
56+
}
57+
58+
public Request(StreamInput in) throws IOException {
59+
super(in);
60+
this.indices = in.readCollectionAsList(StreamInput::readString);
61+
this.fields = in.readCollectionAsList(StreamInput::readString);
62+
this.resolveWildcards = in.readBoolean();
63+
this.useDefaultFields = in.readBoolean();
64+
this.query = in.readOptionalString();
65+
}
66+
67+
@Override
68+
public void writeTo(StreamOutput out) throws IOException {
69+
super.writeTo(out);
70+
out.writeStringCollection(indices);
71+
out.writeStringCollection(fields);
72+
out.writeBoolean(resolveWildcards);
73+
out.writeBoolean(useDefaultFields);
74+
out.writeOptionalString(query);
75+
}
76+
77+
@Override
78+
public ActionRequestValidationException validate() {
79+
return null;
80+
}
81+
82+
public List<String> getIndices() {
83+
return Collections.unmodifiableList(indices);
84+
}
85+
86+
public List<String> getFields() {
87+
return Collections.unmodifiableList(fields);
88+
}
89+
90+
public boolean resolveWildcards() {
91+
return resolveWildcards;
92+
}
93+
94+
public boolean useDefaultFields() {
95+
return useDefaultFields;
96+
}
97+
98+
public String getQuery() {
99+
return query;
100+
}
101+
102+
@Override
103+
public boolean equals(Object o) {
104+
if (this == o) return true;
105+
if (o == null || getClass() != o.getClass()) return false;
106+
Request request = (Request) o;
107+
return Objects.equals(indices, request.indices)
108+
&& Objects.equals(fields, request.fields)
109+
&& resolveWildcards == request.resolveWildcards
110+
&& useDefaultFields == request.useDefaultFields
111+
&& Objects.equals(query, request.query);
112+
}
113+
114+
@Override
115+
public int hashCode() {
116+
return Objects.hash(indices, fields, resolveWildcards, useDefaultFields, query);
117+
}
118+
}
119+
120+
public static class Response extends ActionResponse {
121+
private final Map<String, List<InferenceFieldMetadata>> inferenceFieldsMap;
122+
private final Map<String, InferenceResults> inferenceResultsMap;
123+
124+
public Response(Map<String, List<InferenceFieldMetadata>> inferenceFieldsMap, Map<String, InferenceResults> inferenceResultsMap) {
125+
this.inferenceFieldsMap = inferenceFieldsMap;
126+
this.inferenceResultsMap = inferenceResultsMap;
127+
}
128+
129+
public Response(StreamInput in) throws IOException {
130+
this.inferenceFieldsMap = in.readImmutableMap(i -> i.readCollectionAsImmutableList(InferenceFieldMetadata::new));
131+
this.inferenceResultsMap = in.readImmutableMap(i -> i.readNamedWriteable(InferenceResults.class));
132+
}
133+
134+
@Override
135+
public void writeTo(StreamOutput out) throws IOException {
136+
out.writeMap(inferenceFieldsMap, StreamOutput::writeCollection);
137+
out.writeMap(inferenceResultsMap, StreamOutput::writeNamedWriteable);
138+
}
139+
140+
public Map<String, List<InferenceFieldMetadata>> getInferenceFieldsMap() {
141+
return Collections.unmodifiableMap(this.inferenceFieldsMap);
142+
}
143+
144+
public Map<String, InferenceResults> getInferenceResultsMap() {
145+
return Collections.unmodifiableMap(this.inferenceResultsMap);
146+
}
147+
148+
@Override
149+
public boolean equals(Object o) {
150+
if (this == o) return true;
151+
if (o == null || getClass() != o.getClass()) return false;
152+
Response response = (Response) o;
153+
return Objects.equals(inferenceFieldsMap, response.inferenceFieldsMap)
154+
&& Objects.equals(inferenceResultsMap, response.inferenceResultsMap);
155+
}
156+
157+
@Override
158+
public int hashCode() {
159+
return Objects.hash(inferenceFieldsMap, inferenceResultsMap);
160+
}
161+
}
162+
}

x-pack/plugin/esql/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ dependencies {
7171
testImplementation('org.webjars.npm:fontsource__roboto-mono:4.5.7')
7272

7373
internalClusterTestImplementation project(":modules:mapper-extras")
74+
internalClusterTestImplementation project(xpackModule('inference'))
75+
internalClusterTestImplementation testArtifact(project(xpackModule('inference')))
76+
internalClusterTestImplementation testArtifact(project(xpackModule('inference')), 'internalClusterTest')
7477
}
7578

7679
tasks.named("dependencyLicenses").configure {

0 commit comments

Comments
 (0)