Skip to content

Commit 91d75a0

Browse files
authored
Merge branch 'main' into feature/esql-tbucket-function
2 parents 8bf1c07 + 8bcecb9 commit 91d75a0

File tree

10 files changed

+255
-77
lines changed

10 files changed

+255
-77
lines changed

build-tools/src/main/java/org/elasticsearch/gradle/LazyFileOutputStream.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
* or more contributor license agreements. Licensed under the "Elastic License
44
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
55
* Public License v 1"; you may not use this file except in compliance with, at
6-
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7-
* License v3.0 only", or the "Server Side Public License, v 1".
6+
* your election, the "Server Side Public License v3.0 only", or the "Server Side Public License, v 1".
87
*/
98

109
package org.elasticsearch.gradle;
@@ -18,48 +17,58 @@
1817
* An outputstream to a File that is lazily opened on the first write.
1918
*/
2019
class LazyFileOutputStream extends OutputStream {
21-
private OutputStream delegate;
20+
private final File file;
21+
private volatile OutputStream delegate;
22+
private volatile boolean initialized = false;
23+
private final Object lock = new Object();
2224

2325
LazyFileOutputStream(File file) {
24-
// use an initial dummy delegate to avoid doing a conditional on every write
25-
this.delegate = new OutputStream() {
26-
private void bootstrap() throws IOException {
27-
file.getParentFile().mkdirs();
28-
delegate = new FileOutputStream(file);
29-
}
30-
31-
@Override
32-
public void write(int b) throws IOException {
33-
bootstrap();
34-
delegate.write(b);
35-
}
36-
37-
@Override
38-
public void write(byte b[], int off, int len) throws IOException {
39-
bootstrap();
40-
delegate.write(b, off, len);
41-
}
26+
this.file = file;
27+
}
4228

43-
@Override
44-
public void write(byte b[]) throws IOException {
45-
bootstrap();
46-
delegate.write(b);
29+
private void ensureInitialized() throws IOException {
30+
if (initialized == false) {
31+
synchronized (lock) {
32+
if (initialized == false) {
33+
file.getParentFile().mkdirs();
34+
delegate = new FileOutputStream(file);
35+
initialized = true;
36+
}
4737
}
48-
};
38+
}
4939
}
5040

5141
@Override
5242
public void write(int b) throws IOException {
43+
ensureInitialized();
5344
delegate.write(b);
5445
}
5546

5647
@Override
5748
public void write(byte b[], int off, int len) throws IOException {
49+
ensureInitialized();
5850
delegate.write(b, off, len);
5951
}
6052

53+
@Override
54+
public void write(byte b[]) throws IOException {
55+
ensureInitialized();
56+
delegate.write(b);
57+
}
58+
6159
@Override
6260
public void close() throws IOException {
63-
delegate.close();
61+
synchronized (lock) {
62+
if (initialized && delegate != null) {
63+
delegate.close();
64+
}
65+
}
66+
}
67+
68+
@Override
69+
public void flush() throws IOException {
70+
if (initialized && delegate != null) {
71+
delegate.flush();
72+
}
6473
}
6574
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.ccs;
11+
12+
import org.elasticsearch.action.search.SearchRequest;
13+
import org.elasticsearch.action.search.SearchResponse;
14+
import org.elasticsearch.action.search.TransportSearchAction;
15+
import org.elasticsearch.common.settings.Setting;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.CollectionUtils;
18+
import org.elasticsearch.plugins.ClusterPlugin;
19+
import org.elasticsearch.plugins.Plugin;
20+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
21+
import org.elasticsearch.transport.ConnectTransportException;
22+
import org.elasticsearch.transport.RemoteTransportException;
23+
import org.hamcrest.Matchers;
24+
25+
import java.util.Collection;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.ExecutionException;
29+
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
31+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
32+
33+
// TODO: Move this test to the Serverless repo once the IT framework is ready there.
34+
public class CpsDoesNotUseSkipUnavailableIT extends AbstractMultiClustersTestCase {
35+
private static final String LINKED_CLUSTER_1 = "cluster-a";
36+
37+
public static class CpsPlugin extends Plugin implements ClusterPlugin {
38+
@Override
39+
public List<Setting<?>> getSettings() {
40+
return List.of(CpsEnableSetting);
41+
}
42+
}
43+
44+
private static final Setting<String> CpsEnableSetting = Setting.simpleString(
45+
"serverless.cross_project.enabled",
46+
Setting.Property.NodeScope
47+
);
48+
49+
@Override
50+
protected List<String> remoteClusterAlias() {
51+
return List.of(LINKED_CLUSTER_1);
52+
}
53+
54+
@Override
55+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
56+
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class);
57+
}
58+
59+
@Override
60+
protected Settings nodeSettings() {
61+
return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build();
62+
}
63+
64+
@Override
65+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
66+
// Setting skip_unavailable=false results in a fatal error when the linked cluster is not available.
67+
return Map.of(LINKED_CLUSTER_1, false);
68+
}
69+
70+
public void testCpsShouldNotUseSkipUnavailable() throws Exception {
71+
// Add some dummy data to prove we are communicating fine with the remote.
72+
assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index"));
73+
client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
74+
client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();
75+
76+
// Shut down the linked cluster we'd be targeting in the search.
77+
try {
78+
cluster(LINKED_CLUSTER_1).close();
79+
} catch (Exception e) {
80+
throw new AssertionError(e);
81+
}
82+
83+
/*
84+
* Under normal circumstances, we should get a fatal error for when skip_unavailable=false for a linked cluster
85+
* and that cluster is targeted in a search op. However, in CPS environment, setting allow_partial_search_results=true
86+
* should not result in a fatal error.
87+
*/
88+
{
89+
var searchRequest = getSearchRequest(true);
90+
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
91+
assertResponse(client().execute(TransportSearchAction.TYPE, searchRequest), result -> {
92+
var originCluster = result.getClusters().getCluster(LOCAL_CLUSTER);
93+
assertThat(originCluster.getStatus(), Matchers.is(SearchResponse.Cluster.Status.SUCCESSFUL));
94+
95+
var linkedCluster = result.getClusters().getCluster(LINKED_CLUSTER_1);
96+
assertThat(linkedCluster.getStatus(), Matchers.is(SearchResponse.Cluster.Status.SKIPPED));
97+
98+
var linkedClusterFailures = result.getClusters().getCluster(LINKED_CLUSTER_1).getFailures();
99+
assertThat(linkedClusterFailures.size(), Matchers.is(1));
100+
// Failure is something along the lines of shard failure and is caused by a connection error.
101+
assertThat(
102+
linkedClusterFailures.getFirst().getCause(),
103+
Matchers.anyOf(
104+
Matchers.instanceOf(RemoteTransportException.class),
105+
Matchers.instanceOf(ConnectTransportException.class)
106+
)
107+
);
108+
});
109+
}
110+
111+
/*
112+
* Previously, we did not get a fatal error even when skip_unavailable=false for the linked cluster.
113+
* Now, we disable partial results and expect a fatal error. This proves that in CPS environment,
114+
* search uses allow_partial_search_results and not skip_unavailable.
115+
*/
116+
{
117+
var searchRequest = getSearchRequest(false);
118+
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
119+
var ae = expectThrows(AssertionError.class, () -> safeGet(client().execute(TransportSearchAction.TYPE, searchRequest)));
120+
assertThat(ae.getCause(), Matchers.instanceOf(ExecutionException.class));
121+
assertThat(
122+
ae.getCause().getCause(),
123+
Matchers.anyOf(Matchers.instanceOf(RemoteTransportException.class), Matchers.instanceOf(ConnectTransportException.class))
124+
);
125+
}
126+
}
127+
128+
private SearchRequest getSearchRequest(boolean allowPartialResults) {
129+
// Include both origin and linked cluster in the search op.
130+
var searchRequest = new SearchRequest("*", "*:*");
131+
searchRequest.allowPartialSearchResults(allowPartialResults);
132+
return searchRequest;
133+
}
134+
}

server/src/main/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ protected void doExecuteForked(Task task, ResolveClusterActionRequest request, A
174174
resolveClusterTask.ensureNotCancelled();
175175
String clusterAlias = remoteIndices.getKey();
176176
OriginalIndices originalIndices = remoteIndices.getValue();
177-
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
177+
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true);
178178
RemoteClusterClient remoteClusterClient = remoteClusterService.getRemoteClusterClient(
179179
clusterAlias,
180180
searchCoordinationExecutor,

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ private void doExecuteForked(
332332
);
333333

334334
boolean ensureConnected = forceConnectTimeoutSecs != null
335-
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
335+
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias).orElse(true) == false;
336336
transportService.getRemoteClusterService()
337337
.maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);
338338
}

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,14 +508,16 @@ public static final class Clusters implements ToXContentFragment, Writeable {
508508
* @param localIndices The localIndices to be searched - null if no local indices are to be searched
509509
* @param remoteClusterIndices mapping of clusterAlias -> OriginalIndices for each remote cluster
510510
* @param ccsMinimizeRoundtrips whether minimizing roundtrips for the CCS
511-
* @param skipUnavailablePredicate given a cluster alias, returns true if that cluster is skip_unavailable=true
512-
* and false otherwise
511+
* @param skipOnFailurePredicate given a cluster alias, returns true if that cluster is marked as skippable
512+
* and false otherwise. For a cluster to be considered as skippable, either
513+
* we should be in CPS environment and allow_partial_results=true, or,
514+
* skip_unavailable=true.
513515
*/
514516
public Clusters(
515517
@Nullable OriginalIndices localIndices,
516518
Map<String, OriginalIndices> remoteClusterIndices,
517519
boolean ccsMinimizeRoundtrips,
518-
Predicate<String> skipUnavailablePredicate
520+
Predicate<String> skipOnFailurePredicate
519521
) {
520522
assert remoteClusterIndices.size() > 0 : "At least one remote cluster must be passed into this Cluster constructor";
521523
this.total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1);
@@ -531,8 +533,8 @@ public Clusters(
531533
}
532534
for (Map.Entry<String, OriginalIndices> remote : remoteClusterIndices.entrySet()) {
533535
String clusterAlias = remote.getKey();
534-
boolean skipUnavailable = skipUnavailablePredicate.test(clusterAlias);
535-
Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipUnavailable);
536+
boolean skipOnFailure = skipOnFailurePredicate.test(clusterAlias);
537+
Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipOnFailure);
536538
m.put(clusterAlias, c);
537539
}
538540
this.clusterInfo = m;

0 commit comments

Comments
 (0)