Skip to content

Commit 3ceeffa

Browse files
Fix node bootstrap error when enable stream transport and remote cluster state (#19948)
Signed-off-by: bowenlan-amzn <[email protected]>
1 parent 3720d9c commit 3ceeffa

File tree

3 files changed

+40
-11
lines changed

3 files changed

+40
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8585
- Fix NPE of ScriptScoreQuery ([#19650](https://github.com/opensearch-project/OpenSearch/pull/19650))
8686
- Fix ClassCastException in FlightClientChannel for requests larger than 16KB ([#20010](https://github.com/opensearch-project/OpenSearch/pull/20010))
8787
- Fix GRPC Bulk ([#19937](https://github.com/opensearch-project/OpenSearch/pull/19937))
88+
- Fix node bootstrap error when enable stream transport and remote cluster state ([#19948](https://github.com/opensearch-project/OpenSearch/pull/19948))
8889

8990
### Dependencies
9091
- Bump Apache Lucene from 10.3.1 to 10.3.2 ([#20026](https://github.com/opensearch-project/OpenSearch/pull/20026))

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1818
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1919
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
20+
import org.opensearch.action.search.StreamSearchIntegrationTests;
2021
import org.opensearch.cluster.ClusterState;
2122
import org.opensearch.cluster.coordination.CoordinationState;
2223
import org.opensearch.cluster.coordination.PersistedStateRegistry;
@@ -74,6 +75,7 @@
7475
import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_REMOTE_STATE_ACTION_NAME;
7576
import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME;
7677
import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals;
78+
import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
7779
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
7880
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
7981
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
@@ -163,9 +165,11 @@ protected Settings nodeSettings(int nodeOrdinal) {
163165
protected Collection<Class<? extends Plugin>> nodePlugins() {
164166
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
165167
plugins.add(InterceptingTransportService.TestPlugin.class);
168+
plugins.add(StreamSearchIntegrationTests.MockStreamTransportPlugin.class);
166169
return plugins;
167170
}
168171

172+
@LockFeatureFlag(STREAM_TRANSPORT)
169173
public void testPublication() throws Exception {
170174
// create cluster with multi node (3 master + 2 data)
171175
prepareCluster(3, 2, INDEX_NAME, 1, 2);

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
630630

631631
final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
632632
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(repositoriesServiceReference::get, threadPool);
633-
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreNodeService);
633+
localNodeFactory = new RemoteStoreVerifyingLocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreNodeService);
634634
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
635635
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
636636
resourcesToClose.add(resourceWatcherService);
@@ -1292,7 +1292,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
12921292
streamTransport,
12931293
threadPool,
12941294
networkModule.getTransportInterceptor(),
1295-
new LocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreNodeService),
1295+
new LocalNodeFactory(settings, nodeEnvironment.nodeId()),
12961296
settingsModule.getClusterSettings(),
12971297
transportService.getTaskManager(),
12981298
transportService.getRemoteClusterService(),
@@ -2320,16 +2320,17 @@ protected List<AuxTransport> newAuxTransports(NetworkModule networkModule) {
23202320
return networkModule.getAuxServerTransportList();
23212321
}
23222322

2323+
/**
2324+
* Base factory for creating DiscoveryNode instances during node initialization.
2325+
*/
23232326
private static class LocalNodeFactory implements Function<BoundTransportAddress, DiscoveryNode> {
23242327
private final SetOnce<DiscoveryNode> localNode = new SetOnce<>();
23252328
private final String persistentNodeId;
2326-
private final Settings settings;
2327-
private final RemoteStoreNodeService remoteStoreNodeService;
2329+
protected final Settings settings;
23282330

2329-
private LocalNodeFactory(Settings settings, String persistentNodeId, RemoteStoreNodeService remoteStoreNodeService) {
2331+
private LocalNodeFactory(Settings settings, String persistentNodeId) {
23302332
this.persistentNodeId = persistentNodeId;
23312333
this.settings = settings;
2332-
this.remoteStoreNodeService = remoteStoreNodeService;
23332334
}
23342335

23352336
@Override
@@ -2339,11 +2340,6 @@ public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) {
23392340
boundTransportAddress.publishAddress(),
23402341
persistentNodeId
23412342
);
2342-
2343-
if (isRemoteStoreAttributePresent(settings)) {
2344-
remoteStoreNodeService.createAndVerifyRepositories(discoveryNode);
2345-
}
2346-
23472343
localNode.set(discoveryNode);
23482344
return localNode.get();
23492345
}
@@ -2354,6 +2350,34 @@ DiscoveryNode getNode() {
23542350
}
23552351
}
23562352

2353+
/**
2354+
* Extended factory that verifies remote store repositories during node creation.
2355+
*/
2356+
private static class RemoteStoreVerifyingLocalNodeFactory extends LocalNodeFactory {
2357+
2358+
private final RemoteStoreNodeService remoteStoreNodeService;
2359+
2360+
private RemoteStoreVerifyingLocalNodeFactory(
2361+
Settings settings,
2362+
String persistentNodeId,
2363+
RemoteStoreNodeService remoteStoreNodeService
2364+
) {
2365+
super(settings, persistentNodeId);
2366+
this.remoteStoreNodeService = remoteStoreNodeService;
2367+
}
2368+
2369+
@Override
2370+
public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) {
2371+
final DiscoveryNode discoveryNode = super.apply(boundTransportAddress);
2372+
2373+
if (isRemoteStoreAttributePresent(settings)) {
2374+
remoteStoreNodeService.createAndVerifyRepositories(discoveryNode);
2375+
}
2376+
2377+
return discoveryNode;
2378+
}
2379+
}
2380+
23572381
/**
23582382
* Initializes the warm cache with a defined capacity.
23592383
* The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}.

0 commit comments

Comments
 (0)