Skip to content

Commit 19f653a

Browse files
authored
Handle task type split on reading cluster state from disk (#1964)
An old node writes all tasks in the metadata custom. A new old must be able to read it and separate the cluster-scoped and project-scoped tasks and store them in the right place. This PR does that. Relates: #1945, MP-1938
1 parent 0d2ffb4 commit 19f653a

File tree

2 files changed

+98
-4
lines changed

2 files changed

+98
-4
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1714,7 +1714,17 @@ public static Metadata fromXContent(XContentParser parser) throws IOException {
17141714
&& MOVED_PROJECT_CUSTOMS.contains(currentFieldName) == false) {
17151715
parseCustomObject(parser, currentFieldName, ClusterCustom.class, builder::putCustom);
17161716
} else if (registry.hasParser(ProjectCustom.class, currentFieldName, parser.getRestApiVersion())) {
1717-
parseCustomObject(parser, currentFieldName, ProjectCustom.class, builder::putProjectCustom);
1717+
parseCustomObject(parser, currentFieldName, ProjectCustom.class, (name, projectCustom) -> {
1718+
if (projectCustom instanceof PersistentTasksCustomMetadata persistentTasksCustomMetadata) {
1719+
assert PersistentTasksCustomMetadata.TYPE.equals(name)
1720+
: name + " != " + PersistentTasksCustomMetadata.TYPE;
1721+
final var tuple = persistentTasksCustomMetadata.split();
1722+
builder.putProjectCustom(PersistentTasksCustomMetadata.TYPE, tuple.v2());
1723+
builder.putCustom(ClusterPersistentTasksCustomMetadata.TYPE, tuple.v1());
1724+
} else {
1725+
builder.putProjectCustom(name, projectCustom);
1726+
}
1727+
});
17181728
} else {
17191729
logger.warn("Skipping unknown custom object with type {}", currentFieldName);
17201730
parser.skipChildren();

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
import org.elasticsearch.TransportVersion;
1414
import org.elasticsearch.TransportVersions;
1515
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
16+
import org.elasticsearch.client.internal.Client;
1617
import org.elasticsearch.cluster.ClusterModule;
1718
import org.elasticsearch.cluster.ClusterState;
1819
import org.elasticsearch.cluster.Diff;
1920
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
2021
import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
22+
import org.elasticsearch.cluster.service.ClusterService;
2123
import org.elasticsearch.common.Strings;
2224
import org.elasticsearch.common.UUIDs;
2325
import org.elasticsearch.common.bytes.BytesArray;
@@ -28,12 +30,15 @@
2830
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2931
import org.elasticsearch.common.io.stream.StreamInput;
3032
import org.elasticsearch.common.io.stream.StreamOutput;
33+
import org.elasticsearch.common.settings.ClusterSettings;
34+
import org.elasticsearch.common.settings.IndexScopedSettings;
3135
import org.elasticsearch.common.settings.Setting;
3236
import org.elasticsearch.common.settings.Settings;
3337
import org.elasticsearch.common.util.Maps;
3438
import org.elasticsearch.common.util.set.Sets;
3539
import org.elasticsearch.common.xcontent.ChunkedToXContent;
3640
import org.elasticsearch.common.xcontent.XContentHelper;
41+
import org.elasticsearch.core.FixForMultiProject;
3742
import org.elasticsearch.core.Nullable;
3843
import org.elasticsearch.core.Predicates;
3944
import org.elasticsearch.core.SuppressForbidden;
@@ -47,13 +52,22 @@
4752
import org.elasticsearch.index.alias.RandomAliasActionsGenerator;
4853
import org.elasticsearch.index.mapper.MapperService;
4954
import org.elasticsearch.indices.IndicesModule;
55+
import org.elasticsearch.indices.SystemIndices;
5056
import org.elasticsearch.ingest.IngestMetadata;
57+
import org.elasticsearch.persistent.ClusterPersistentTasksCustomMetadata;
58+
import org.elasticsearch.persistent.PersistentTasks;
59+
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
60+
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
61+
import org.elasticsearch.persistent.PersistentTasksService;
5162
import org.elasticsearch.plugins.FieldPredicate;
5263
import org.elasticsearch.plugins.MapperPlugin;
5364
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
5465
import org.elasticsearch.test.ESTestCase;
5566
import org.elasticsearch.test.TransportVersionUtils;
5667
import org.elasticsearch.test.index.IndexVersionUtils;
68+
import org.elasticsearch.threadpool.ThreadPool;
69+
import org.elasticsearch.upgrades.SystemIndexMigrationExecutor;
70+
import org.elasticsearch.upgrades.SystemIndexMigrationTaskParams;
5771
import org.elasticsearch.xcontent.NamedXContentRegistry;
5872
import org.elasticsearch.xcontent.ToXContent;
5973
import org.elasticsearch.xcontent.XContentBuilder;
@@ -111,6 +125,8 @@
111125
import static org.hamcrest.Matchers.nullValue;
112126
import static org.hamcrest.Matchers.sameInstance;
113127
import static org.hamcrest.Matchers.startsWith;
128+
import static org.mockito.Mockito.mock;
129+
import static org.mockito.Mockito.when;
114130

115131
public class MetadataTests extends ESTestCase {
116132

@@ -750,6 +766,10 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException {
750766
{
751767
"id": "health-node",
752768
"task":{ "health-node": {"params":{}} }
769+
},
770+
{
771+
"id": "upgrade-system-indices",
772+
"task":{ "upgrade-system-indices": {"params":{}} }
753773
}
754774
]
755775
},
@@ -797,17 +817,45 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException {
797817
registry.addAll(ClusterModule.getNamedXWriteables());
798818
registry.addAll(IndicesModule.getNamedXContents());
799819
registry.addAll(HealthNodeTaskExecutor.getNamedXContentParsers());
820+
registry.addAll(SystemIndexMigrationExecutor.getNamedXContentParsers());
821+
822+
final var clusterService = mock(ClusterService.class);
823+
when(clusterService.threadPool()).thenReturn(mock(ThreadPool.class));
824+
final var healthNodeTaskExecutor = HealthNodeTaskExecutor.create(
825+
clusterService,
826+
mock(PersistentTasksService.class),
827+
Settings.EMPTY,
828+
ClusterSettings.createBuiltInClusterSettings()
829+
);
830+
final var systemIndexMigrationExecutor = new SystemIndexMigrationExecutor(
831+
mock(Client.class),
832+
clusterService,
833+
mock(SystemIndices.class),
834+
mock(MetadataUpdateSettingsService.class),
835+
mock(MetadataCreateIndexService.class),
836+
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS
837+
);
838+
new PersistentTasksExecutorRegistry(List.of(healthNodeTaskExecutor, systemIndexMigrationExecutor));
800839

801840
XContentParserConfiguration config = XContentParserConfiguration.EMPTY.withRegistry(new NamedXContentRegistry(registry));
802841
try (XContentParser parser = JsonXContent.jsonXContent.createParser(config, json)) {
803842
final var metatdata = Metadata.fromXContent(parser);
804843
assertThat(metatdata, notNullValue());
805844
assertThat(metatdata.clusterUUID(), is("aba1aa1ababbbaabaabaab"));
806-
assertThat(metatdata.customs().keySet(), containsInAnyOrder("desired_nodes"));
845+
assertThat(metatdata.customs().keySet(), containsInAnyOrder("desired_nodes", "cluster_persistent_tasks"));
846+
@FixForMultiProject(description = "adjust the assertion once health-node becomes cluster-scoped")
847+
final var clusterTasks = ClusterPersistentTasksCustomMetadata.get(metatdata);
848+
assertThat(clusterTasks.tasks(), hasSize(0));
807849
assertThat(
808850
metatdata.getProject().customs().keySet(),
809851
containsInAnyOrder("persistent_tasks", "index-graveyard", "component_template")
810852
);
853+
final var projectTasks = PersistentTasksCustomMetadata.get(metatdata.getProject());
854+
assertThat(
855+
projectTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getTaskName).toList(),
856+
containsInAnyOrder("health-node", "upgrade-system-indices")
857+
);
858+
assertThat(clusterTasks.getLastAllocationId(), equalTo(projectTasks.getLastAllocationId()));
811859
}
812860
}
813861

@@ -2534,16 +2582,52 @@ public void testEmptyDiffReturnsSameInstance() throws IOException {
25342582
}
25352583

25362584
public void testMultiProjectXContent() throws IOException {
2537-
final List<ProjectMetadata> projects = randomList(1, 5, () -> randomProject(new ProjectId(randomUUID()), randomIntBetween(1, 3)));
2585+
final long lastAllocationId = randomNonNegativeLong();
2586+
final List<ProjectMetadata> projects = randomList(1, 5, () -> randomProject(new ProjectId(randomUUID()), randomIntBetween(1, 3)))
2587+
.stream()
2588+
.map(
2589+
project -> ProjectMetadata.builder(project)
2590+
.putCustom(
2591+
PersistentTasksCustomMetadata.TYPE,
2592+
new PersistentTasksCustomMetadata(
2593+
lastAllocationId,
2594+
Map.of(
2595+
SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME,
2596+
new PersistentTasksCustomMetadata.PersistentTask<>(
2597+
SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME,
2598+
SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME,
2599+
new SystemIndexMigrationTaskParams(),
2600+
lastAllocationId,
2601+
PersistentTasks.INITIAL_ASSIGNMENT
2602+
)
2603+
)
2604+
)
2605+
)
2606+
.build()
2607+
)
2608+
.toList();
2609+
2610+
@FixForMultiProject(description = "considering adding health-node into metadata customs once health-node becomes a cluster task")
25382611
final Metadata originalMeta = randomMetadata(projects);
2612+
25392613
ToXContent.Params p = new ToXContent.MapParams(
25402614
Map.of("multi-project", "true", Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY)
25412615
);
25422616

25432617
final BytesReference bytes = toXContentBytes(originalMeta, p);
2544-
try (XContentParser parser = createParser(JsonXContent.jsonXContent, bytes)) {
2618+
final List<NamedXContentRegistry.Entry> registry = new ArrayList<>();
2619+
registry.addAll(ClusterModule.getNamedXWriteables());
2620+
registry.addAll(SystemIndexMigrationExecutor.getNamedXContentParsers());
2621+
final var config = XContentParserConfiguration.EMPTY.withRegistry(new NamedXContentRegistry(registry));
2622+
2623+
try (XContentParser parser = createParser(config, JsonXContent.jsonXContent, bytes)) {
25452624
Metadata fromXContentMeta = Metadata.fromXContent(parser);
25462625
assertThat(fromXContentMeta.projects().keySet(), equalTo(originalMeta.projects().keySet()));
2626+
for (var project : fromXContentMeta.projects().values()) {
2627+
final var projectTasks = PersistentTasksCustomMetadata.get(project);
2628+
assertThat(projectTasks.getLastAllocationId(), equalTo(lastAllocationId));
2629+
assertThat(projectTasks.taskMap().keySet(), equalTo(Set.of(SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME)));
2630+
}
25472631
}
25482632
}
25492633

0 commit comments

Comments
 (0)