Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.geoip.stats.GeoIpStatsAction;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksMetadataSection;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
Expand Down Expand Up @@ -115,7 +115,7 @@ public void cleanUp() throws Exception {
.putNull("ingest.geoip.database_validity")
);
assertBusy(() -> {
PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
PersistentTasksMetadataSection.PersistentTask<PersistentTaskParams> task = getTask();
if (task != null) {
GeoIpTaskState state = (GeoIpTaskState) task.getState();
assertThat(state.getDatabases(), anEmptyMap());
Expand Down Expand Up @@ -294,7 +294,7 @@ public void testGeoIpDatabasesDownloadNoGeoipProcessors() throws Exception {
putGeoIpPipeline(pipelineId);
updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
assertBusy(() -> {
PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
PersistentTasksMetadataSection.PersistentTask<PersistentTaskParams> task = getTask();
assertNotNull(task);
assertNotNull(task.getState());
});
Expand Down Expand Up @@ -467,7 +467,7 @@ private void verifyUpdatedDatabase() throws Exception {
}

private GeoIpTaskState getGeoIpTaskState() {
PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
PersistentTasksMetadataSection.PersistentTask<PersistentTaskParams> task = getTask();
assertNotNull(task);
GeoIpTaskState state = (GeoIpTaskState) task.getState();
assertNotNull(state);
Expand Down Expand Up @@ -756,8 +756,8 @@ private void parseDatabase(Path tempFile) throws IOException {
}
}

private PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> getTask() {
return PersistentTasksCustomMetadata.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER);
private PersistentTasksMetadataSection.PersistentTask<PersistentTaskParams> getTask() {
return PersistentTasksMetadataSection.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER);
}

private static class MultiByteArrayInputStream extends InputStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksMetadataSection;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.junit.After;
Expand Down Expand Up @@ -48,7 +48,7 @@ public void cleanUp() {
public void testTaskRemovedAfterCancellation() throws Exception {
updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
assertBusy(() -> {
PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
PersistentTasksMetadataSection.PersistentTask<PersistentTaskParams> task = getTask();
assertNotNull(task);
assertTrue(task.isAssigned());
});
Expand All @@ -63,7 +63,7 @@ public void testTaskRemovedAfterCancellation() throws Exception {
});
}

private PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> getTask() {
return PersistentTasksCustomMetadata.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER);
private PersistentTasksMetadataSection.PersistentTask<PersistentTaskParams> getTask() {
return PersistentTasksMetadataSection.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.geoip.stats.CacheStats;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksMetadataSection;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -251,7 +251,7 @@ void checkDatabases(ClusterState state) {
return;
}

PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksMetadataSection persistentTasks = state.metadata().section(PersistentTasksMetadataSection.TYPE);
if (persistentTasks == null) {
logger.trace("Not checking databases because persistent tasks are null");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.ingest.geoip.direct.DatabaseConfiguration;
import org.elasticsearch.ingest.geoip.direct.DatabaseConfigurationMetadata;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.persistent.PersistentTasksMetadataSection.PersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -156,7 +156,7 @@ void updateDatabases() throws IOException {
}

logger.trace("Updating geoip databases");
IngestGeoIpMetadata geoIpMeta = clusterState.metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
IngestGeoIpMetadata geoIpMeta = clusterState.metadata().section(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);

// if there are entries in the cs that aren't in the persistent task state,
// then download those (only)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksMetadataSection;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -126,7 +126,7 @@ protected EnterpriseGeoIpDownloader createTask(
String type,
String action,
TaskId parentTaskId,
PersistentTasksCustomMetadata.PersistentTask<EnterpriseGeoIpTaskParams> taskInProgress,
PersistentTasksMetadataSection.PersistentTask<EnterpriseGeoIpTaskParams> taskInProgress,
Map<String, String> headers
) {
return new EnterpriseGeoIpDownloader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.elasticsearch.ingest.EnterpriseGeoIpTask;
import org.elasticsearch.ingest.geoip.GeoIpTaskState.Metadata;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksMetadataSection;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -33,7 +33,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTaskWithId;
import static org.elasticsearch.persistent.PersistentTasksMetadataSection.getTaskWithId;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

class EnterpriseGeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
Expand Down Expand Up @@ -146,7 +146,7 @@ public void writeTo(StreamOutput out) throws IOException {
*/
@Nullable
static EnterpriseGeoIpTaskState getEnterpriseGeoIpTaskState(ClusterState state) {
PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(state, EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER);
PersistentTasksMetadataSection.PersistentTask<?> task = getTaskWithId(state, EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER);
return (task == null) ? null : (EnterpriseGeoIpTaskState) task.getState();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.ingest.geoip.GeoIpTaskState.Metadata;
import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStats;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.persistent.PersistentTasksMetadataSection.PersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksMetadataSection;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -169,7 +169,7 @@ protected GeoIpDownloader createTask(
String type,
String action,
TaskId parentTaskId,
PersistentTasksCustomMetadata.PersistentTask<GeoIpTaskParams> taskInProgress,
PersistentTasksMetadataSection.PersistentTask<GeoIpTaskParams> taskInProgress,
Map<String, String> headers
) {
return new GeoIpDownloader(
Expand Down Expand Up @@ -362,7 +362,7 @@ private void startTask(Runnable onFailure) {
}

private void stopTask(Runnable onFailure) {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(
ActionListener<PersistentTasksMetadataSection.PersistentTask<?>> listener = ActionListener.wrap(
r -> logger.debug("Stopped geoip downloader task"),
e -> {
Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksMetadataSection;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -36,7 +36,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTaskWithId;
import static org.elasticsearch.persistent.PersistentTasksMetadataSection.getTaskWithId;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -241,7 +241,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
*/
@Nullable
static GeoIpTaskState getGeoIpTaskState(ClusterState state) {
PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(state, GeoIpDownloader.GEOIP_DOWNLOADER);
PersistentTasksMetadataSection.PersistentTask<?> task = getTaskWithId(state, GeoIpDownloader.GEOIP_DOWNLOADER);
return (task == null) ? null : (GeoIpTaskState) task.getState();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataSection;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -36,7 +37,7 @@
/**
* Holds the ingest-geoip databases that are available in the cluster state.
*/
public final class IngestGeoIpMetadata implements Metadata.Custom {
public final class IngestGeoIpMetadata implements MetadataSection {

public static final String TYPE = "ingest_geoip";
private static final ParseField DATABASES_FIELD = new ParseField("databases");
Expand Down Expand Up @@ -100,11 +101,11 @@ public EnumSet<Metadata.XContentContext> context() {
}

@Override
public Diff<Metadata.Custom> diff(Metadata.Custom before) {
public Diff<MetadataSection> diff(MetadataSection before) {
return new GeoIpMetadataDiff((IngestGeoIpMetadata) before, this);
}

static class GeoIpMetadataDiff implements NamedDiff<Metadata.Custom> {
static class GeoIpMetadataDiff implements NamedDiff<MetadataSection> {

final Diff<Map<String, DatabaseConfigurationMetadata>> databases;

Expand All @@ -122,7 +123,7 @@ static class GeoIpMetadataDiff implements NamedDiff<Metadata.Custom> {
}

@Override
public Metadata.Custom apply(Metadata.Custom part) {
public MetadataSection apply(MetadataSection part) {
return new IngestGeoIpMetadata(databases.apply(((IngestGeoIpMetadata) part).databases));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataSection;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -225,7 +225,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return List.of(
new NamedWriteableRegistry.Entry(Metadata.Custom.class, IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata::new),
new NamedWriteableRegistry.Entry(MetadataSection.class, IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.GeoIpMetadataDiff::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, GEOIP_DOWNLOADER, GeoIpTaskState::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, GEOIP_DOWNLOADER, GeoIpTaskParams::new),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TransportDeleteDatabaseConfigurationAction(
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
throws Exception {
final String id = request.getDatabaseId();
final IngestGeoIpMetadata geoIpMeta = state.metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
final IngestGeoIpMetadata geoIpMeta = state.metadata().section(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
if (geoIpMeta.getDatabases().containsKey(id) == false) {
throw new ResourceNotFoundException("Database configuration not found: {}", id);
}
Expand All @@ -103,15 +103,15 @@ private record DeleteDatabaseConfigurationTask(ActionListener<AcknowledgedRespon
ClusterStateTaskListener {

ClusterState execute(ClusterState currentState) throws Exception {
final IngestGeoIpMetadata geoIpMeta = currentState.metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
final IngestGeoIpMetadata geoIpMeta = currentState.metadata().section(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);

logger.debug("deleting database configuration [{}]", databaseId);
Map<String, DatabaseConfigurationMetadata> databases = new HashMap<>(geoIpMeta.getDatabases());
databases.remove(databaseId);

Metadata currentMeta = currentState.metadata();
return ClusterState.builder(currentState)
.metadata(Metadata.builder(currentMeta).putCustom(IngestGeoIpMetadata.TYPE, new IngestGeoIpMetadata(databases)))
.metadata(Metadata.builder(currentMeta).putSection(IngestGeoIpMetadata.TYPE, new IngestGeoIpMetadata(databases)))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected void masterOperation(
);
}

final IngestGeoIpMetadata geoIpMeta = state.metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
final IngestGeoIpMetadata geoIpMeta = state.metadata().section(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
List<DatabaseConfigurationMetadata> results = new ArrayList<>();

for (String id : ids) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ static boolean isNoopUpdate(@Nullable DatabaseConfigurationMetadata existingData

static void validatePrerequisites(DatabaseConfiguration database, ClusterState state) {
// we need to verify that the database represents a unique file (name) among the various databases for this same provider
IngestGeoIpMetadata geoIpMeta = state.metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
IngestGeoIpMetadata geoIpMeta = state.metadata().section(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);

Optional<DatabaseConfiguration> sameName = geoIpMeta.getDatabases()
.values()
Expand All @@ -131,7 +131,7 @@ private record UpdateDatabaseConfigurationTask(ActionListener<AcknowledgedRespon
ClusterStateTaskListener {

ClusterState execute(ClusterState currentState) throws Exception {
IngestGeoIpMetadata geoIpMeta = currentState.metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
IngestGeoIpMetadata geoIpMeta = currentState.metadata().section(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);

String id = database.id();
final DatabaseConfigurationMetadata existingDatabase = geoIpMeta.getDatabases().get(id);
Expand Down Expand Up @@ -161,7 +161,7 @@ ClusterState execute(ClusterState currentState) throws Exception {

Metadata currentMeta = currentState.metadata();
return ClusterState.builder(currentState)
.metadata(Metadata.builder(currentMeta).putCustom(IngestGeoIpMetadata.TYPE, geoIpMeta))
.metadata(Metadata.builder(currentMeta).putSection(IngestGeoIpMetadata.TYPE, geoIpMeta))
.build();
}

Expand Down
Loading