Skip to content

Commit f0eb8da

Browse files
authored
Make DeleteSourceAndAddDownsampleToDS project-aware (#124808)
This is part of the work to make DLM project-aware.
1 parent e0c4c4d commit f0eb8da

File tree

5 files changed

+158
-61
lines changed

5 files changed

+158
-61
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
5454
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver;
5555
import org.elasticsearch.cluster.metadata.Metadata;
56+
import org.elasticsearch.cluster.metadata.ProjectId;
5657
import org.elasticsearch.cluster.routing.allocation.AllocationService;
5758
import org.elasticsearch.cluster.service.ClusterService;
5859
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
@@ -63,6 +64,7 @@
6364
import org.elasticsearch.common.settings.Setting;
6465
import org.elasticsearch.common.settings.Settings;
6566
import org.elasticsearch.common.unit.ByteSizeValue;
67+
import org.elasticsearch.core.FixForMultiProject;
6668
import org.elasticsearch.core.Nullable;
6769
import org.elasticsearch.core.Strings;
6870
import org.elasticsearch.core.TimeValue;
@@ -694,10 +696,13 @@ private void replaceBackingIndexWithDownsampleIndexOnce(DataStream dataStream, S
694696
downsampleIndexName,
695697
dataStream
696698
);
699+
@FixForMultiProject(description = "The correct project ID should be passed here")
700+
final var projectId = ProjectId.DEFAULT;
697701
swapSourceWithDownsampleIndexQueue.submitTask(
698702
"data-stream-lifecycle-delete-source[" + backingIndexName + "]-add-to-datastream-[" + downsampleIndexName + "]",
699703
new DeleteSourceAndAddDownsampleToDS(
700704
settings,
705+
projectId,
701706
dataStream.getName(),
702707
backingIndexName,
703708
downsampleIndexName,

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/downsampling/DeleteSourceAndAddDownsampleToDS.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
import org.elasticsearch.cluster.metadata.DataStream;
1818
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1919
import org.elasticsearch.cluster.metadata.IndexMetadata;
20-
import org.elasticsearch.cluster.metadata.Metadata;
2120
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
21+
import org.elasticsearch.cluster.metadata.ProjectId;
22+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2223
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.core.TimeValue;
2425
import org.elasticsearch.index.IndexSettings;
@@ -39,18 +40,21 @@ public class DeleteSourceAndAddDownsampleToDS implements ClusterStateTaskListene
3940
private static final Logger LOGGER = LogManager.getLogger(DeleteSourceAndAddDownsampleToDS.class);
4041
private final Settings settings;
4142
private ActionListener<Void> listener;
43+
private final ProjectId projectId;
4244
private final String dataStreamName;
4345
private final String sourceBackingIndex;
4446
private final String downsampleIndex;
4547

4648
public DeleteSourceAndAddDownsampleToDS(
4749
Settings settings,
50+
ProjectId projectId,
4851
String dataStreamName,
4952
String sourceBackingIndex,
5053
String downsampleIndex,
5154
ActionListener<Void> listener
5255
) {
5356
this.settings = settings;
57+
this.projectId = projectId;
5458
this.dataStreamName = dataStreamName;
5559
this.sourceBackingIndex = sourceBackingIndex;
5660
this.downsampleIndex = downsampleIndex;
@@ -64,7 +68,8 @@ ClusterState execute(ClusterState state) {
6468
downsampleIndex,
6569
dataStreamName
6670
);
67-
IndexMetadata downsampleIndexMeta = state.metadata().getProject().index(downsampleIndex);
71+
final var project = state.metadata().getProject(projectId);
72+
IndexMetadata downsampleIndexMeta = project.index(downsampleIndex);
6873
if (downsampleIndexMeta == null) {
6974
// the downsample index doesn't exist anymore so nothing to replace here
7075
LOGGER.trace(
@@ -77,9 +82,9 @@ ClusterState execute(ClusterState state) {
7782
);
7883
return state;
7984
}
80-
IndexAbstraction sourceIndexAbstraction = state.metadata().getProject().getIndicesLookup().get(sourceBackingIndex);
85+
IndexAbstraction sourceIndexAbstraction = project.getIndicesLookup().get(sourceBackingIndex);
8186
if (sourceIndexAbstraction == null) {
82-
DataStream dataStream = state.metadata().getProject().dataStreams().get(dataStreamName);
87+
DataStream dataStream = project.dataStreams().get(dataStreamName);
8388
// index was deleted in the meantime, so let's check if we can make sure the downsample index ends up in the
8489
// data stream (if not already there)
8590
if (dataStream != null
@@ -91,9 +96,9 @@ ClusterState execute(ClusterState state) {
9196
downsampleIndex,
9297
dataStreamName
9398
);
94-
Metadata.Builder newMetaData = Metadata.builder(state.metadata())
95-
.put(dataStream.addBackingIndex(state.metadata().getProject(), downsampleIndexMeta.getIndex()));
96-
return ClusterState.builder(state).metadata(newMetaData).build();
99+
ProjectMetadata.Builder newProject = ProjectMetadata.builder(project)
100+
.put(dataStream.addBackingIndex(project, downsampleIndexMeta.getIndex()));
101+
return ClusterState.builder(state).putProjectMetadata(newProject).build();
97102
}
98103
} else {
99104
DataStream sourceParentDataStream = sourceIndexAbstraction.getParentDataStream();
@@ -107,13 +112,13 @@ ClusterState execute(ClusterState state) {
107112
throw new IllegalStateException(errorMessage);
108113
}
109114

110-
IndexMetadata sourceIndexMeta = state.metadata().getProject().index(sourceBackingIndex);
115+
IndexMetadata sourceIndexMeta = project.index(sourceBackingIndex);
111116
assert sourceIndexMeta != null
112117
: "the source index abstraction exists in the indices lookup, so the index metadata must "
113118
+ "exist in the same cluster state metadata";
114119
// the source index exists so let's start by deleting it
115-
state = MetadataDeleteIndexService.deleteIndices(state, Set.of(sourceIndexMeta.getIndex()), settings);
116-
DataStream dataStream = state.metadata().getProject().dataStreams().get(dataStreamName);
120+
state = MetadataDeleteIndexService.deleteIndices(state.projectState(projectId), Set.of(sourceIndexMeta.getIndex()), settings);
121+
DataStream dataStream = state.metadata().getProject(projectId).dataStreams().get(dataStreamName);
117122
if (sourceParentDataStream != null) {
118123
assert sourceParentDataStream.getName().equals(dataStreamName)
119124
: "the backing index must be part of the provided data "
@@ -142,13 +147,14 @@ ClusterState execute(ClusterState state) {
142147
* This method is private as it fits into the flow of this cluster state task - i.e. the source index has already been removed from
143148
* the provided state.
144149
*/
145-
private static ClusterState addDownsampleIndexToDataStream(
150+
private ClusterState addDownsampleIndexToDataStream(
146151
ClusterState state,
147152
DataStream dataStream,
148153
IndexMetadata sourceIndexMeta,
149154
IndexMetadata downsampleIndexMeta
150155
) {
151-
Metadata.Builder newMetaData = Metadata.builder(state.getMetadata());
156+
final var project = state.metadata().getProject(projectId);
157+
ProjectMetadata.Builder newProject = ProjectMetadata.builder(project);
152158
TimeValue generationLifecycleDate = dataStream.getGenerationLifecycleDate(sourceIndexMeta);
153159
// the generation lifecycle date is null only for the write index
154160
// we fail already if attempting to delete/downsample the write index, so the following assertion just re-inforces that
@@ -159,10 +165,10 @@ private static ClusterState addDownsampleIndexToDataStream(
159165
generationLifecycleDate.millis()
160166
);
161167

162-
newMetaData.put(updatedDownsampleMetadata, true);
168+
newProject.put(updatedDownsampleMetadata, true);
163169
// we deleted the source already so let's add the downsample index to the data stream
164-
newMetaData.put(dataStream.addBackingIndex(state.metadata().getProject(), downsampleIndexMeta.getIndex()));
165-
return ClusterState.builder(state).metadata(newMetaData).build();
170+
newProject.put(dataStream.addBackingIndex(project, downsampleIndexMeta.getIndex()));
171+
return ClusterState.builder(state).putProjectMetadata(newProject).build();
166172
}
167173

168174
/**

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
1919
import org.elasticsearch.cluster.metadata.IndexMetadata;
2020
import org.elasticsearch.cluster.metadata.Metadata;
21+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2122
import org.elasticsearch.cluster.metadata.Template;
2223
import org.elasticsearch.common.compress.CompressedXContent;
2324
import org.elasticsearch.common.settings.Settings;
@@ -54,6 +55,22 @@ public static DataStream createDataStream(
5455
Settings.Builder backingIndicesSettings,
5556
@Nullable DataStreamLifecycle lifecycle,
5657
Long now
58+
) {
59+
var projectBuilder = builder.getProject(Metadata.DEFAULT_PROJECT_ID);
60+
if (projectBuilder == null) {
61+
projectBuilder = ProjectMetadata.builder(Metadata.DEFAULT_PROJECT_ID);
62+
builder.put(projectBuilder);
63+
}
64+
return createDataStream(projectBuilder, dataStreamName, backingIndicesCount, 0, backingIndicesSettings, lifecycle, now);
65+
}
66+
67+
public static DataStream createDataStream(
68+
ProjectMetadata.Builder builder,
69+
String dataStreamName,
70+
int backingIndicesCount,
71+
Settings.Builder backingIndicesSettings,
72+
@Nullable DataStreamLifecycle lifecycle,
73+
Long now
5774
) {
5875
return createDataStream(builder, dataStreamName, backingIndicesCount, 0, backingIndicesSettings, lifecycle, now);
5976
}
@@ -66,6 +83,31 @@ public static DataStream createDataStream(
6683
Settings.Builder backingIndicesSettings,
6784
@Nullable DataStreamLifecycle lifecycle,
6885
Long now
86+
) {
87+
var projectBuilder = builder.getProject(Metadata.DEFAULT_PROJECT_ID);
88+
if (projectBuilder == null) {
89+
projectBuilder = ProjectMetadata.builder(Metadata.DEFAULT_PROJECT_ID);
90+
builder.put(projectBuilder);
91+
}
92+
return createDataStream(
93+
projectBuilder,
94+
dataStreamName,
95+
backingIndicesCount,
96+
failureIndicesCount,
97+
backingIndicesSettings,
98+
lifecycle,
99+
now
100+
);
101+
}
102+
103+
public static DataStream createDataStream(
104+
ProjectMetadata.Builder builder,
105+
String dataStreamName,
106+
int backingIndicesCount,
107+
int failureIndicesCount,
108+
Settings.Builder backingIndicesSettings,
109+
@Nullable DataStreamLifecycle lifecycle,
110+
Long now
69111
) {
70112
final List<Index> backingIndices = new ArrayList<>();
71113
final List<Index> failureIndices = new ArrayList<>();

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/downsampling/DeleteSourceAndAddDownsampleIndexExecutorTests.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
public class DeleteSourceAndAddDownsampleIndexExecutorTests extends ESTestCase {
2626

2727
public void testExecutorNotifiesListenerAndReroutesAllocationService() {
28+
final var projectId = randomUniqueProjectId();
2829
String dataStreamName = randomAlphaOfLengthBetween(10, 100);
2930
String sourceIndex = randomAlphaOfLengthBetween(10, 100);
3031
String downsampleIndex = randomAlphaOfLengthBetween(10, 100);
@@ -34,18 +35,25 @@ public void testExecutorNotifiesListenerAndReroutesAllocationService() {
3435

3536
AtomicBoolean taskListenerCalled = new AtomicBoolean(false);
3637
executor.taskSucceeded(
37-
new DeleteSourceAndAddDownsampleToDS(Settings.EMPTY, dataStreamName, sourceIndex, downsampleIndex, new ActionListener<>() {
38-
@Override
39-
public void onResponse(Void unused) {
40-
taskListenerCalled.set(true);
41-
}
38+
new DeleteSourceAndAddDownsampleToDS(
39+
Settings.EMPTY,
40+
projectId,
41+
dataStreamName,
42+
sourceIndex,
43+
downsampleIndex,
44+
new ActionListener<>() {
45+
@Override
46+
public void onResponse(Void unused) {
47+
taskListenerCalled.set(true);
48+
}
4249

43-
@Override
44-
public void onFailure(Exception e) {
45-
logger.error(e.getMessage(), e);
46-
fail("unexpected exception: " + e.getMessage());
50+
@Override
51+
public void onFailure(Exception e) {
52+
logger.error(e.getMessage(), e);
53+
fail("unexpected exception: " + e.getMessage());
54+
}
4755
}
48-
}),
56+
),
4957
null
5058
);
5159
assertThat(taskListenerCalled.get(), is(true));

0 commit comments

Comments
 (0)