|
28 | 28 | import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
29 | 29 | import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
|
30 | 30 | import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
| 31 | +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; |
31 | 32 | import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
|
32 | 33 | import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
|
33 | 34 | import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
|
|
40 | 41 | import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
|
41 | 42 | import org.elasticsearch.action.datastreams.GetDataStreamAction;
|
42 | 43 | import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.DataStreamInfo;
|
| 44 | +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; |
43 | 45 | import org.elasticsearch.action.delete.DeleteRequest;
|
44 | 46 | import org.elasticsearch.action.index.IndexRequest;
|
45 | 47 | import org.elasticsearch.action.index.IndexResponse;
|
|
50 | 52 | import org.elasticsearch.action.search.SearchResponse;
|
51 | 53 | import org.elasticsearch.action.update.UpdateRequest;
|
52 | 54 | import org.elasticsearch.cluster.ClusterState;
|
| 55 | +import org.elasticsearch.cluster.ClusterStateTaskExecutor; |
| 56 | +import org.elasticsearch.cluster.ClusterStateUpdateTask; |
53 | 57 | import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
54 | 58 | import org.elasticsearch.cluster.metadata.AliasMetadata;
|
55 | 59 | import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
56 | 60 | import org.elasticsearch.cluster.metadata.DataStream;
|
| 61 | +import org.elasticsearch.cluster.metadata.DataStreamAction; |
57 | 62 | import org.elasticsearch.cluster.metadata.DataStreamAlias;
|
58 | 63 | import org.elasticsearch.cluster.metadata.IndexMetadata;
|
| 64 | +import org.elasticsearch.cluster.metadata.Metadata; |
59 | 65 | import org.elasticsearch.cluster.metadata.Template;
|
| 66 | +import org.elasticsearch.cluster.service.ClusterService; |
60 | 67 | import org.elasticsearch.common.Strings;
|
61 | 68 | import org.elasticsearch.common.compress.CompressedXContent;
|
62 | 69 | import org.elasticsearch.common.settings.Settings;
|
63 | 70 | import org.elasticsearch.core.Nullable;
|
| 71 | +import org.elasticsearch.index.Index; |
64 | 72 | import org.elasticsearch.index.IndexNotFoundException;
|
65 | 73 | import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
|
66 | 74 | import org.elasticsearch.index.mapper.DateFieldMapper;
|
|
88 | 96 | import java.util.Map;
|
89 | 97 | import java.util.Optional;
|
90 | 98 | import java.util.Set;
|
| 99 | +import java.util.concurrent.CountDownLatch; |
91 | 100 | import java.util.concurrent.CyclicBarrier;
|
92 | 101 | import java.util.concurrent.ExecutionException;
|
93 | 102 | import java.util.concurrent.atomic.AtomicBoolean;
|
| 103 | +import java.util.concurrent.atomic.AtomicReference; |
94 | 104 | import java.util.stream.Collectors;
|
95 | 105 | import java.util.stream.IntStream;
|
96 | 106 |
|
@@ -1711,6 +1721,74 @@ public void testCreateIndexAliasWithSameNameAsDataStreamAlias() throws Exception
|
1711 | 1721 | }
|
1712 | 1722 | }
|
1713 | 1723 |
|
| 1724 | + public void testRemoveGhostReference() throws Exception { |
| 1725 | + String dataStreamName = "logs-es"; |
| 1726 | + DataStreamIT.putComposableIndexTemplate("my-template", List.of("logs-*")); |
| 1727 | + var request = new CreateDataStreamAction.Request(dataStreamName); |
| 1728 | + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, request).actionGet()); |
| 1729 | + assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet()); |
| 1730 | + var indicesStatsResponse = client().admin().indices().stats(new IndicesStatsRequest()).actionGet(); |
| 1731 | + assertThat(indicesStatsResponse.getIndices().size(), equalTo(2)); |
| 1732 | + ClusterState before = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); |
| 1733 | + assertThat(before.getMetadata().dataStreams().get(dataStreamName).getIndices(), hasSize(2)); |
| 1734 | + |
| 1735 | + CountDownLatch latch = new CountDownLatch(1); |
| 1736 | + AtomicReference<DataStream> brokenDataStreamHolder = new AtomicReference<>(); |
| 1737 | + internalCluster().getCurrentMasterNodeInstance(ClusterService.class) |
| 1738 | + .submitStateUpdateTask(getTestName(), new ClusterStateUpdateTask() { |
| 1739 | + @Override |
| 1740 | + public ClusterState execute(ClusterState currentState) throws Exception { |
| 1741 | + DataStream original = currentState.getMetadata().dataStreams().get(dataStreamName); |
| 1742 | + DataStream broken = new DataStream( |
| 1743 | + original.getName(), |
| 1744 | + List.of(new Index(original.getIndices().get(0).getName(), "broken"), original.getIndices().get(1)), |
| 1745 | + original.getGeneration(), |
| 1746 | + original.getMetadata(), |
| 1747 | + original.isHidden(), |
| 1748 | + original.isReplicated(), |
| 1749 | + original.isSystem(), |
| 1750 | + original.isAllowCustomRouting(), |
| 1751 | + original.getIndexMode() |
| 1752 | + ); |
| 1753 | + brokenDataStreamHolder.set(broken); |
| 1754 | + return ClusterState.builder(currentState) |
| 1755 | + .metadata(Metadata.builder(currentState.getMetadata()).put(broken).build()) |
| 1756 | + .build(); |
| 1757 | + } |
| 1758 | + |
| 1759 | + @Override |
| 1760 | + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { |
| 1761 | + latch.countDown(); |
| 1762 | + } |
| 1763 | + |
| 1764 | + @Override |
| 1765 | + public void onFailure(Exception e) { |
| 1766 | + logger.error("error while adding a broken data stream", e); |
| 1767 | + latch.countDown(); |
| 1768 | + } |
| 1769 | + }, ClusterStateTaskExecutor.unbatched()); |
| 1770 | + latch.await(); |
| 1771 | + var ghostReference = brokenDataStreamHolder.get().getIndices().get(0); |
| 1772 | + |
| 1773 | + // Many APIs fail with NPE, because of broken data stream: |
| 1774 | + expectThrows(NullPointerException.class, () -> client().admin().indices().stats(new IndicesStatsRequest()).actionGet()); |
| 1775 | + expectThrows(NullPointerException.class, () -> client().search(new SearchRequest()).actionGet()); |
| 1776 | + |
| 1777 | + assertAcked( |
| 1778 | + client().execute( |
| 1779 | + ModifyDataStreamsAction.INSTANCE, |
| 1780 | + new ModifyDataStreamsAction.Request(List.of(DataStreamAction.removeBackingIndex(dataStreamName, ghostReference.getName()))) |
| 1781 | + ).actionGet() |
| 1782 | + ); |
| 1783 | + ClusterState after = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); |
| 1784 | + assertThat(after.getMetadata().dataStreams().get(dataStreamName).getIndices(), hasSize(1)); |
| 1785 | + // Data stream resolves now to one backing index. |
| 1786 | + // Note, that old backing index still exists and has been unhidden. |
| 1787 | + // The modify data stream api only fixed the data stream by removing a broken reference to a backing index. |
| 1788 | + indicesStatsResponse = client().admin().indices().stats(new IndicesStatsRequest()).actionGet(); |
| 1789 | + assertThat(indicesStatsResponse.getIndices().size(), equalTo(2)); |
| 1790 | + } |
| 1791 | + |
1714 | 1792 | private static void verifyResolvability(String dataStream, ActionRequestBuilder<?, ?> requestBuilder, boolean fail) {
|
1715 | 1793 | verifyResolvability(dataStream, requestBuilder, fail, 0);
|
1716 | 1794 | }
|
|
0 commit comments