|
21 | 21 | import org.elasticsearch.cluster.ProjectState; |
22 | 22 | import org.elasticsearch.cluster.block.ClusterBlockException; |
23 | 23 | import org.elasticsearch.cluster.block.ClusterBlockLevel; |
24 | | -import org.elasticsearch.cluster.metadata.DataStream; |
25 | 24 | import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; |
26 | | -import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; |
| 25 | +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; |
27 | 26 | import org.elasticsearch.cluster.metadata.ProjectMetadata; |
28 | 27 | import org.elasticsearch.cluster.project.ProjectResolver; |
29 | 28 | import org.elasticsearch.cluster.service.ClusterService; |
|
32 | 31 | import org.elasticsearch.common.settings.Settings; |
33 | 32 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
34 | 33 | import org.elasticsearch.core.SuppressForbidden; |
35 | | -import org.elasticsearch.index.Index; |
36 | 34 | import org.elasticsearch.indices.SystemIndices; |
37 | 35 | import org.elasticsearch.injection.guice.Inject; |
38 | | -import org.elasticsearch.snapshots.SnapshotInProgressException; |
39 | | -import org.elasticsearch.snapshots.SnapshotsService; |
40 | 36 | import org.elasticsearch.tasks.Task; |
41 | 37 | import org.elasticsearch.threadpool.ThreadPool; |
42 | 38 | import org.elasticsearch.transport.TransportService; |
|
46 | 42 | import java.util.List; |
47 | 43 | import java.util.Set; |
48 | 44 | import java.util.function.Consumer; |
| 45 | +import java.util.stream.Collectors; |
49 | 46 |
|
50 | 47 | import static org.elasticsearch.action.datastreams.DataStreamsActionUtil.getDataStreamNames; |
51 | 48 |
|
@@ -155,34 +152,11 @@ static ClusterState removeDataStream( |
155 | 152 | } |
156 | 153 | } |
157 | 154 |
|
158 | | - Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(projectState, dataStreams); |
159 | | - if (snapshottingDataStreams.isEmpty() == false) { |
160 | | - throw new SnapshotInProgressException( |
161 | | - "Cannot delete data streams that are being snapshotted: " |
162 | | - + snapshottingDataStreams |
163 | | - + ". Try again after snapshot finishes or cancel the currently running snapshot." |
164 | | - ); |
165 | | - } |
166 | | - |
167 | | - Set<Index> backingIndicesToRemove = new HashSet<>(); |
168 | | - for (String dataStreamName : dataStreams) { |
169 | | - DataStream dataStream = project.dataStreams().get(dataStreamName); |
170 | | - assert dataStream != null; |
171 | | - backingIndicesToRemove.addAll(dataStream.getIndices()); |
172 | | - backingIndicesToRemove.addAll(dataStream.getFailureIndices()); |
173 | | - } |
174 | | - |
175 | | - // first delete the data streams and then the indices: |
176 | | - // (this to avoid data stream validation from failing when deleting an index that is part of a data stream |
177 | | - // without updating the data stream) |
178 | | - // TODO: change order when delete index api also updates the data stream the index to be removed is member of |
179 | | - ClusterState newState = projectState.updatedState(builder -> { |
180 | | - for (String ds : dataStreams) { |
181 | | - LOGGER.info("removing data stream [{}]", ds); |
182 | | - builder.removeDataStream(ds); |
183 | | - } |
184 | | - }); |
185 | | - return MetadataDeleteIndexService.deleteIndices(newState.projectState(projectState.projectId()), backingIndicesToRemove, settings); |
| 155 | + return MetadataDataStreamsService.deleteDataStreams( |
| 156 | + projectState, |
| 157 | + dataStreams.stream().map(project.dataStreams()::get).collect(Collectors.toSet()), |
| 158 | + settings |
| 159 | + ); |
186 | 160 | } |
187 | 161 |
|
188 | 162 | @Override |
|
0 commit comments