Skip to content

Commit 616ba6d

Browse files
authored
Merge branch 'main' into random-sampling-adding-test-feature-flag
2 parents f09dfcf + 79e53bf commit 616ba6d

File tree

12 files changed

+306
-342
lines changed

12 files changed

+306
-342
lines changed

server/src/internalClusterTest/java/org/elasticsearch/readiness/ReadinessClusterIT.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
106106
return Collections.unmodifiableList(plugins);
107107
}
108108

109-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/108613")
110109
public void testReadinessDuringRestarts() throws Exception {
111110
internalCluster().setBootstrapMasterNodeIndex(0);
112111
writeFileSettings(testJSON);
@@ -258,7 +257,7 @@ public void testNotReadyOnBadFileSettings() throws Exception {
258257

259258
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
260259

261-
assertTrue(masterFileSettingsService.watching());
260+
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
262261
assertFalse(dataFileSettingsService.watching());
263262

264263
boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
@@ -315,7 +314,7 @@ public void testReadyWhenMissingFileSettings() throws Exception {
315314

316315
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
317316

318-
assertTrue(masterFileSettingsService.watching());
317+
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
319318

320319
boolean awaitSuccessful = savedClusterState.v1().await(20, TimeUnit.SECONDS);
321320
assertTrue(awaitSuccessful);
@@ -369,7 +368,7 @@ public void testReadyAfterCorrectFileSettings() throws Exception {
369368

370369
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
371370

372-
assertTrue(masterFileSettingsService.watching());
371+
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
373372
assertFalse(dataFileSettingsService.watching());
374373

375374
boolean awaitSuccessful = savedClusterState.v1().await(20, TimeUnit.SECONDS);

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 13 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.action.support.IndicesOptions;
1919
import org.elasticsearch.action.support.PlainActionFuture;
2020
import org.elasticsearch.action.support.RefCountingListener;
21-
import org.elasticsearch.action.support.RefCountingRunnable;
2221
import org.elasticsearch.client.internal.RemoteClusterClient;
2322
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2423
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -34,7 +33,6 @@
3433
import org.elasticsearch.core.TimeValue;
3534
import org.elasticsearch.indices.IndicesExpressionGrouper;
3635
import org.elasticsearch.node.ReportingService;
37-
import org.elasticsearch.transport.RemoteClusterCredentialsManager.UpdateRemoteClusterCredentialsResult;
3836

3937
import java.io.Closeable;
4038
import java.io.IOException;
@@ -50,7 +48,6 @@
5048
import java.util.concurrent.TimeoutException;
5149
import java.util.function.BiFunction;
5250
import java.util.function.Function;
53-
import java.util.function.Supplier;
5451
import java.util.stream.Collectors;
5552
import java.util.stream.Stream;
5653

@@ -106,6 +103,10 @@ public boolean isRemoteClusterServerEnabled() {
106103
this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false);
107104
}
108105

106+
public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
107+
return remoteClusterCredentialsManager;
108+
}
109+
109110
/**
110111
* Group indices by cluster alias mapped to OriginalIndices for that cluster.
111112
* @param remoteClusterNames Set of configured remote cluster names.
@@ -185,6 +186,13 @@ public Set<String> getRegisteredRemoteClusterNames() {
185186
return getConnectionsMapForCurrentProject().keySet();
186187
}
187188

189+
/**
190+
* Returns the registered linked project aliases for the provided origin Project ID.
191+
*/
192+
public Set<String> getRegisteredRemoteClusterNames(ProjectId originProjectId) {
193+
return getConnectionsMapForProject(originProjectId).keySet();
194+
}
195+
188196
/**
189197
* Returns a connection to the given node on the given remote cluster
190198
*
@@ -298,79 +306,6 @@ public void skipUnavailableChanged(
298306
}
299307
}
300308

301-
@FixForMultiProject(description = "Refactor as needed to support project specific changes to linked remotes.")
302-
public synchronized void updateRemoteClusterCredentials(Supplier<Settings> settingsSupplier, ActionListener<Void> listener) {
303-
final var projectId = projectResolver.getProjectId();
304-
final Settings settings = settingsSupplier.get();
305-
final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(settings);
306-
// We only need to rebuild connections when a credential was newly added or removed for a cluster alias, not if the credential
307-
// value was updated. Therefore, only consider added or removed aliases
308-
final int totalConnectionsToRebuild = result.addedClusterAliases().size() + result.removedClusterAliases().size();
309-
if (totalConnectionsToRebuild == 0) {
310-
logger.debug("project [{}] no connection rebuilding required after credentials update", projectId);
311-
listener.onResponse(null);
312-
return;
313-
}
314-
logger.info("project [{}] rebuilding [{}] connections after credentials update", projectId, totalConnectionsToRebuild);
315-
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
316-
for (var clusterAlias : result.addedClusterAliases()) {
317-
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
318-
}
319-
for (var clusterAlias : result.removedClusterAliases()) {
320-
maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
321-
}
322-
}
323-
}
324-
325-
private void maybeRebuildConnectionOnCredentialsChange(
326-
ProjectId projectId,
327-
String clusterAlias,
328-
Settings newSettings,
329-
RefCountingRunnable connectionRefs
330-
) {
331-
final var connectionsMap = getConnectionsMapForProject(projectId);
332-
if (false == connectionsMap.containsKey(clusterAlias)) {
333-
// A credential was added or removed before a remote connection was configured.
334-
// Without an existing connection, there is nothing to rebuild.
335-
logger.info(
336-
"project [{}] no connection rebuild required for remote cluster [{}] after credentials change",
337-
projectId,
338-
clusterAlias
339-
);
340-
return;
341-
}
342-
343-
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
344-
final var config = RemoteClusterSettings.toConfig(projectId, ProjectId.DEFAULT, clusterAlias, mergedSettings);
345-
updateRemoteCluster(config, true, ActionListener.releaseAfter(new ActionListener<>() {
346-
@Override
347-
public void onResponse(RemoteClusterConnectionStatus status) {
348-
logger.info(
349-
"project [{}] remote cluster connection [{}] updated after credentials change: [{}]",
350-
projectId,
351-
clusterAlias,
352-
status
353-
);
354-
}
355-
356-
@Override
357-
public void onFailure(Exception e) {
358-
// We don't want to return an error to the upstream listener here since a connection rebuild failure
359-
// does *not* imply a failure to reload secure settings; however, that's how it would surface in the reload-settings call.
360-
// Instead, we log a warning which is also consistent with how we handle remote cluster settings updates (logging instead of
361-
// returning an error)
362-
logger.warn(
363-
() -> "project ["
364-
+ projectId
365-
+ "] failed to update remote cluster connection ["
366-
+ clusterAlias
367-
+ "] after credentials change",
368-
e
369-
);
370-
}
371-
}, connectionRefs.acquire()));
372-
}
373-
374309
@Override
375310
public void updateLinkedProject(LinkedProjectConfig config) {
376311
final var projectId = config.originProjectId();
@@ -412,8 +347,7 @@ public void onFailure(Exception e) {
412347
* @param forceRebuild Forces an existing connection to be closed and reconnected even if the connection strategy does not require it.
413348
* @param listener The listener invoked once the configured cluster has been connected.
414349
*/
415-
// Package-access for testing.
416-
synchronized void updateRemoteCluster(
350+
public synchronized void updateRemoteCluster(
417351
LinkedProjectConfig config,
418352
boolean forceRebuild,
419353
ActionListener<RemoteClusterConnectionStatus> listener
@@ -455,7 +389,7 @@ synchronized void updateRemoteCluster(
455389
}
456390
}
457391

458-
enum RemoteClusterConnectionStatus {
392+
public enum RemoteClusterConnectionStatus {
459393
CONNECTED,
460394
DISCONNECTED,
461395
RECONNECTED,

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.support.ActionTestUtils;
1818
import org.elasticsearch.action.support.IndicesOptions;
1919
import org.elasticsearch.action.support.PlainActionFuture;
20+
import org.elasticsearch.action.support.RefCountingRunnable;
2021
import org.elasticsearch.cluster.metadata.ProjectId;
2122
import org.elasticsearch.cluster.node.DiscoveryNode;
2223
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -1625,9 +1626,12 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi
16251626
{
16261627
final MockSecureSettings secureSettings = new MockSecureSettings();
16271628
secureSettings.setString("cluster.remote.cluster_1.credentials", randomAlphaOfLength(10));
1628-
final PlainActionFuture<Void> listener = new PlainActionFuture<>();
1629+
final PlainActionFuture<RemoteClusterService.RemoteClusterConnectionStatus> listener = new PlainActionFuture<>();
16291630
final Settings settings = Settings.builder().put(clusterSettings).setSecureSettings(secureSettings).build();
1630-
service.updateRemoteClusterCredentials(() -> settings, listener);
1631+
final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings);
1632+
assertThat(result.addedClusterAliases(), equalTo(Set.of("cluster_1")));
1633+
final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, settings);
1634+
service.updateRemoteCluster(config, true, listener);
16311635
listener.actionGet(10, TimeUnit.SECONDS);
16321636
}
16331637

@@ -1637,12 +1641,13 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi
16371641
);
16381642

16391643
{
1640-
final PlainActionFuture<Void> listener = new PlainActionFuture<>();
1641-
service.updateRemoteClusterCredentials(
1642-
// Settings without credentials constitute credentials removal
1643-
() -> clusterSettings,
1644-
listener
1645-
);
1644+
final PlainActionFuture<RemoteClusterService.RemoteClusterConnectionStatus> listener = new PlainActionFuture<>();
1645+
// Settings without credentials constitute credentials removal
1646+
final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(clusterSettings);
1647+
assertThat(result.addedClusterAliases().size(), equalTo(0));
1648+
assertThat(result.removedClusterAliases(), equalTo(Set.of("cluster_1")));
1649+
final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, clusterSettings);
1650+
service.updateRemoteCluster(config, true, listener);
16461651
listener.actionGet(10, TimeUnit.SECONDS);
16471652
}
16481653

@@ -1718,6 +1723,8 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
17181723
assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default");
17191724
assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default");
17201725
expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster));
1726+
final Set<String> aliases = Set.of(badCluster, goodCluster, missingCluster);
1727+
final ActionListener<RemoteClusterService.RemoteClusterConnectionStatus> noop = ActionListener.noop();
17211728

17221729
{
17231730
final MockSecureSettings secureSettings = new MockSecureSettings();
@@ -1730,7 +1737,14 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
17301737
.put(cluster2Settings)
17311738
.setSecureSettings(secureSettings)
17321739
.build();
1733-
service.updateRemoteClusterCredentials(() -> settings, listener);
1740+
final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings);
1741+
assertThat(result.addedClusterAliases(), equalTo(aliases));
1742+
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
1743+
for (String alias : aliases) {
1744+
final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings);
1745+
service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire()));
1746+
}
1747+
}
17341748
listener.actionGet(10, TimeUnit.SECONDS);
17351749
}
17361750

@@ -1747,11 +1761,16 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
17471761
{
17481762
final PlainActionFuture<Void> listener = new PlainActionFuture<>();
17491763
final Settings settings = Settings.builder().put(cluster1Settings).put(cluster2Settings).build();
1750-
service.updateRemoteClusterCredentials(
1751-
// Settings without credentials constitute credentials removal
1752-
() -> settings,
1753-
listener
1754-
);
1764+
// Settings without credentials constitute credentials removal
1765+
final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings);
1766+
assertThat(result.addedClusterAliases().size(), equalTo(0));
1767+
assertThat(result.removedClusterAliases(), equalTo(aliases));
1768+
try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
1769+
for (String alias : aliases) {
1770+
final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings);
1771+
service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire()));
1772+
}
1773+
}
17551774
listener.actionGet(10, TimeUnit.SECONDS);
17561775
}
17571776

@@ -1828,6 +1847,12 @@ public void testLogsConnectionResult() throws IOException {
18281847
}
18291848
}
18301849

1850+
@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
1851+
private LinkedProjectConfig buildLinkedProjectConfig(String alias, Settings staticSettings, Settings newSettings) {
1852+
final var mergedSettings = Settings.builder().put(staticSettings, false).put(newSettings, false).build();
1853+
return RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, mergedSettings);
1854+
}
1855+
18311856
private void updateRemoteCluster(
18321857
RemoteClusterService service,
18331858
String alias,
@@ -1846,10 +1871,7 @@ private void updateRemoteCluster(
18461871
Settings newSettings,
18471872
ActionListener<RemoteClusterService.RemoteClusterConnectionStatus> listener
18481873
) {
1849-
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
1850-
@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
1851-
final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, mergedSettings);
1852-
service.updateRemoteCluster(config, false, listener);
1874+
service.updateRemoteCluster(buildLinkedProjectConfig(alias, settings, newSettings), false, listener);
18531875
}
18541876

18551877
private void initializeRemoteClusters(RemoteClusterService remoteClusterService) {

test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ public enum FeatureFlag {
2626
Version.fromString("9.2.0"),
2727
null
2828
),
29-
RANDOM_SAMPLING("es.random_sampling_feature_flag_enabled=true", Version.fromString("9.2.0"), null),
30-
ELASTIC_RERANKER_CHUNKING("es.elastic_reranker_chunking_long_documents=true", Version.fromString("9.2.0"), null);
29+
RANDOM_SAMPLING("es.random_sampling_feature_flag_enabled=true", Version.fromString("9.2.0"), null);
3130

3231
public final String systemProperty;
3332
public final Version from;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/chunking/RerankRequestChunker.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,13 @@ public List<String> getChunkedInputs() {
5353
return chunkedInputs;
5454
}
5555

56-
public ActionListener<InferenceServiceResults> parseChunkedRerankResultsListener(ActionListener<InferenceServiceResults> listener) {
56+
public ActionListener<InferenceServiceResults> parseChunkedRerankResultsListener(
57+
ActionListener<InferenceServiceResults> listener,
58+
boolean returnDocuments
59+
) {
5760
return ActionListener.wrap(results -> {
5861
if (results instanceof RankedDocsResults rankedDocsResults) {
59-
listener.onResponse(parseRankedDocResultsForChunks(rankedDocsResults));
62+
listener.onResponse(parseRankedDocResultsForChunks(rankedDocsResults, returnDocuments));
6063

6164
} else {
6265
listener.onFailure(new IllegalArgumentException("Expected RankedDocsResults but got: " + results.getClass()));
@@ -65,7 +68,7 @@ public ActionListener<InferenceServiceResults> parseChunkedRerankResultsListener
6568
}, listener::onFailure);
6669
}
6770

68-
private RankedDocsResults parseRankedDocResultsForChunks(RankedDocsResults rankedDocsResults) {
71+
private RankedDocsResults parseRankedDocResultsForChunks(RankedDocsResults rankedDocsResults, boolean returnDocuments) {
6972
List<RankedDocsResults.RankedDoc> topRankedDocs = new ArrayList<>();
7073
Set<Integer> docIndicesSeen = new HashSet<>();
7174

@@ -80,7 +83,7 @@ private RankedDocsResults parseRankedDocResultsForChunks(RankedDocsResults ranke
8083
RankedDocsResults.RankedDoc updatedRankedDoc = new RankedDocsResults.RankedDoc(
8184
docIndex,
8285
rankedDoc.relevanceScore(),
83-
inputs.get(docIndex)
86+
returnDocuments ? inputs.get(docIndex) : null
8487
);
8588
topRankedDocs.add(updatedRankedDoc);
8689
docIndicesSeen.add(docIndex);

0 commit comments

Comments
 (0)