Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -35,6 +36,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.env.Environment;
Expand Down Expand Up @@ -823,10 +825,11 @@ public String getFeatureName() {
}

@Override
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener<Map<String, Object>> listener) {
Client originClient = new OriginSettingClient(client, WATCHER_ORIGIN);
boolean manuallyStopped = Optional.ofNullable(
clusterService.state().metadata().getProject().<WatcherMetadata>custom(WatcherMetadata.TYPE)
clusterService.state().metadata().getProject(ProjectId.DEFAULT).<WatcherMetadata>custom(WatcherMetadata.TYPE)
).map(WatcherMetadata::manuallyStopped).orElse(false);

if (manuallyStopped == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
Expand All @@ -22,6 +23,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -251,10 +253,12 @@ private void checkWatchIndexHasChanged(IndexMetadata metadata, ClusterChangedEve
* @param event The cluster changed event containing the new cluster state
*/
private void reloadConfiguration(String watchIndex, List<ShardRouting> localShardRouting, ClusterChangedEvent event) {
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
ProjectId projectId = ProjectId.DEFAULT;
// changed alias means to always read a new configuration
boolean isAliasChanged = watchIndex.equals(configuration.index) == false;
if (isAliasChanged || hasShardAllocationIdChanged(watchIndex, event.state())) {
IndexRoutingTable watchIndexRoutingTable = event.state().routingTable().index(watchIndex);
if (isAliasChanged || hasShardAllocationIdChanged(projectId, watchIndex, event.state())) {
IndexRoutingTable watchIndexRoutingTable = event.state().routingTable(projectId).index(watchIndex);
Map<ShardId, ShardAllocationConfiguration> ids = getLocalShardAllocationIds(localShardRouting, watchIndexRoutingTable);
configuration = new Configuration(watchIndex, ids);
}
Expand All @@ -267,9 +271,9 @@ private void reloadConfiguration(String watchIndex, List<ShardRouting> localShar
* @param state The new cluster state
* @return true if the routing tables has changed and local shards are affected
*/
private boolean hasShardAllocationIdChanged(String watchIndex, ClusterState state) {
List<ShardRouting> allStartedRelocatedShards = state.getRoutingTable().index(watchIndex).shardsWithState(STARTED);
allStartedRelocatedShards.addAll(state.getRoutingTable().index(watchIndex).shardsWithState(RELOCATING));
private boolean hasShardAllocationIdChanged(ProjectId projectId, String watchIndex, ClusterState state) {
List<ShardRouting> allStartedRelocatedShards = state.routingTable(projectId).index(watchIndex).shardsWithState(STARTED);
allStartedRelocatedShards.addAll(state.routingTable(projectId).index(watchIndex).shardsWithState(RELOCATING));

// exit early, when there are shards, but the current configuration is inactive
if (allStartedRelocatedShards.isEmpty() == false && configuration == INACTIVE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xpack.core.watcher.WatcherMetadata;
Expand Down Expand Up @@ -153,8 +156,11 @@ public void clusterChanged(ClusterChangedEvent event) {
// also check if non local shards have changed, as loosing a shard on a
// remote node or adding a replica on a remote node needs to trigger a reload too
Set<ShardId> localShardIds = localShards.stream().map(ShardRouting::shardId).collect(Collectors.toSet());
List<ShardRouting> allShards = event.state().routingTable().index(watchIndex).shardsWithState(STARTED);
allShards.addAll(event.state().routingTable().index(watchIndex).shardsWithState(RELOCATING));

@NotMultiProjectCapable(description = "Watcher is not available in serverless")
IndexRoutingTable routingTable = event.state().routingTable(ProjectId.DEFAULT).index(watchIndex);
List<ShardRouting> allShards = routingTable.shardsWithState(STARTED);
allShards.addAll(routingTable.shardsWithState(RELOCATING));
List<ShardRouting> localAffectedShardRoutings = allShards.stream()
.filter(shardRouting -> localShardIds.contains(shardRouting.shardId()))
// shardrouting is not comparable, so we need some order mechanism
Expand Down Expand Up @@ -192,8 +198,9 @@ private void pauseExecution(String reason) {
/**
* check if watcher has been stopped manually via the stop API
*/
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
private static boolean isWatcherStoppedManually(ClusterState state) {
WatcherMetadata watcherMetadata = state.getMetadata().getProject().custom(WatcherMetadata.TYPE);
WatcherMetadata watcherMetadata = state.getMetadata().getProject(ProjectId.DEFAULT).custom(WatcherMetadata.TYPE);
return watcherMetadata != null && watcherMetadata.manuallyStopped();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.Preference;
Expand All @@ -27,6 +28,7 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -134,6 +136,8 @@ public class WatcherService {
* @return true if everything is good to go, so that the service can be started
*/
public boolean validate(ClusterState state) {
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
ProjectId projectId = ProjectId.DEFAULT;
IndexMetadata watcherIndexMetadata = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metadata());
IndexMetadata triggeredWatchesIndexMetadata = WatchStoreUtils.getConcreteIndex(
TriggeredWatchStoreField.INDEX_NAME,
Expand All @@ -160,7 +164,7 @@ public boolean validate(ClusterState state) {

return watcherIndexMetadata == null
|| (watcherIndexMetadata.getState() == IndexMetadata.State.OPEN
&& state.routingTable().index(watcherIndexMetadata.getIndex()).allPrimaryShardsActive());
&& state.routingTable(projectId).index(watcherIndexMetadata.getIndex()).allPrimaryShardsActive());
} catch (IllegalStateException e) {
logger.warn("Validation error: cannot start watcher", e);
return false;
Expand Down Expand Up @@ -329,7 +333,8 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndexName, RELOCATING, STARTED).toList();

// find out all allocation ids
List<ShardRouting> watchIndexShardRoutings = clusterState.getRoutingTable().allShards(watchIndexName);
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
List<ShardRouting> watchIndexShardRoutings = clusterState.routingTable(ProjectId.DEFAULT).allShards(watchIndexName);

SearchRequest searchRequest = new SearchRequest(INDEX).scroll(scrollTimeout)
.preference(Preference.ONLY_LOCAL.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchHit;
Expand Down Expand Up @@ -183,10 +185,11 @@ public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches
return triggeredWatches;
}

@NotMultiProjectCapable(description = "Watcher is not available in serverless")
public static boolean validate(ClusterState state) {
IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metadata());
return indexMetadata == null
|| (indexMetadata.getState() == IndexMetadata.State.OPEN
&& state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive());
&& state.routingTable(ProjectId.DEFAULT).index(indexMetadata.getIndex()).allPrimaryShardsActive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField;
Expand Down Expand Up @@ -91,10 +93,11 @@ public void forcePut(WatchRecord watchRecord) {
* @param state The current cluster state
* @return true, if history store is ready to be started
*/
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
public static boolean validate(ClusterState state) {
IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(HistoryStoreField.DATA_STREAM, state.metadata());
return indexMetadata == null
|| (indexMetadata.getState() == IndexMetadata.State.OPEN
&& state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive());
&& state.routingTable(ProjectId.DEFAULT).index(indexMetadata.getIndex()).allPrimaryShardsActive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
Expand Down Expand Up @@ -89,9 +91,10 @@ protected String getOrigin() {
return WATCHER_ORIGIN;
}

@NotMultiProjectCapable(description = "Watcher is not available in serverless")
public static boolean validate(ClusterState state) {
return state.getMetadata()
.getProject()
.getProject(ProjectId.DEFAULT)
.templatesV2()
.keySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;

Expand All @@ -26,7 +29,9 @@ public class WatchStoreUtils {
* @throws IndexNotFoundException If no index exists
*/
public static IndexMetadata getConcreteIndex(String name, Metadata metadata) {
IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(name);
@NotMultiProjectCapable(description = "Watcher is not available in serverless")
ProjectMetadata projectMetadata = metadata.getProject(ProjectId.DEFAULT);
IndexAbstraction indexAbstraction = projectMetadata.getIndicesLookup().get(name);
if (indexAbstraction == null) {
return null;
}
Expand All @@ -48,7 +53,7 @@ public static IndexMetadata getConcreteIndex(String name, Metadata metadata) {
if (concreteIndex == null) {
concreteIndex = indexAbstraction.getIndices().get(indexAbstraction.getIndices().size() - 1);
}
return metadata.getProject().index(concreteIndex);
return projectMetadata.index(concreteIndex);
}

}
Loading