Skip to content

Commit acc8f5e

Browse files
authored
[8.x] Finalized refactorings from closed PR elastic#115976 (elastic#116121) (elastic#116129)
* Finalized refactorings from closed PR elastic#115976 (elastic#116121) Pure refactoring PR * Muting tests that are muted on main but not 8.x so I can get the backport to succeed * Muting tests that are muted on main but not 8.x so I can get the backport to succeed, part 2
1 parent 9ad8337 commit acc8f5e

File tree

8 files changed

+295
-203
lines changed

8 files changed

+295
-203
lines changed

muted-tests.yml

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -298,17 +298,22 @@ tests:
298298
- class: org.elasticsearch.upgrades.FullClusterRestartIT
299299
method: testSnapshotRestore {cluster=OLD}
300300
issue: https://github.com/elastic/elasticsearch/issues/111777
301-
- class: org.elasticsearch.xpack.restart.CoreFullClusterRestartIT
302-
method: testSnapshotRestore {cluster=OLD}
303-
issue: https://github.com/elastic/elasticsearch/issues/111775
304-
- class: org.elasticsearch.xpack.restart.CoreFullClusterRestartIT
305-
method: testSnapshotRestore {cluster=UPGRADED}
306-
issue: https://github.com/elastic/elasticsearch/issues/111799
307301
- class: org.elasticsearch.search.DefaultSearchContextTests
308302
method: testSearchConcurrencyDoesNotCreateMoreTasksThanThreads
309303
issue: https://github.com/elastic/elasticsearch/issues/116048
310304
- class: org.elasticsearch.analysis.common.CommonAnalysisClientYamlTestSuiteIT
311305
issue: https://github.com/elastic/elasticsearch/issues/116134
306+
method: testLookbackWithIndicesOptions
307+
issue: https://github.com/elastic/elasticsearch/issues/116127
308+
- class: org.elasticsearch.xpack.restart.CoreFullClusterRestartIT
309+
method: testSnapshotRestore {cluster=UPGRADED}
310+
issue: https://github.com/elastic/elasticsearch/issues/111799
311+
- class: org.elasticsearch.xpack.restart.CoreFullClusterRestartIT
312+
method: testSnapshotRestore {cluster=OLD}
313+
issue: https://github.com/elastic/elasticsearch/issues/111774
314+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
315+
method: testSnapshotRestore {cluster=OLD}
316+
issue: https://github.com/elastic/elasticsearch/issues/111777
312317

313318
# Examples:
314319
#

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
229229
b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED));
230230
b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL));
231231
b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED));
232-
// each clusterinfo defines its own field object name
232+
// each Cluster object defines its own field object name
233233
b.xContentObject("details", clusterInfo.values().iterator());
234234
});
235235
}
@@ -352,11 +352,7 @@ public Cluster(
352352
this.successfulShards = successfulShards;
353353
this.skippedShards = skippedShards;
354354
this.failedShards = failedShards;
355-
if (failures == null) {
356-
this.failures = List.of();
357-
} else {
358-
this.failures = failures;
359-
}
355+
this.failures = failures == null ? Collections.emptyList() : failures;
360356
this.took = took;
361357
}
362358

@@ -373,7 +369,7 @@ public Cluster(StreamInput in) throws IOException {
373369
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
374370
this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure));
375371
} else {
376-
this.failures = List.of();
372+
this.failures = Collections.emptyList();
377373
}
378374
}
379375

@@ -475,7 +471,7 @@ public Cluster.Builder setTook(TimeValue took) {
475471
@Override
476472
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
477473
String name = clusterAlias;
478-
if (clusterAlias.equals("")) {
474+
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
479475
name = LOCAL_CLUSTER_NAME_REPRESENTATION;
480476
}
481477
builder.startObject(name);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -289,24 +289,17 @@ public void onResponse(Transport.Connection connection) {
289289
RESOLVE_ACTION_NAME,
290290
new LookupRequest(cluster, remotePolicies),
291291
TransportRequestOptions.EMPTY,
292-
new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> {
293-
if (ExceptionsHelper.isRemoteUnavailableException(e)
294-
&& remoteClusterService.isSkipUnavailable(cluster)) {
295-
l.onResponse(new LookupResponse(e));
296-
} else {
297-
l.onFailure(e);
298-
}
299-
}), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH))
292+
new ActionListenerResponseHandler<>(
293+
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)),
294+
LookupResponse::new,
295+
threadPool.executor(ThreadPool.Names.SEARCH)
296+
)
300297
);
301298
}
302299

303300
@Override
304301
public void onFailure(Exception e) {
305-
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
306-
lookupListener.onResponse(new LookupResponse(e));
307-
} else {
308-
lookupListener.onFailure(e);
309-
}
302+
failIfSkipUnavailableFalse(e, cluster, lookupListener);
310303
}
311304
});
312305
}
@@ -331,6 +324,14 @@ public void onFailure(Exception e) {
331324
}
332325
}
333326

327+
private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener<LookupResponse> lookupListener) {
328+
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
329+
lookupListener.onResponse(new LookupResponse(e));
330+
} else {
331+
lookupListener.onFailure(e);
332+
}
333+
}
334+
334335
private static class LookupRequest extends TransportRequest {
335336
private final String clusterAlias;
336337
private final Collection<String> policyNames;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
149149
analyzedPlan(
150150
parse(request.query(), request.params()),
151151
executionInfo,
152-
new CcsUtils.CssPartialErrorsActionListener(executionInfo, listener) {
152+
new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
153153
@Override
154154
public void onResponse(LogicalPlan analyzedPlan) {
155155
executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener);
@@ -171,7 +171,7 @@ public void executeOptimizedPlan(
171171
) {
172172
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
173173
// TODO: this could be snuck into the underlying listener
174-
CcsUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
174+
EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
175175
// execute any potential subplans
176176
executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
177177
}
@@ -308,8 +308,8 @@ private <T> void preAnalyze(
308308
// TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index
309309
// resolution to updateExecutionInfo
310310
if (indexResolution.isValid()) {
311-
CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
312-
CcsUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
311+
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
312+
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
313313
if (executionInfo.isCrossClusterSearch()
314314
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
315315
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
@@ -383,7 +383,7 @@ private void preAnalyzeIndices(
383383
}
384384
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
385385
// based only on available clusters (which could now be an empty list)
386-
String indexExpressionToResolve = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo);
386+
String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
387387
if (indexExpressionToResolve.isEmpty()) {
388388
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
389389
listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/CcsUtils.java renamed to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java

Lines changed: 77 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,30 @@
2323
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2424

2525
import java.util.Collections;
26+
import java.util.HashMap;
2627
import java.util.HashSet;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Set;
3031

31-
class CcsUtils {
32+
class EsqlSessionCCSUtils {
3233

33-
private CcsUtils() {}
34+
private EsqlSessionCCSUtils() {}
35+
36+
// visible for testing
37+
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
38+
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
39+
for (FieldCapabilitiesFailure failure : failures) {
40+
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
41+
for (String indexExpression : failure.getIndices()) {
42+
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
43+
unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
44+
}
45+
}
46+
}
47+
}
48+
return unavailableRemotes;
49+
}
3450

3551
/**
3652
* ActionListener that receives LogicalPlan or error from logical planning.
@@ -46,70 +62,73 @@ abstract static class CssPartialErrorsActionListener implements ActionListener<L
4662
this.listener = listener;
4763
}
4864

49-
/**
50-
* Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
51-
*
52-
* For cases where field-caps had no indices to search and the remotes were unavailable, we
53-
* return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
54-
*
55-
* Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
56-
* on any of the requested clusters.
57-
*/
58-
private boolean returnSuccessWithEmptyResult(Exception e) {
59-
if (executionInfo.isCrossClusterSearch() == false) {
60-
return false;
65+
@Override
66+
public void onFailure(Exception e) {
67+
if (returnSuccessWithEmptyResult(executionInfo, e)) {
68+
updateExecutionInfoToReturnEmptyResult(executionInfo, e);
69+
listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
70+
} else {
71+
listener.onFailure(e);
6172
}
73+
}
74+
}
6275

63-
if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
64-
for (String clusterAlias : executionInfo.clusterAliases()) {
65-
if (executionInfo.isSkipUnavailable(clusterAlias) == false
66-
&& clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
67-
return false;
68-
}
76+
/**
77+
* Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
78+
*
79+
* For cases where field-caps had no indices to search and the remotes were unavailable, we
80+
* return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
81+
*
82+
* Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
83+
* on any of the requested clusters.
84+
*/
85+
static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
86+
if (executionInfo.isCrossClusterSearch() == false) {
87+
return false;
88+
}
89+
90+
if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
91+
for (String clusterAlias : executionInfo.clusterAliases()) {
92+
if (executionInfo.isSkipUnavailable(clusterAlias) == false
93+
&& clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
94+
return false;
6995
}
70-
return true;
7196
}
72-
return false;
97+
return true;
7398
}
99+
return false;
100+
}
74101

75-
@Override
76-
public void onFailure(Exception e) {
77-
if (returnSuccessWithEmptyResult(e)) {
78-
executionInfo.markEndQuery();
79-
Exception exceptionForResponse;
80-
if (e instanceof ConnectTransportException) {
81-
// when field-caps has no field info (since no clusters could be connected to or had matching indices)
82-
// it just throws the first exception in its list, so this odd special handling is here is to avoid
83-
// having one specific remote alias name in all failure lists in the metadata response
84-
exceptionForResponse = new RemoteTransportException(
85-
"connect_transport_exception - unable to connect to remote cluster",
86-
null
87-
);
102+
static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
103+
executionInfo.markEndQuery();
104+
Exception exceptionForResponse;
105+
if (e instanceof ConnectTransportException) {
106+
// when field-caps has no field info (since no clusters could be connected to or had matching indices)
107+
// it just throws the first exception in its list, so this odd special handling is here is to avoid
108+
// having one specific remote alias name in all failure lists in the metadata response
109+
exceptionForResponse = new RemoteTransportException("connect_transport_exception - unable to connect to remote cluster", null);
110+
} else {
111+
exceptionForResponse = e;
112+
}
113+
for (String clusterAlias : executionInfo.clusterAliases()) {
114+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
115+
EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook())
116+
.setTotalShards(0)
117+
.setSuccessfulShards(0)
118+
.setSkippedShards(0)
119+
.setFailedShards(0);
120+
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
121+
// never mark local cluster as skipped
122+
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
88123
} else {
89-
exceptionForResponse = e;
90-
}
91-
for (String clusterAlias : executionInfo.clusterAliases()) {
92-
executionInfo.swapCluster(clusterAlias, (k, v) -> {
93-
EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(
94-
executionInfo.overallTook()
95-
).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0);
96-
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
97-
// never mark local cluster as skipped
98-
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
99-
} else {
100-
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
101-
// add this exception to the failures list only if there is no failure already recorded there
102-
if (v.getFailures() == null || v.getFailures().size() == 0) {
103-
builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
104-
}
105-
}
106-
return builder.build();
107-
});
124+
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
125+
// add this exception to the failures list only if there is no failure already recorded there
126+
if (v.getFailures() == null || v.getFailures().size() == 0) {
127+
builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
128+
}
108129
}
109-
listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
110-
} else {
111-
listener.onFailure(e);
112-
}
130+
return builder.build();
131+
});
113132
}
114133
}
115134

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.esql.session;
88

9-
import org.elasticsearch.ExceptionsHelper;
109
import org.elasticsearch.action.ActionListener;
11-
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1210
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
1311
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
1412
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
@@ -20,7 +18,6 @@
2018
import org.elasticsearch.index.IndexMode;
2119
import org.elasticsearch.index.mapper.TimeSeriesParams;
2220
import org.elasticsearch.threadpool.ThreadPool;
23-
import org.elasticsearch.transport.RemoteClusterAware;
2421
import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
2522
import org.elasticsearch.xpack.esql.core.type.DataType;
2623
import org.elasticsearch.xpack.esql.core.type.DateEsField;
@@ -159,23 +156,8 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
159156
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
160157
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
161158
}
162-
Map<String, FieldCapabilitiesFailure> unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures());
163-
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes);
164-
}
165-
166-
// visible for testing
167-
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
168-
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
169-
for (FieldCapabilitiesFailure failure : failures) {
170-
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
171-
for (String indexExpression : failure.getIndices()) {
172-
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
173-
unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
174-
}
175-
}
176-
}
177-
}
178-
return unavailableRemotes;
159+
EsIndex esIndex = new EsIndex(indexPattern, rootFields, concreteIndices);
160+
return IndexResolution.valid(esIndex, EsqlSessionCCSUtils.determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()));
179161
}
180162

181163
private boolean allNested(List<IndexFieldCapabilities> caps) {

0 commit comments

Comments
 (0)