Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/

import org.elasticsearch.plugins.internal.RestExtension;
import org.elasticsearch.reservedstate.ReservedStateHandlerProvider;

/** The Elasticsearch Server Module. */
module org.elasticsearch.server {
Expand Down Expand Up @@ -411,7 +412,7 @@
org.elasticsearch.index.shard.ShardToolCliProvider;

uses org.elasticsearch.reservedstate.service.FileSettingsServiceProvider;
uses org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
uses ReservedStateHandlerProvider;
uses org.elasticsearch.jdk.ModuleQualifiedExportsService;
uses org.elasticsearch.node.internal.TerminationHandlerProvider;
uses org.elasticsearch.internal.VersionExtension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,7 @@
import org.elasticsearch.action.termvectors.TransportTermVectorsAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectIdResolver;
import org.elasticsearch.cluster.routing.RerouteService;
Expand Down Expand Up @@ -254,6 +252,7 @@
import org.elasticsearch.repositories.VerifyNodeRepositoryAction;
import org.elasticsearch.repositories.VerifyNodeRepositoryCoordinationAction;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
import org.elasticsearch.reservedstate.service.ReservedClusterStateService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand Down Expand Up @@ -474,8 +473,8 @@ public ActionModule(
TelemetryProvider telemetryProvider,
ClusterService clusterService,
RerouteService rerouteService,
List<ReservedClusterStateHandler<ClusterState, ?>> reservedClusterStateHandlers,
List<ReservedClusterStateHandler<ProjectMetadata, ?>> reservedProjectStateHandlers,
List<ReservedClusterStateHandler<?>> reservedClusterStateHandlers,
List<ReservedProjectStateHandler<?>> reservedProjectStateHandlers,
RestExtension restExtension,
IncrementalBulkService bulkService,
ProjectIdResolver projectIdResolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* It is used by the ReservedClusterStateService to add/update or remove snapshot repositories. Typical usage
* for this action is in the context of file based settings.
*/
public class ReservedRepositoryAction implements ReservedClusterStateHandler<ClusterState, List<PutRepositoryRequest>> {
public class ReservedRepositoryAction implements ReservedClusterStateHandler<List<PutRepositoryRequest>> {
public static final String NAME = "snapshot_repositories";

private final RepositoriesService repositoriesService;
Expand Down Expand Up @@ -67,8 +67,7 @@ public Collection<PutRepositoryRequest> prepare(Object input) {
}

@Override
public TransformState<ClusterState> transform(List<PutRepositoryRequest> source, TransformState<ClusterState> prevState)
throws Exception {
public TransformState transform(List<PutRepositoryRequest> source, TransformState prevState) throws Exception {
var requests = prepare(source);

ClusterState state = prevState.state();
Expand All @@ -88,7 +87,7 @@ public TransformState<ClusterState> transform(List<PutRepositoryRequest> source,
state = task.execute(state);
}

return new TransformState<>(state, entities);
return new TransformState(state, entities);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
import org.elasticsearch.reservedstate.TransformState;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
Expand All @@ -45,7 +48,7 @@
*/
public class ReservedComposableIndexTemplateAction
implements
ReservedClusterStateHandler<ProjectMetadata, ReservedComposableIndexTemplateAction.ComponentsAndComposables> {
ReservedProjectStateHandler<ReservedComposableIndexTemplateAction.ComponentsAndComposables> {
public static final String NAME = "index_templates";
public static final String COMPONENTS = "component_templates";
private static final String COMPONENT_PREFIX = "component_template:";
Expand Down Expand Up @@ -133,10 +136,10 @@ private ComponentsAndComposables prepare(ComponentsAndComposables componentsAndC
}

@Override
public TransformState<ProjectMetadata> transform(ComponentsAndComposables source, TransformState<ProjectMetadata> prevState)
throws Exception {
public TransformState transform(ProjectId projectId, ComponentsAndComposables source, TransformState prevState) throws Exception {
var requests = prepare(source);
ProjectMetadata project = prevState.state();
ClusterState clusterState = prevState.state();
ProjectMetadata project = clusterState.getMetadata().getProject(projectId);

// We transform in the following order:
// 1. create or update component templates (composable templates depend on them)
Expand Down Expand Up @@ -192,7 +195,10 @@ public TransformState<ProjectMetadata> transform(ComponentsAndComposables source
project = MetadataIndexTemplateService.innerRemoveComponentTemplate(project, componentNames);
}

return new TransformState<>(project, Sets.union(componentEntities, composableEntities));
return new TransformState(
ClusterState.builder(clusterState).putProjectMetadata(project).build(),
Sets.union(componentEntities, composableEntities)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
package org.elasticsearch.action.ingest;

import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
import org.elasticsearch.reservedstate.TransformState;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
Expand All @@ -36,7 +38,7 @@
* It is used by the ReservedClusterStateService to add/update or remove ingest pipelines. Typical usage
* for this action is in the context of file based state.
*/
public class ReservedPipelineAction implements ReservedClusterStateHandler<ProjectMetadata, List<PutPipelineRequest>> {
public class ReservedPipelineAction implements ReservedProjectStateHandler<List<PutPipelineRequest>> {
public static final String NAME = "ingest_pipelines";

/**
Expand Down Expand Up @@ -78,21 +80,21 @@ private static ProjectMetadata wrapIngestTaskExecute(IngestService.PipelineClust
}

@Override
public TransformState<ProjectMetadata> transform(List<PutPipelineRequest> source, TransformState<ProjectMetadata> prevState)
throws Exception {
public TransformState transform(ProjectId projectId, List<PutPipelineRequest> source, TransformState prevState) throws Exception {
var requests = prepare(source);

ProjectMetadata state = prevState.state();
ClusterState clusterState = prevState.state();
ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId);

for (var request : requests) {
var nopUpdate = IngestService.isNoOpPipelineUpdate(state, request);
var nopUpdate = IngestService.isNoOpPipelineUpdate(projectMetadata, request);

if (nopUpdate) {
continue;
}

var task = new IngestService.PutPipelineClusterStateUpdateTask(state.id(), request);
state = wrapIngestTaskExecute(task, state);
var task = new IngestService.PutPipelineClusterStateUpdateTask(projectMetadata.id(), request);
projectMetadata = wrapIngestTaskExecute(task, projectMetadata);
}

Set<String> entities = requests.stream().map(PutPipelineRequest::getId).collect(Collectors.toSet());
Expand All @@ -102,18 +104,18 @@ public TransformState<ProjectMetadata> transform(List<PutPipelineRequest> source

for (var pipelineToDelete : toDelete) {
var task = new IngestService.DeletePipelineClusterStateUpdateTask(
state.id(),
projectMetadata.id(),
null,
new DeletePipelineRequest(
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
pipelineToDelete
)
);
state = wrapIngestTaskExecute(task, state);
projectMetadata = wrapIngestTaskExecute(task, projectMetadata);
}

return new TransformState<>(state, entities);
return new TransformState(ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build(), entities);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService;
import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -189,7 +188,8 @@
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
import org.elasticsearch.reservedstate.ReservedStateHandlerProvider;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.reservedstate.service.FileSettingsService;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthIndicatorService;
Expand Down Expand Up @@ -995,7 +995,7 @@ public Map<String, String> queryFields() {
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
modules.bindToInstance(ResponseCollectorService.class, responseCollectorService);

var reservedStateHandlerProviders = pluginsService.loadServiceProviders(ReservedClusterStateHandlerProvider.class);
var reservedStateHandlerProviders = pluginsService.loadServiceProviders(ReservedStateHandlerProvider.class);

ActionModule actionModule = new ActionModule(
settings,
Expand Down Expand Up @@ -1599,11 +1599,11 @@ private Module loadPersistedClusterStateService(
return b -> b.bind(PersistedClusterStateService.class).toInstance(service);
}

private List<ReservedClusterStateHandler<ClusterState, ?>> buildReservedClusterStateHandlers(
List<? extends ReservedClusterStateHandlerProvider> handlers,
private List<ReservedClusterStateHandler<?>> buildReservedClusterStateHandlers(
List<? extends ReservedStateHandlerProvider> handlers,
SettingsModule settingsModule
) {
List<ReservedClusterStateHandler<ClusterState, ?>> reservedStateHandlers = new ArrayList<>();
List<ReservedClusterStateHandler<?>> reservedStateHandlers = new ArrayList<>();

// add all reserved state handlers from server
reservedStateHandlers.add(new ReservedClusterSettingsAction(settingsModule.getClusterSettings()));
Expand All @@ -1614,8 +1614,8 @@ private Module loadPersistedClusterStateService(
return reservedStateHandlers;
}

private List<ReservedClusterStateHandler<ProjectMetadata, ?>> buildReservedProjectStateHandlers(
List<? extends ReservedClusterStateHandlerProvider> handlers,
private List<ReservedProjectStateHandler<?>> buildReservedProjectStateHandlers(
List<? extends ReservedStateHandlerProvider> handlers,
SettingsModule settingsModule,
ClusterService clusterService,
IndicesService indicesService,
Expand All @@ -1624,7 +1624,7 @@ private Module loadPersistedClusterStateService(
MetadataCreateIndexService metadataCreateIndexService,
DataStreamGlobalRetentionSettings globalRetentionSettings
) {
List<ReservedClusterStateHandler<ProjectMetadata, ?>> reservedStateHandlers = new ArrayList<>();
List<ReservedProjectStateHandler<?>> reservedStateHandlers = new ArrayList<>();

var templateService = new MetadataIndexTemplateService(
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,12 @@

package org.elasticsearch.reservedstate;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

/**
* Base interface used for implementing 'operator mode' cluster state updates.
*
* <p>
* Reserving cluster state, for file based settings and modules/plugins, requires
* that we have a separate update handler interface that is different than the REST handlers. This interface class declares
* the basic contract for implementing cluster state update handlers that result in a cluster state that is effectively immutable
* by the REST handlers. The only way the reserved cluster state can be updated is through the 'operator mode' actions, e.g. updating
* the file settings.
* </p>
* {@link ReservedStateHandler} for updating cluster-wide cluster state.
*
* @param <S> The state type to be updated by this handler
* @param <T> The type used to represent the state update
*/
public interface ReservedClusterStateHandler<S, T> {
/**
* Unique identifier for the handler.
*
* <p>
* The handler name is a unique identifier that is matched to a section in a
* cluster state update content. The reserved cluster state updates are done as a single
* cluster state update and the cluster state is typically supplied as a combined content,
* unlike the REST handlers. This name must match a desired content key name in the combined
* cluster state update, e.g. "ilm" or "cluster_settings" (for persistent cluster settings update).
*
* @return a String with the handler name, e.g "ilm".
*/
String name();
public interface ReservedClusterStateHandler<T> extends ReservedStateHandler<T> {

/**
* The transformation method implemented by the handler.
Expand All @@ -58,77 +27,11 @@ public interface ReservedClusterStateHandler<S, T> {
* {@link TransformState}, which contains the current cluster state as well as any previous keys
* set by this handler on prior invocation.
*
* @param source The parsed information specific to this handler from the combined cluster state content
* @param source The parsed information specific to this handler from the combined cluster state content
* @param prevState The previous cluster state and keys set by this handler (if any)
* @return The modified state and the current keys set by this handler
* @throws Exception
*/
TransformState<S> transform(T source, TransformState<S> prevState) throws Exception;

/**
* List of dependent handler names for this handler.
*
* <p>
* Sometimes certain parts of the cluster state cannot be created/updated without previously
* setting other cluster state components, e.g. composable templates. Since the reserved cluster state handlers
* are processed in random order by the ReservedClusterStateService, this method gives an opportunity
* to any reserved handler to declare other state handlers it depends on. Given dependencies exist,
* the ReservedClusterStateService will order those handlers such that the handlers that are dependent
* on are processed first.
*
* @return a collection of reserved state handler names
*/
default Collection<String> dependencies() {
return Collections.emptyList();
}

/**
* List of optional dependent handler names for this handler.
*
* <p>
* These are dependent handlers which may or may not exist for this handler to be
* processed. If the optional dependency exists, then they are simply ordered to be
* merged into the cluster state before this handler.
*
* @return a collection of optional reserved state handler names
*/
default Collection<String> optionalDependencies() {
return Collections.emptyList();
}

/**
* Generic validation helper method that throws consistent exception for all handlers.
*
* <p>
* All implementations of {@link ReservedClusterStateHandler} should call the request validate method, by calling this default
* implementation. To aid in any special validation logic that may need to be implemented by the reserved cluster state handler
* we provide this convenience method.
*
* @param request the master node request that we base this reserved state handler on
*/
default void validate(MasterNodeRequest<?> request) {
ActionRequestValidationException exception = request.validate();
if (exception != null) {
throw new IllegalStateException("Validation error", exception);
}
}
TransformState transform(T source, TransformState prevState) throws Exception;

/**
* The parse content method which is called during parsing of file based content.
*
* <p>
* The immutable state can be provided as XContent, which means that each handler needs
* to implement a method to convert an XContent to an object it can consume later in
* transform
*
* @param parser the XContent parser we are parsing from
* @return
* @throws IOException
*/
T fromXContent(XContentParser parser) throws IOException;

/**
* Reserved-state handlers create master-node requests but never actually send them to the master node so the timeouts are not relevant.
*/
TimeValue RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT = TimeValue.THIRTY_SECONDS;
}
Loading