diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java index 70150d4f95bc9..c0af2c660420d 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Instant; import java.util.List; +import java.util.Locale; import java.util.function.Supplier; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -31,8 +32,9 @@ public class DownsampleIT extends DownsamplingIntegTestCase { public void testDownsamplingPassthroughDimensions() throws Exception { String dataStreamName = "metrics-foo"; // Set up template - putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, """ + putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, String.format(Locale.ROOT, """ { + %s "properties": { "attributes": { "type": "passthrough", @@ -45,7 +47,7 @@ public void testDownsamplingPassthroughDimensions() throws Exception { } } } - """, null, null); + """, generateForceMergeMetadata()), null, null); // Create data stream by indexing documents final Instant now = Instant.now(); @@ -96,4 +98,14 @@ public void testDownsamplingPassthroughDimensions() throws Exception { assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig); } + + private String generateForceMergeMetadata() { + return switch (randomIntBetween(0, 4)) { + case 0 -> "\"_meta\": { \"downsample.forcemerge.enabled\": false},"; + case 1 -> "\"_meta\": { \"downsample.forcemerge.enabled\": true},"; + case 2 -> "\"_meta\": { \"downsample.forcemerge.enabled\": 4},"; + case 3 -> "\"_meta\": { \"downsample.forcemerge.enabled\": null},"; + default -> ""; + }; + } } diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java index 27de42447d3a0..7710222dce6aa 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java @@ -257,6 +257,8 @@ void assertDownsampleIndexFieldsAndDimensions(String sourceIndex, String downsam final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(sourceIndexMappings); mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE); + assertThat(downsampleIndexMappings.get("_meta"), equalTo(sourceIndexMappings.get("_meta"))); + // Collect expected mappings for fields and dimensions Map metricFields = new HashMap<>(); Map dimensionFields = new HashMap<>(); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 8486963a5daee..9342a62cb849c 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -51,7 +51,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -98,6 +97,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import java.util.function.Supplier; import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; import static org.elasticsearch.xpack.core.ilm.DownsampleAction.DOWNSAMPLED_INDEX_PREFIX; @@ -117,10 +117,10 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc private final MasterServiceTaskQueue taskQueue; private final MetadataCreateIndexService metadataCreateIndexService; private final IndexScopedSettings indexScopedSettings; - private final ThreadContext threadContext; private final PersistentTasksService persistentTasksService; private final DownsampleMetrics downsampleMetrics; private final ProjectResolver projectResolver; + private final Supplier nowSupplier; private static final Set FORBIDDEN_SETTINGS = Set.of( IndexSettings.DEFAULT_PIPELINE.getKey(), @@ -161,6 +161,39 @@ public TransportDownsampleAction( IndexScopedSettings indexScopedSettings, PersistentTasksService persistentTasksService, DownsampleMetrics downsampleMetrics + ) { + this( + new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN), + indicesService, + clusterService, + transportService, + threadPool, + metadataCreateIndexService, + actionFilters, + projectResolver, + indexScopedSettings, + persistentTasksService, + downsampleMetrics, + clusterService.createTaskQueue("downsample", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR), + () -> client.threadPool().relativeTimeInMillis() + ); + } + + // For testing + TransportDownsampleAction( + Client client, + IndicesService indicesService, + ClusterService clusterService, + TransportService transportService, + ThreadPool threadPool, + MetadataCreateIndexService metadataCreateIndexService, + ActionFilters actionFilters, + ProjectResolver projectResolver, + IndexScopedSettings indexScopedSettings, + PersistentTasksService persistentTasksService, + DownsampleMetrics downsampleMetrics, + MasterServiceTaskQueue taskQueue, + Supplier nowSupplier ) { super( DownsampleAction.NAME, @@ -171,15 +204,15 @@ public TransportDownsampleAction( DownsampleAction.Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); - this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN); + this.client = client; this.indicesService = indicesService; this.metadataCreateIndexService = metadataCreateIndexService; this.projectResolver = projectResolver; this.indexScopedSettings = indexScopedSettings; - this.threadContext = threadPool.getThreadContext(); - this.taskQueue = clusterService.createTaskQueue("downsample", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR); + this.taskQueue = taskQueue; this.persistentTasksService = persistentTasksService; this.downsampleMetrics = downsampleMetrics; + this.nowSupplier = nowSupplier; } private void recordSuccessMetrics(long startTime) { @@ -195,10 +228,7 @@ private void recordInvalidConfigurationMetrics(long startTime) { } private void recordOperation(long startTime, DownsampleMetrics.ActionStatus status) { - downsampleMetrics.recordOperation( - TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime).getMillis(), - status - ); + downsampleMetrics.recordOperation(TimeValue.timeValueMillis(nowSupplier.get() - startTime).getMillis(), status); } @Override @@ -208,11 +238,12 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { - long startTime = client.threadPool().relativeTimeInMillis(); + long startTime = nowSupplier.get(); String sourceIndexName = request.getSourceIndex(); IndexNameExpressionResolver.assertExpressionHasNullOrDataSelector(sourceIndexName); IndexNameExpressionResolver.assertExpressionHasNullOrDataSelector(request.getTargetIndex()); - final IndicesAccessControl indicesAccessControl = threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY); + final IndicesAccessControl indicesAccessControl = threadPool.getThreadContext() + .getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY); if (indicesAccessControl != null) { final IndicesAccessControl.IndexAccessControl indexPermissions = indicesAccessControl.getIndexPermissions(sourceIndexName); if (indexPermissions != null) { @@ -269,7 +300,7 @@ protected void masterOperation( final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId()); // Short circuit if target index has been downsampled: final String downsampleIndexName = request.getTargetIndex(); - if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), startTime, projectMetadata, listener)) { + if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), startTime, true, projectMetadata, listener)) { logger.info("Skipping downsampling, because a previous execution already completed downsampling"); return; } @@ -306,6 +337,7 @@ protected void masterOperation( final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(sourceIndexMetadata); final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(sourceIndexMappings); mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE); + final boolean forceMergeEnabled = isForceMergeEnabled(sourceIndexMappings); // Validate downsampling interval validateDownsamplingInterval(mapperService, request.getDownsampleConfig()); @@ -381,7 +413,8 @@ protected void masterOperation( startTime, metricFields, labelFields, - dimensionFields + dimensionFields, + forceMergeEnabled ); } else { recordFailureMetrics(startTime); @@ -394,6 +427,7 @@ protected void masterOperation( parentTask, request.getWaitTimeout(), startTime, + forceMergeEnabled, clusterService.state().metadata().getProject(projectMetadata.id()), listener )) { @@ -411,7 +445,8 @@ protected void masterOperation( startTime, metricFields, labelFields, - dimensionFields + dimensionFields, + forceMergeEnabled ); } else { recordFailureMetrics(startTime); @@ -422,6 +457,18 @@ protected void masterOperation( })); } + private boolean isForceMergeEnabled(Map sourceIndexMappings) { + if (sourceIndexMappings.containsKey("_meta")) { + if (sourceIndexMappings.get("_meta") instanceof Map metadataMap) { + var enabledForceMergeValue = metadataMap.get("downsample.forcemerge.enabled"); + if (enabledForceMergeValue instanceof Boolean enabledForceMerge) { + return enabledForceMerge; + } + } + } + return true; + } + /** * Shortcircuit when another downsample api invocation already completed successfully. */ @@ -430,6 +477,7 @@ private boolean canShortCircuit( TaskId parentTask, TimeValue waitTimeout, long startTime, + boolean forceMergeEnabled, ProjectMetadata projectMetadata, ActionListener listener ) { @@ -463,7 +511,8 @@ private boolean canShortCircuit( parentTask, targetIndexMetadata.getIndex().getName(), waitTimeout, - startTime + startTime, + forceMergeEnabled ) ); return true; @@ -483,7 +532,8 @@ private void performShardDownsampling( long startTime, List metricFields, List labelFields, - List dimensionFields + List dimensionFields, + boolean forceMergeEnabled ) { final int numberOfShards = sourceIndexMetadata.getNumberOfShards(); final Index sourceIndex = sourceIndexMetadata.getIndex(); @@ -543,7 +593,8 @@ public void onResponse(PersistentTasksCustomMetadata.PersistentTask nextStepListener = forceMergeEnabled + ? new ForceMergeActionListener(parentTask, downsampleIndexName, startTime, actionListener) + : new MeasurementActionListener(startTime, actionListener); // Mark downsample index as "completed successfully" ("index.downsample.status": "success") taskQueue.submitTask( "update-downsample-metadata [" + downsampleIndexName + "]", - new DownsampleClusterStateUpdateTask( - new ForceMergeActionListener(parentTask, downsampleIndexName, startTime, actionListener) - ) { + new DownsampleClusterStateUpdateTask(nextStepListener) { @Override public ClusterState execute(ClusterState currentState) { @@ -1124,18 +1192,16 @@ class ForceMergeActionListener implements ActionListener { final ActionListener actionListener; private final TaskId parentTask; private final String downsampleIndexName; - private final long startTime; ForceMergeActionListener( final TaskId parentTask, final String downsampleIndexName, final long startTime, - final ActionListener onFailure + final ActionListener actionListener ) { this.parentTask = parentTask; this.downsampleIndexName = downsampleIndexName; - this.startTime = startTime; - this.actionListener = onFailure; + this.actionListener = new MeasurementActionListener(startTime, actionListener); } @Override @@ -1143,19 +1209,43 @@ public void onResponse(final AcknowledgedResponse response) { ForceMergeRequest request = new ForceMergeRequest(downsampleIndexName); request.maxNumSegments(1); request.setParentTask(parentTask); - client.admin().indices().forceMerge(request, ActionListener.wrap(mergeIndexResp -> { - actionListener.onResponse(AcknowledgedResponse.TRUE); - recordSuccessMetrics(startTime); - }, t -> { - /* - * At this point downsample index has been created - * successfully even if force merge failed. - * So, we should not fail the downsample operation. - */ - logger.error("Failed to force-merge downsample index [" + downsampleIndexName + "]", t); - actionListener.onResponse(AcknowledgedResponse.TRUE); - recordSuccessMetrics(startTime); - })); + client.admin() + .indices() + .forceMerge(request, ActionListener.wrap(mergeIndexResp -> { actionListener.onResponse(AcknowledgedResponse.TRUE); }, t -> { + /* + * At this point downsample index has been created + * successfully even if force merge failed. + * So, we should not fail the downsample operation. + */ + logger.error("Failed to force-merge downsample index [" + downsampleIndexName + "]", t); + actionListener.onResponse(AcknowledgedResponse.TRUE); + })); + } + + @Override + public void onFailure(Exception e) { + this.actionListener.onFailure(e); + } + + } + + /** + * Records measurements + */ + class MeasurementActionListener implements ActionListener { + + final ActionListener actionListener; + private final long startTime; + + MeasurementActionListener(final long startTime, final ActionListener onFailure) { + this.startTime = startTime; + this.actionListener = onFailure; + } + + @Override + public void onResponse(final AcknowledgedResponse response) { + recordSuccessMetrics(startTime); + actionListener.onResponse(AcknowledgedResponse.TRUE); } @Override diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index 1b2cc32e12a65..a968198098b39 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -7,23 +7,423 @@ package org.elasticsearch.xpack.downsample; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.downsample.DownsampleAction; +import org.elasticsearch.action.downsample.DownsampleConfig; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.allocation.DataTier; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; +import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.junit.After; +import org.junit.Before; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.UUID; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TransportDownsampleActionTests extends ESTestCase { + + @Mock + private ClusterService clusterService; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Client client; + @Mock + private ThreadPool threadPool; + @Mock + private DownsampleMetrics downsampleMetrics; + @Mock + private ProjectResolver projectResolver; + @Mock + private PersistentTasksService persistentTaskService; + @Mock + private IndicesService indicesService; + @Mock + private MasterServiceTaskQueue taskQueue; + @Mock + private MapperService mapperService; + + private static final String TEMPLATE_MAPPING = """ + { + "_doc": { + %s + "properties": { + "attributes.host": { + "type": "keyword", + "time_series_dimension": true + }, + "metrics.cpu_usage": { + "type": "double", + "time_series_metric": "counter" + } + } + } + }"""; + + private static final String NO_METADATA_MAPPING = String.format(Locale.ROOT, TEMPLATE_MAPPING, ""); + private static final String OTHER_METADATA_MAPPING = String.format( + Locale.ROOT, + TEMPLATE_MAPPING, + "\"_meta\":{\"downsample.forcemerge.enabled\":100}," + ); + private static final String FORCE_MERGE_ENABLED_MAPPING = String.format( + Locale.ROOT, + TEMPLATE_MAPPING, + "\"_meta\":{\"downsample.forcemerge.enabled\":true}," + ); + private static final String FORCE_MERGE_DISABLED_MAPPING = String.format( + Locale.ROOT, + TEMPLATE_MAPPING, + "\"_meta\":{\"downsample.forcemerge.enabled\":false}," + ); + + private TransportDownsampleAction action; + + private AutoCloseable mocks; + private String sourceIndex; + private String targetIndex; + private int primaryShards; + private int replicaShards; + private ProjectId projectId; + private Task task; + private IndicesAdminClient indicesAdminClient; + + @Before + public void setUp() throws Exception { + super.setUp(); + mocks = MockitoAnnotations.openMocks(this); + action = new TransportDownsampleAction( + client, + indicesService, + clusterService, + mock(TransportService.class), + threadPool, + mock(MetadataCreateIndexService.class), + new ActionFilters(Set.of()), + projectResolver, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + persistentTaskService, + downsampleMetrics, + taskQueue, + System::currentTimeMillis + ); + indicesAdminClient = client.admin().indices(); + sourceIndex = "source-index"; + targetIndex = "downsample-index"; + primaryShards = randomIntBetween(1, 5); + replicaShards = randomIntBetween(0, 3); + projectId = randomProjectIdOrDefault(); + task = new Task(1, "type", "action", "description", null, null); + + var threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(clusterService.localNode()).thenReturn(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "node_name")); + when(clusterService.getSettings()).thenReturn(Settings.EMPTY); + + Answer mockBroadcastResponse = invocation -> { + @SuppressWarnings("unchecked") + var listener = (ActionListener) invocation.getArgument(1, ActionListener.class); + listener.onResponse(new BroadcastResponse(primaryShards, primaryShards, 0, List.of())); + return null; + }; + doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any()); + doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any()); + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService); + MappedFieldType timestampFieldMock = mock(MappedFieldType.class); + when(timestampFieldMock.meta()).thenReturn(Map.of()); + when(mapperService.fieldType(any())).thenReturn(timestampFieldMock); + when(mapperService.mappingLookup()).thenReturn(MappingLookup.EMPTY); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + mocks.close(); + } + + public void testDownsamplingWithForceMerge() throws IOException { + String mapping = switch (randomIntBetween(0, 2)) { + case 0 -> NO_METADATA_MAPPING; + case 1 -> OTHER_METADATA_MAPPING; + default -> FORCE_MERGE_ENABLED_MAPPING; + }; + downsample(mapping); + verify(indicesAdminClient).forceMerge(any(), any()); + } + + public void testDownsamplingSkipsForceMerge() throws IOException { + downsample(FORCE_MERGE_DISABLED_MAPPING); + verify(indicesAdminClient, never()).forceMerge(any(), any()); + } + + private void downsample(String mapping) throws IOException { + mockGetMapping(mapping); + mockMergedMapping(mapping); + + var projectMetadata = ProjectMetadata.builder(projectId) + .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) + .build(); + + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(projectMetadata) + .blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) + .build(); + + when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + updateTask.listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); + Answer mockPersistentTask = invocation -> { + ActionListener> listener = invocation.getArgument(4); + PersistentTasksCustomMetadata.PersistentTask task = mock(PersistentTasksCustomMetadata.PersistentTask.class); + when(task.getId()).thenReturn(randomAlphaOfLength(10)); + DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.COMPLETED, + null + ); + when(task.getState()).thenReturn(runningTaskState); + listener.onResponse(task); + return null; + }; + doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any()); + doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any()); + doAnswer(invocation -> { + var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class); + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(indicesAdminClient).updateSettings(any(), any()); + + PlainActionFuture listener = new PlainActionFuture<>(); + action.masterOperation( + task, + new DownsampleAction.Request( + ESTestCase.TEST_REQUEST_TIMEOUT, + sourceIndex, + targetIndex, + TimeValue.ONE_HOUR, + new DownsampleConfig(new DateHistogramInterval("5m")) + ), + clusterState, + listener + ); + safeGet(listener); + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + } + + public void testDownsamplingForceMergeWithShortCircuitAfterCreation() { + String mapping = switch (randomIntBetween(0, 3)) { + case 0 -> NO_METADATA_MAPPING; + case 1 -> OTHER_METADATA_MAPPING; + case 2 -> FORCE_MERGE_ENABLED_MAPPING; + default -> FORCE_MERGE_DISABLED_MAPPING; + }; + mockGetMapping(mapping); + + var projectMetadata = ProjectMetadata.builder(projectId) + .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) + .put(createTargetIndexMetadata(targetIndex, primaryShards, replicaShards)) + .build(); + + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(projectMetadata) + .blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) + .build(); + + when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + + PlainActionFuture listener = new PlainActionFuture<>(); + action.masterOperation( + task, + new DownsampleAction.Request( + ESTestCase.TEST_REQUEST_TIMEOUT, + sourceIndex, + targetIndex, + TimeValue.ONE_HOUR, + new DownsampleConfig(new DateHistogramInterval("5m")) + ), + clusterState, + listener + ); + safeGet(listener); + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + verify(indicesAdminClient).forceMerge(any(), any()); + } + + public void testDownsamplingForceMergeWithShortCircuitDuringCreation() throws IOException { + String mapping = switch (randomIntBetween(0, 2)) { + case 0 -> NO_METADATA_MAPPING; + case 1 -> OTHER_METADATA_MAPPING; + default -> FORCE_MERGE_ENABLED_MAPPING; + }; + downsampleWithShortCircuitDuringCreation(mapping); + verify(indicesAdminClient).forceMerge(any(), any()); + } + + public void testDownsamplingSkipsForceMergeWithShortCircuitDuringCreation() throws IOException { + downsampleWithShortCircuitDuringCreation(FORCE_MERGE_DISABLED_MAPPING); + verify(indicesAdminClient, never()).forceMerge(any(), any()); + } + + public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOException { + mockGetMapping(mapping); + mockMergedMapping(mapping); + + var projectMetadata = ProjectMetadata.builder(projectId) + .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) + .build(); + + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(projectMetadata) + .blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) + .build(); + + when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + updateTask.listener.onFailure(new ResourceAlreadyExistsException(targetIndex)); + return null; + }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); + when(clusterService.state()).thenReturn( + ClusterState.builder(clusterState) + .putProjectMetadata( + ProjectMetadata.builder(projectMetadata).put(createTargetIndexMetadata(targetIndex, primaryShards, replicaShards)) + ) + .build() + ); + + PlainActionFuture listener = new PlainActionFuture<>(); + action.masterOperation( + task, + new DownsampleAction.Request( + ESTestCase.TEST_REQUEST_TIMEOUT, + sourceIndex, + targetIndex, + TimeValue.ONE_HOUR, + new DownsampleConfig(new DateHistogramInterval("5m")) + ), + clusterState, + listener + ); + safeGet(listener); + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + } + + private void mockGetMapping(String mapping) { + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + var listener = (ActionListener) invocation.getArgument(1, ActionListener.class); + listener.onResponse( + new GetMappingsResponse( + Map.of(sourceIndex, new MappingMetadata("_doc", XContentHelper.convertToMap(JsonXContent.jsonXContent, mapping, true))) + ) + ); + return null; + }).when(indicesAdminClient).getMappings(any(), any()); + } + + private void mockMergedMapping(String mapping) throws IOException { + DocumentMapper documentMapper = mock(DocumentMapper.class); + when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON(mapping)); + when(mapperService.merge(anyString(), any(CompressedXContent.class), any())).thenReturn(documentMapper); + } + + private IndexMetadata.Builder createSourceIndexMetadata(String sourceIndex, int primaryShards, int replicaShards) { + return IndexMetadata.builder(sourceIndex) + .settings( + indexSettings(IndexVersion.current(), randomUUID(), primaryShards, replicaShards).put( + IndexSettings.MODE.getKey(), + IndexMode.TIME_SERIES.getName() + ) + .put("index.routing_path", "dimensions") + .put(IndexMetadata.SETTING_BLOCKS_WRITE, true) + .put("index.time_series.start_time", "2021-01-01T00:00:00Z") + .put("index.time_series.end_time", "2022-01-01T00:00:00Z") + ); + } + + private IndexMetadata.Builder createTargetIndexMetadata(String targetIndex, int primaryShards, int replicaShards) { + return IndexMetadata.builder(targetIndex) + .settings( + indexSettings(IndexVersion.current(), randomUUID(), primaryShards, replicaShards).put( + IndexSettings.MODE.getKey(), + IndexMode.TIME_SERIES.getName() + ) + .put("index.routing_path", "dimensions") + .put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), IndexMetadata.DownsampleTaskStatus.STARTED) + .put(IndexMetadata.SETTING_BLOCKS_WRITE, true) + ); + + } + public void testCopyIndexMetadata() { // GIVEN final List tiers = List.of(DataTier.DATA_HOT, DataTier.DATA_WARM, DataTier.DATA_COLD, DataTier.DATA_CONTENT);