Skip to content

Commit 38b7bfc

Browse files
authored
Mark watcher NotMultiProjectCapable and replace deprecated multi-project methods (elastic#131313)
1 parent 9c6cf90 commit 38b7bfc

File tree

14 files changed

+206
-128
lines changed

14 files changed

+206
-128
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.metadata.IndexMetadata;
2222
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2323
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
24+
import org.elasticsearch.cluster.metadata.ProjectId;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.cluster.node.DiscoveryNodes;
2627
import org.elasticsearch.cluster.service.ClusterService;
@@ -35,6 +36,7 @@
3536
import org.elasticsearch.common.unit.ByteSizeValue;
3637
import org.elasticsearch.common.util.concurrent.EsExecutors;
3738
import org.elasticsearch.core.IOUtils;
39+
import org.elasticsearch.core.NotMultiProjectCapable;
3840
import org.elasticsearch.core.TimeValue;
3941
import org.elasticsearch.core.UpdateForV10;
4042
import org.elasticsearch.env.Environment;
@@ -823,10 +825,11 @@ public String getFeatureName() {
823825
}
824826

825827
@Override
828+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
826829
public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener<Map<String, Object>> listener) {
827830
Client originClient = new OriginSettingClient(client, WATCHER_ORIGIN);
828831
boolean manuallyStopped = Optional.ofNullable(
829-
clusterService.state().metadata().getProject().<WatcherMetadata>custom(WatcherMetadata.TYPE)
832+
clusterService.state().metadata().getProject(ProjectId.DEFAULT).<WatcherMetadata>custom(WatcherMetadata.TYPE)
830833
).map(WatcherMetadata::manuallyStopped).orElse(false);
831834

832835
if (manuallyStopped == false) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.cluster.ClusterStateListener;
1515
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1616
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.metadata.ProjectId;
1718
import org.elasticsearch.cluster.routing.AllocationId;
1819
import org.elasticsearch.cluster.routing.IndexRoutingTable;
1920
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
@@ -22,6 +23,7 @@
2223
import org.elasticsearch.common.Strings;
2324
import org.elasticsearch.common.util.Maps;
2425
import org.elasticsearch.common.util.set.Sets;
26+
import org.elasticsearch.core.NotMultiProjectCapable;
2527
import org.elasticsearch.index.engine.Engine;
2628
import org.elasticsearch.index.shard.IndexingOperationListener;
2729
import org.elasticsearch.index.shard.ShardId;
@@ -251,10 +253,12 @@ private void checkWatchIndexHasChanged(IndexMetadata metadata, ClusterChangedEve
251253
* @param event The cluster changed event containing the new cluster state
252254
*/
253255
private void reloadConfiguration(String watchIndex, List<ShardRouting> localShardRouting, ClusterChangedEvent event) {
256+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
257+
ProjectId projectId = ProjectId.DEFAULT;
254258
// changed alias means to always read a new configuration
255259
boolean isAliasChanged = watchIndex.equals(configuration.index) == false;
256-
if (isAliasChanged || hasShardAllocationIdChanged(watchIndex, event.state())) {
257-
IndexRoutingTable watchIndexRoutingTable = event.state().routingTable().index(watchIndex);
260+
if (isAliasChanged || hasShardAllocationIdChanged(projectId, watchIndex, event.state())) {
261+
IndexRoutingTable watchIndexRoutingTable = event.state().routingTable(projectId).index(watchIndex);
258262
Map<ShardId, ShardAllocationConfiguration> ids = getLocalShardAllocationIds(localShardRouting, watchIndexRoutingTable);
259263
configuration = new Configuration(watchIndex, ids);
260264
}
@@ -267,9 +271,9 @@ private void reloadConfiguration(String watchIndex, List<ShardRouting> localShar
267271
* @param state The new cluster state
268272
* @return true if the routing tables has changed and local shards are affected
269273
*/
270-
private boolean hasShardAllocationIdChanged(String watchIndex, ClusterState state) {
271-
List<ShardRouting> allStartedRelocatedShards = state.getRoutingTable().index(watchIndex).shardsWithState(STARTED);
272-
allStartedRelocatedShards.addAll(state.getRoutingTable().index(watchIndex).shardsWithState(RELOCATING));
274+
private boolean hasShardAllocationIdChanged(ProjectId projectId, String watchIndex, ClusterState state) {
275+
List<ShardRouting> allStartedRelocatedShards = state.routingTable(projectId).index(watchIndex).shardsWithState(STARTED);
276+
allStartedRelocatedShards.addAll(state.routingTable(projectId).index(watchIndex).shardsWithState(RELOCATING));
273277

274278
// exit early, when there are shards, but the current configuration is inactive
275279
if (allStartedRelocatedShards.isEmpty() == false && configuration == INACTIVE) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
import org.elasticsearch.cluster.ClusterStateListener;
1414
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
18+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
1719
import org.elasticsearch.cluster.routing.RoutingNode;
1820
import org.elasticsearch.cluster.routing.ShardRouting;
1921
import org.elasticsearch.cluster.service.ClusterService;
2022
import org.elasticsearch.common.Strings;
2123
import org.elasticsearch.common.component.LifecycleListener;
24+
import org.elasticsearch.core.NotMultiProjectCapable;
2225
import org.elasticsearch.gateway.GatewayService;
2326
import org.elasticsearch.index.shard.ShardId;
2427
import org.elasticsearch.xpack.core.watcher.WatcherMetadata;
@@ -153,8 +156,11 @@ public void clusterChanged(ClusterChangedEvent event) {
153156
// also check if non local shards have changed, as loosing a shard on a
154157
// remote node or adding a replica on a remote node needs to trigger a reload too
155158
Set<ShardId> localShardIds = localShards.stream().map(ShardRouting::shardId).collect(Collectors.toSet());
156-
List<ShardRouting> allShards = event.state().routingTable().index(watchIndex).shardsWithState(STARTED);
157-
allShards.addAll(event.state().routingTable().index(watchIndex).shardsWithState(RELOCATING));
159+
160+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
161+
IndexRoutingTable routingTable = event.state().routingTable(ProjectId.DEFAULT).index(watchIndex);
162+
List<ShardRouting> allShards = routingTable.shardsWithState(STARTED);
163+
allShards.addAll(routingTable.shardsWithState(RELOCATING));
158164
List<ShardRouting> localAffectedShardRoutings = allShards.stream()
159165
.filter(shardRouting -> localShardIds.contains(shardRouting.shardId()))
160166
// shardrouting is not comparable, so we need some order mechanism
@@ -192,8 +198,9 @@ private void pauseExecution(String reason) {
192198
/**
193199
* check if watcher has been stopped manually via the stop API
194200
*/
201+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
195202
private static boolean isWatcherStoppedManually(ClusterState state) {
196-
WatcherMetadata watcherMetadata = state.getMetadata().getProject().custom(WatcherMetadata.TYPE);
203+
WatcherMetadata watcherMetadata = state.getMetadata().getProject(ProjectId.DEFAULT).custom(WatcherMetadata.TYPE);
197204
return watcherMetadata != null && watcherMetadata.manuallyStopped();
198205
}
199206

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.client.internal.Client;
1919
import org.elasticsearch.cluster.ClusterState;
2020
import org.elasticsearch.cluster.metadata.IndexMetadata;
21+
import org.elasticsearch.cluster.metadata.ProjectId;
2122
import org.elasticsearch.cluster.routing.AllocationId;
2223
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
2324
import org.elasticsearch.cluster.routing.Preference;
@@ -27,6 +28,7 @@
2728
import org.elasticsearch.common.util.Maps;
2829
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2930
import org.elasticsearch.common.util.concurrent.EsExecutors;
31+
import org.elasticsearch.core.NotMultiProjectCapable;
3032
import org.elasticsearch.core.TimeValue;
3133
import org.elasticsearch.search.SearchHit;
3234
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -134,6 +136,8 @@ public class WatcherService {
134136
* @return true if everything is good to go, so that the service can be started
135137
*/
136138
public boolean validate(ClusterState state) {
139+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
140+
ProjectId projectId = ProjectId.DEFAULT;
137141
IndexMetadata watcherIndexMetadata = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metadata());
138142
IndexMetadata triggeredWatchesIndexMetadata = WatchStoreUtils.getConcreteIndex(
139143
TriggeredWatchStoreField.INDEX_NAME,
@@ -160,7 +164,7 @@ public boolean validate(ClusterState state) {
160164

161165
return watcherIndexMetadata == null
162166
|| (watcherIndexMetadata.getState() == IndexMetadata.State.OPEN
163-
&& state.routingTable().index(watcherIndexMetadata.getIndex()).allPrimaryShardsActive());
167+
&& state.routingTable(projectId).index(watcherIndexMetadata.getIndex()).allPrimaryShardsActive());
164168
} catch (IllegalStateException e) {
165169
logger.warn("Validation error: cannot start watcher", e);
166170
return false;
@@ -329,7 +333,8 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
329333
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndexName, RELOCATING, STARTED).toList();
330334

331335
// find out all allocation ids
332-
List<ShardRouting> watchIndexShardRoutings = clusterState.getRoutingTable().allShards(watchIndexName);
336+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
337+
List<ShardRouting> watchIndexShardRoutings = clusterState.routingTable(ProjectId.DEFAULT).allShards(watchIndexName);
333338

334339
SearchRequest searchRequest = new SearchRequest(INDEX).scroll(scrollTimeout)
335340
.preference(Preference.ONLY_LOCAL.toString())

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.elasticsearch.client.internal.Client;
2525
import org.elasticsearch.cluster.ClusterState;
2626
import org.elasticsearch.cluster.metadata.IndexMetadata;
27+
import org.elasticsearch.cluster.metadata.ProjectId;
2728
import org.elasticsearch.cluster.routing.Preference;
2829
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.core.NotMultiProjectCapable;
2931
import org.elasticsearch.core.TimeValue;
3032
import org.elasticsearch.index.IndexNotFoundException;
3133
import org.elasticsearch.search.SearchHit;
@@ -183,10 +185,11 @@ public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches
183185
return triggeredWatches;
184186
}
185187

188+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
186189
public static boolean validate(ClusterState state) {
187190
IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metadata());
188191
return indexMetadata == null
189192
|| (indexMetadata.getState() == IndexMetadata.State.OPEN
190-
&& state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive());
193+
&& state.routingTable(ProjectId.DEFAULT).index(indexMetadata.getIndex()).allPrimaryShardsActive());
191194
}
192195
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import org.elasticsearch.action.index.IndexRequest;
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
1617
import org.elasticsearch.common.settings.Setting;
1718
import org.elasticsearch.common.settings.Settings;
1819
import org.elasticsearch.common.unit.ByteSizeValue;
20+
import org.elasticsearch.core.NotMultiProjectCapable;
1921
import org.elasticsearch.xcontent.XContentBuilder;
2022
import org.elasticsearch.xcontent.XContentFactory;
2123
import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField;
@@ -91,10 +93,11 @@ public void forcePut(WatchRecord watchRecord) {
9193
* @param state The current cluster state
9294
* @return true, if history store is ready to be started
9395
*/
96+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
9497
public static boolean validate(ClusterState state) {
9598
IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(HistoryStoreField.DATA_STREAM, state.metadata());
9699
return indexMetadata == null
97100
|| (indexMetadata.getState() == IndexMetadata.State.OPEN
98-
&& state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive());
101+
&& state.routingTable(ProjectId.DEFAULT).index(indexMetadata.getIndex()).allPrimaryShardsActive());
99102
}
100103
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
import org.elasticsearch.client.internal.Client;
1010
import org.elasticsearch.cluster.ClusterState;
1111
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.cluster.service.ClusterService;
1314
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.core.NotMultiProjectCapable;
1416
import org.elasticsearch.threadpool.ThreadPool;
1517
import org.elasticsearch.xcontent.NamedXContentRegistry;
1618
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
@@ -89,9 +91,10 @@ protected String getOrigin() {
8991
return WATCHER_ORIGIN;
9092
}
9193

94+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
9295
public static boolean validate(ClusterState state) {
9396
return state.getMetadata()
94-
.getProject()
97+
.getProject(ProjectId.DEFAULT)
9598
.templatesV2()
9699
.keySet()
97100
.stream()

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStoreUtils.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1010
import org.elasticsearch.cluster.metadata.IndexMetadata;
1111
import org.elasticsearch.cluster.metadata.Metadata;
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
14+
import org.elasticsearch.core.NotMultiProjectCapable;
1215
import org.elasticsearch.index.Index;
1316
import org.elasticsearch.index.IndexNotFoundException;
1417

@@ -26,7 +29,9 @@ public class WatchStoreUtils {
2629
* @throws IndexNotFoundException If no index exists
2730
*/
2831
public static IndexMetadata getConcreteIndex(String name, Metadata metadata) {
29-
IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(name);
32+
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
33+
ProjectMetadata projectMetadata = metadata.getProject(ProjectId.DEFAULT);
34+
IndexAbstraction indexAbstraction = projectMetadata.getIndicesLookup().get(name);
3035
if (indexAbstraction == null) {
3136
return null;
3237
}
@@ -48,7 +53,7 @@ public static IndexMetadata getConcreteIndex(String name, Metadata metadata) {
4853
if (concreteIndex == null) {
4954
concreteIndex = indexAbstraction.getIndices().get(indexAbstraction.getIndices().size() - 1);
5055
}
51-
return metadata.getProject().index(concreteIndex);
56+
return projectMetadata.index(concreteIndex);
5257
}
5358

5459
}

0 commit comments

Comments
 (0)