From ec186201d3d1e4e413cb9163e67654213e2256a0 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 21 May 2025 17:24:05 -0500 Subject: [PATCH 01/18] Adding Mappings to Data Streams --- .../datastreams/DataStreamsPlugin.java | 10 + .../TransportGetDataStreamMappingsAction.java | 90 +++++++ .../action/TransportGetDataStreamsAction.java | 19 +- ...ansportUpdateDataStreamMappingsAction.java | 228 ++++++++++++++++++ .../rest/RestGetDataStreamMappingsAction.java | 51 ++++ .../RestUpdateDataStreamMappingsAction.java | 73 ++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/action/ActionListener.java | 1 + .../datastreams/GetDataStreamAction.java | 14 ++ .../GetDataStreamMappingsAction.java | 144 +++++++++++ .../UpdateDataStreamMappingsAction.java | 220 +++++++++++++++++ .../metadata/ComposableIndexTemplate.java | 62 +++++ .../cluster/metadata/DataStream.java | 70 +++++- .../MetadataCreateDataStreamService.java | 2 + .../metadata/MetadataDataStreamsService.java | 63 +++++ .../cluster/metadata/DataStreamTests.java | 21 +- .../metadata/DataStreamTestHelper.java | 14 ++ 17 files changed, 1073 insertions(+), 10 deletions(-) create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamMappingsAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamMappingsAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestUpdateDataStreamMappingsAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index c138cc7538f8a..c097111bfe882 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -14,11 +14,13 @@ import org.elasticsearch.action.datastreams.DataStreamsStatsAction; import org.elasticsearch.action.datastreams.DeleteDataStreamAction; import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction; import org.elasticsearch.action.datastreams.GetDataStreamSettingsAction; import org.elasticsearch.action.datastreams.MigrateToDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; import org.elasticsearch.action.datastreams.PutDataStreamOptionsAction; +import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction; import org.elasticsearch.action.datastreams.UpdateDataStreamSettingsAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; @@ -38,11 +40,13 @@ import org.elasticsearch.datastreams.action.TransportCreateDataStreamAction; import org.elasticsearch.datastreams.action.TransportDataStreamsStatsAction; import org.elasticsearch.datastreams.action.TransportDeleteDataStreamAction; +import org.elasticsearch.datastreams.action.TransportGetDataStreamMappingsAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamSettingsAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction; import org.elasticsearch.datastreams.action.TransportMigrateToDataStreamAction; import org.elasticsearch.datastreams.action.TransportModifyDataStreamsAction; import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction; +import org.elasticsearch.datastreams.action.TransportUpdateDataStreamMappingsAction; import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; @@ -71,11 +75,13 @@ import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction; import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction; import org.elasticsearch.datastreams.rest.RestDeleteDataStreamAction; +import org.elasticsearch.datastreams.rest.RestGetDataStreamMappingsAction; import org.elasticsearch.datastreams.rest.RestGetDataStreamSettingsAction; import org.elasticsearch.datastreams.rest.RestGetDataStreamsAction; import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction; import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction; import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction; +import org.elasticsearch.datastreams.rest.RestUpdateDataStreamMappingsAction; import org.elasticsearch.datastreams.rest.RestUpdateDataStreamSettingsAction; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.health.HealthIndicatorService; @@ -248,6 +254,8 @@ public List getActions() { actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class)); actions.add(new ActionHandler(GetDataStreamSettingsAction.INSTANCE, TransportGetDataStreamSettingsAction.class)); actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class)); + actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class)); + actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class)); return actions; } @@ -286,6 +294,8 @@ public List getRestHandlers( if (DataStream.LOGS_STREAM_FEATURE_FLAG) { handlers.add(new RestGetDataStreamSettingsAction()); handlers.add(new RestUpdateDataStreamSettingsAction()); + handlers.add(new RestGetDataStreamMappingsAction()); + handlers.add(new RestUpdateDataStreamMappingsAction()); } return handlers; } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamMappingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamMappingsAction.java new file mode 100644 index 0000000000000..309f60301ef3d --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamMappingsAction.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; +import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TransportGetDataStreamMappingsAction extends TransportLocalProjectMetadataAction< + GetDataStreamMappingsAction.Request, + GetDataStreamMappingsAction.Response> { + private final IndexNameExpressionResolver indexNameExpressionResolver; + + @Inject + public TransportGetDataStreamMappingsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + GetSettingsAction.NAME, + actionFilters, + transportService.getTaskManager(), + clusterService, + threadPool.executor(ThreadPool.Names.MANAGEMENT), + projectResolver + ); + this.indexNameExpressionResolver = indexNameExpressionResolver; + } + + @Override + protected ClusterBlockException checkBlock(GetDataStreamMappingsAction.Request request, ProjectState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void localClusterStateOperation( + Task task, + GetDataStreamMappingsAction.Request request, + ProjectState project, + ActionListener listener + ) throws Exception { + List dataStreamNames = indexNameExpressionResolver.dataStreamNames( + clusterService.state(), + IndicesOptions.DEFAULT, + request.indices() + ); + Map dataStreamMap = project.metadata().dataStreams(); + List responseList = new ArrayList<>(dataStreamNames.size()); + for (String dataStreamName : dataStreamNames) { + DataStream dataStream = dataStreamMap.get(dataStreamName); + responseList.add( + new GetDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + dataStream.getMappings(), + dataStream.getEffectiveMappings(project.metadata()) + ) + ); + } + listener.onResponse(new GetDataStreamMappingsAction.Response(responseList)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java index 021667f509965..db40e8674895f 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java @@ -54,6 +54,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -261,13 +262,17 @@ static GetDataStreamAction.Response innerOperation( Settings settings = dataStream.getEffectiveSettings(state.metadata()); ilmPolicyName = settings.get(IndexMetadata.LIFECYCLE_NAME); if (indexMode == null && state.metadata().templatesV2().get(indexTemplate) != null) { - indexMode = resolveMode( - state, - indexSettingProviders, - dataStream, - settings, - dataStream.getEffectiveIndexTemplate(state.metadata()) - ); + try { + indexMode = resolveMode( + state, + indexSettingProviders, + dataStream, + settings, + dataStream.getEffectiveIndexTemplate(state.metadata()) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } } indexTemplatePreferIlmValue = PREFER_ILM_SETTING.get(settings); } else { diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java new file mode 100644 index 0000000000000..76db8a2214e17 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java @@ -0,0 +1,228 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.datastreams.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.CountDownActionListener; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; +import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class TransportUpdateDataStreamMappingsAction extends TransportMasterNodeAction< + UpdateDataStreamMappingsAction.Request, + UpdateDataStreamMappingsAction.Response> { + private static final Logger logger = LogManager.getLogger(TransportUpdateDataStreamMappingsAction.class); + private static final Set APPLY_TO_BACKING_INDICES = Set.of("index.lifecycle.name"); + private static final Set APPLY_TO_DATA_STREAM_ONLY = Set.of("index.number_of_shards"); + private final MetadataDataStreamsService metadataDataStreamsService; + private final MetadataUpdateSettingsService updateSettingsService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SystemIndices systemIndices; + private final ProjectResolver projectResolver; + private final SettingsFilter settingsFilter; + + @Inject + public TransportUpdateDataStreamMappingsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + MetadataDataStreamsService metadataDataStreamsService, + MetadataUpdateSettingsService updateSettingsService, + IndexNameExpressionResolver indexNameExpressionResolver, + SystemIndices systemIndices, + SettingsFilter settingsFilter + ) { + super( + UpdateDataStreamMappingsAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + UpdateDataStreamMappingsAction.Request::new, + UpdateDataStreamMappingsAction.Response::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.projectResolver = projectResolver; + this.metadataDataStreamsService = metadataDataStreamsService; + this.updateSettingsService = updateSettingsService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.systemIndices = systemIndices; + this.settingsFilter = settingsFilter; + } + + @Override + protected void masterOperation( + Task task, + UpdateDataStreamMappingsAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + List dataStreamNames = indexNameExpressionResolver.dataStreamNames( + clusterService.state(), + IndicesOptions.DEFAULT, + request.indices() + ); + List dataStreamSettingsResponse = new ArrayList<>(); + CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size() + 1, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + listener.onResponse(new UpdateDataStreamMappingsAction.Response(dataStreamSettingsResponse)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + countDownListener.onResponse(null); + for (String dataStreamName : dataStreamNames) { + updateSingleDataStream( + dataStreamName, + request.getMappings(), + request.masterNodeTimeout(), + request.ackTimeout(), + new ActionListener<>() { + @Override + public void onResponse(UpdateDataStreamMappingsAction.DataStreamMappingsResponse dataStreamResponse) { + dataStreamSettingsResponse.add(dataStreamResponse); + countDownListener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + dataStreamSettingsResponse.add( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + e.getMessage(), + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + countDownListener.onResponse(null); + } + } + ); + } + } + + private void updateSingleDataStream( + String dataStreamName, + CompressedXContent mappingsOverrides, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + logger.debug("updating mappings for {}", dataStreamName); + if (systemIndices.isSystemDataStream(dataStreamName)) { + listener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + "Cannot update a system data stream", + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + return; + } + metadataDataStreamsService.updateMappings( + projectResolver.getProjectId(), + masterNodeTimeout, + ackTimeout, + dataStreamName, + mappingsOverrides, + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + if (acknowledgedResponse.isAcknowledged()) { + DataStream dataStream = clusterService.state() + .projectState(projectResolver.getProjectId()) + .metadata() + .dataStreams() + .get(dataStreamName); + try { + listener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + true, + null, + mappingsOverrides, + dataStream.getEffectiveMappings( + clusterService.state().projectState(projectResolver.getProjectId()).metadata() + ) + ) + ); + } catch (IOException e) { + listener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + e.getMessage(), + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + } + } else { + listener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + "Updating mappings not accepted for unknown reasons", + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); + } + + @Override + protected ClusterBlockException checkBlock(UpdateDataStreamMappingsAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamMappingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamMappingsAction.java new file mode 100644 index 0000000000000..202017d96a4c0 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamMappingsAction.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.rest; + +import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +@ServerlessScope(Scope.PUBLIC) +public class RestGetDataStreamMappingsAction extends BaseRestHandler { + @Override + public String getName() { + return "gett_data_stream_mappings_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_data_stream/{name}/_mappings")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + GetDataStreamMappingsAction.Request getDataStreamRequest = new GetDataStreamMappingsAction.Request( + RestUtils.getMasterNodeTimeout(request) + ).indices(Strings.splitStringByCommaToArray(request.param("name"))); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + GetDataStreamMappingsAction.INSTANCE, + getDataStreamRequest, + new RestRefCountedChunkedToXContentListener<>(channel) + ); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestUpdateDataStreamMappingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestUpdateDataStreamMappingsAction.java new file mode 100644 index 0000000000000..9777b05cb9a40 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestUpdateDataStreamMappingsAction.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.datastreams.rest; + +import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Base64; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.PUT; + +@ServerlessScope(Scope.PUBLIC) +public class RestUpdateDataStreamMappingsAction extends BaseRestHandler { + + @Override + public String getName() { + return "update_data_stream_mappings_action"; + } + + @Override + public List routes() { + return List.of(new Route(PUT, "/_data_stream/{name}/_mappings")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + CompressedXContent mappings; + try (XContentParser parser = request.contentParser()) { + XContentParser.Token token = parser.nextToken(); + if (token == XContentParser.Token.VALUE_STRING) { + mappings = new CompressedXContent(Base64.getDecoder().decode(parser.text())); + } else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) { + mappings = new CompressedXContent(parser.binaryValue()); + } else if (token == XContentParser.Token.START_OBJECT) { + mappings = new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered()))); + } else { + throw new IllegalArgumentException("Unexpected token: " + token); + } + } + boolean dryRun = request.paramAsBoolean("dry_run", false); + + UpdateDataStreamMappingsAction.Request updateDataStreamMappingsRequest = new UpdateDataStreamMappingsAction.Request( + mappings, + dryRun, + RestUtils.getMasterNodeTimeout(request), + RestUtils.getAckTimeout(request) + ).indices(Strings.splitStringByCommaToArray(request.param("name"))); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + UpdateDataStreamMappingsAction.INSTANCE, + updateDataStreamMappingsRequest, + new RestRefCountedChunkedToXContentListener<>(channel) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 2c119df3f8f37..2d0403a550db4 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -261,6 +261,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME = def(9_077_0_00); public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED = def(9_078_0_00); public static final TransportVersion NODES_STATS_SUPPORTS_MULTI_PROJECT = def(9_079_0_00); + public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_080_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index b7e8de1f4fa36..13a066ae5d965 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -21,6 +21,7 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.transport.LeakTracker; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index 1706a8fce493a..266bfc757ffe2 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -37,6 +38,7 @@ import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; import java.io.IOException; import java.time.Instant; @@ -214,6 +216,7 @@ public static class DataStreamInfo implements SimpleDiffable, To public static final ParseField STATUS_FIELD = new ParseField("status"); public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template"); public static final ParseField SETTINGS_FIELD = new ParseField("settings"); + public static final ParseField MAPPINGS_FIELD = new ParseField("mappings"); public static final ParseField PREFER_ILM = new ParseField("prefer_ilm"); public static final ParseField MANAGED_BY = new ParseField("managed_by"); public static final ParseField NEXT_GENERATION_INDEX_MANAGED_BY = new ParseField("next_generation_managed_by"); @@ -424,6 +427,17 @@ public XContentBuilder toXContent( builder.endObject(); } + builder.startObject(SETTINGS_FIELD.getPreferredName()); + dataStream.getSettings().toXContent(builder, params); + builder.endObject(); + + Map uncompressedMapping = XContentHelper.convertToMap(dataStream.getMappings().uncompressed(), true, XContentType.JSON) + .v2(); + if (uncompressedMapping.isEmpty() == false) { + builder.field(MAPPINGS_FIELD.getPreferredName()); + builder.map(uncompressedMapping); + } + builder.startObject(DataStream.FAILURE_STORE_FIELD.getPreferredName()); builder.field(FAILURE_STORE_ENABLED.getPreferredName(), failureStoreEffectivelyEnabled); builder.field(DataStream.ROLLOVER_ON_WRITE_FIELD.getPreferredName(), dataStream.getFailureComponent().isRolloverOnWrite()); diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java new file mode 100644 index 0000000000000..b70d450a0905f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java @@ -0,0 +1,144 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContentObject; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class GetDataStreamMappingsAction extends ActionType { + public static final String NAME = "indices:monitor/data_stream/mappings/get"; + public static final GetDataStreamMappingsAction INSTANCE = new GetDataStreamMappingsAction(); + + public GetDataStreamMappingsAction() { + super(NAME); + } + + public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable { + private String[] dataStreamNames; + + public Request(TimeValue masterNodeTimeout) { + super(masterNodeTimeout); + } + + @Override + public GetDataStreamMappingsAction.Request indices(String... dataStreamNames) { + this.dataStreamNames = dataStreamNames; + return this; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public String[] indices() { + return dataStreamNames; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetDataStreamMappingsAction.Request request = (GetDataStreamMappingsAction.Request) o; + return Arrays.equals(dataStreamNames, request.dataStreamNames); + } + + @Override + public int hashCode() { + return Arrays.hashCode(dataStreamNames); + } + } + + public static class Response extends ActionResponse implements ChunkedToXContentObject { + private final List DataStreamMappingsResponses; + + public Response(List DataStreamMappingsResponses) { + this.DataStreamMappingsResponses = DataStreamMappingsResponses; + } + + public List getDataStreamMappingsResponses() { + return DataStreamMappingsResponses; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + assert false : "This ought to never be called because this action only runs locally"; + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat( + Iterators.single((builder, params1) -> builder.startObject().startArray("data_streams")), + DataStreamMappingsResponses.stream().map(DataStreamMappingsResponse -> (ToXContent) DataStreamMappingsResponse).iterator(), + Iterators.single((builder, params1) -> builder.endArray().endObject()) + ); + } + } + + public record DataStreamMappingsResponse(String dataStreamName, CompressedXContent mappings, CompressedXContent effectiveMappings) + implements + ToXContent { + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", dataStreamName); + Map uncompressedMappings = XContentHelper.convertToMap(mappings.uncompressed(), true, XContentType.JSON).v2(); + if (uncompressedMappings.isEmpty() == false) { + builder.field("mappings"); + builder.map(uncompressedMappings); + } + Map uncompressedEffectiveMappings = XContentHelper.convertToMap( + effectiveMappings.uncompressed(), + true, + XContentType.JSON + ).v2(); + if (uncompressedEffectiveMappings.isEmpty() == false) { + builder.field("effective_mappings"); + builder.map(uncompressedEffectiveMappings); + } + builder.endObject(); + return builder; + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java new file mode 100644 index 0000000000000..467ea9080e834 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java @@ -0,0 +1,220 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContentObject; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class UpdateDataStreamMappingsAction extends ActionType { + + public static final String NAME = "indices:admin/data_stream/mappings/update"; + public static final UpdateDataStreamMappingsAction INSTANCE = new UpdateDataStreamMappingsAction(); + + public UpdateDataStreamMappingsAction() { + super(NAME); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest.Replaceable { + private final CompressedXContent mappings; + private final boolean dryRun; + private String[] dataStreamNames = Strings.EMPTY_ARRAY; + + public Request(CompressedXContent mappings, boolean dryRun, TimeValue masterNodeTimeout, TimeValue ackTimeout) { + super(masterNodeTimeout, ackTimeout); + this.mappings = mappings; + this.dryRun = dryRun; + } + + @Override + public Request indices(String... dataStreamNames) { + this.dataStreamNames = dataStreamNames; + return this; + } + + public CompressedXContent getMappings() { + return mappings; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.dataStreamNames = in.readStringArray(); + this.mappings = CompressedXContent.readCompressedString(in); + this.dryRun = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(dataStreamNames); + mappings.writeTo(out); + out.writeBoolean(dryRun); + } + + @Override + public String[] indices() { + return dataStreamNames; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(dataStreamNames, request.dataStreamNames) + && mappings.equals(request.mappings) + && dryRun == request.dryRun + && Objects.equals(masterNodeTimeout(), request.masterNodeTimeout()) + && Objects.equals(ackTimeout(), request.ackTimeout()); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(dataStreamNames), mappings, dryRun, masterNodeTimeout(), ackTimeout()); + } + + } + + public static class Response extends ActionResponse implements ChunkedToXContentObject { + private final List dataStreamMappingsResponses; + + public Response(List dataStreamMappingsResponses) { + this.dataStreamMappingsResponses = dataStreamMappingsResponses; + } + + public Response(StreamInput in) throws IOException { + this(in.readCollectionAsList(DataStreamMappingsResponse::new)); + } + + public List getDataStreamMappingsResponses() { + return dataStreamMappingsResponses; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(dataStreamMappingsResponses, (out1, value) -> value.writeTo(out1)); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat( + Iterators.single((builder, params1) -> builder.startObject().startArray("data_streams")), + dataStreamMappingsResponses.stream().map(dataStreamMappingsResponse -> (ToXContent) dataStreamMappingsResponse).iterator(), + Iterators.single((builder, params1) -> builder.endArray().endObject()) + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(dataStreamMappingsResponses, response.dataStreamMappingsResponses); + } + + @Override + public int hashCode() { + return Objects.hash(dataStreamMappingsResponses); + } + } + + public record DataStreamMappingsResponse( + String dataStreamName, + boolean dataStreamSucceeded, + String dataStreamErrorMessage, + CompressedXContent mappings, + CompressedXContent effectiveMappings + ) implements ToXContent, Writeable { + + public DataStreamMappingsResponse(StreamInput in) throws IOException { + this( + in.readString(), + in.readBoolean(), + in.readOptionalString(), + CompressedXContent.readCompressedString(in), + CompressedXContent.readCompressedString(in) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(dataStreamName); + out.writeBoolean(dataStreamSucceeded); + out.writeOptionalString(dataStreamErrorMessage); + mappings.writeTo(out); + effectiveMappings.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", dataStreamName); + builder.field("applied_to_data_stream", dataStreamSucceeded); + if (dataStreamErrorMessage != null) { + builder.field("error", dataStreamErrorMessage); + } + Map uncompressedMappings = XContentHelper.convertToMap(mappings.uncompressed(), true, XContentType.JSON).v2(); + if (uncompressedMappings.isEmpty() == false) { + builder.field("mappings"); + builder.map(uncompressedMappings); + } + Map uncompressedEffectiveMappings = XContentHelper.convertToMap( + effectiveMappings.uncompressed(), + true, + XContentType.JSON + ).v2(); + if (uncompressedEffectiveMappings.isEmpty() == false) { + builder.field("effective_mappings"); + builder.map(uncompressedEffectiveMappings); + } + builder.endObject(); + return builder; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java index 4ceb807adece4..fa270e06d91cd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java @@ -19,18 +19,24 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -338,6 +344,62 @@ public ComposableIndexTemplate mergeSettings(Settings settings) { return mergedIndexTemplateBuilder.build(); } + public ComposableIndexTemplate mergeMappings(CompressedXContent mappings) throws IOException { + Objects.requireNonNull(mappings); + if (Mapping.EMPTY.toCompressedXContent().equals(mappings)) { + return this; + } + ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = this.toBuilder(); + Template.Builder mergedTemplateBuilder; + CompressedXContent templateMappings; + if (this.template() == null) { + mergedTemplateBuilder = Template.builder(); + templateMappings = null; + } else { + mergedTemplateBuilder = Template.builder(this.template()); + templateMappings = this.template().mappings(); + } + mergedTemplateBuilder.mappings(templateMappings == null ? mappings : merge(templateMappings, mappings)); + mergedIndexTemplateBuilder.template(mergedTemplateBuilder); + return mergedIndexTemplateBuilder.build(); + } + + @SuppressWarnings("unchecked") + private CompressedXContent merge(CompressedXContent originalMapping, CompressedXContent mappingAddition) throws IOException { + Map mappingAdditionMap = XContentHelper.convertToMap(mappingAddition.uncompressed(), true, XContentType.JSON).v2(); + Map combinedMappingMap = new HashMap<>(); + if (originalMapping != null) { + combinedMappingMap.putAll((Map) XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON).v2().get("_doc")); + } + XContentHelper.update(combinedMappingMap, mappingAdditionMap, true); + if (combinedMappingMap.isEmpty()) { + return null; + } else { + return convertMappingMapToXContent(combinedMappingMap); + } + } + + private static CompressedXContent convertMappingMapToXContent(Map rawAdditionalMapping) throws IOException { + CompressedXContent compressedXContent; + if (rawAdditionalMapping == null || rawAdditionalMapping.isEmpty()) { + compressedXContent = null; + } else { + try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawAdditionalMapping)) { + compressedXContent = mappingFromXContent(parser); + } + } + return compressedXContent; + } + + private static CompressedXContent mappingFromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT) { + return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered()))); + } else { + throw new IllegalArgumentException("Unexpected token: " + token); + } + } + @Override public int hashCode() { return Objects.hash( diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index f131f0e2550a2..9d1d8368ff918 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -45,11 +46,15 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; @@ -58,6 +63,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Base64; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -120,6 +126,7 @@ public final class DataStream implements SimpleDiffable, ToXContentO @Nullable private final Map metadata; private final Settings settings; + private final CompressedXContent mappings; private final boolean hidden; private final boolean replicated; private final boolean system; @@ -156,6 +163,7 @@ public DataStream( generation, metadata, Settings.EMPTY, + Mapping.EMPTY.toCompressedXContent(), hidden, replicated, system, @@ -176,6 +184,7 @@ public DataStream( long generation, Map metadata, Settings settings, + CompressedXContent mappings, boolean hidden, boolean replicated, boolean system, @@ -192,6 +201,7 @@ public DataStream( generation, metadata, settings, + mappings, hidden, replicated, system, @@ -210,6 +220,7 @@ public DataStream( long generation, Map metadata, Settings settings, + CompressedXContent mappings, boolean hidden, boolean replicated, boolean system, @@ -225,6 +236,7 @@ public DataStream( this.generation = generation; this.metadata = metadata; this.settings = Objects.requireNonNull(settings); + this.mappings = Objects.requireNonNull(mappings); assert system == false || hidden; // system indices must be hidden this.hidden = hidden; this.replicated = replicated; @@ -285,11 +297,18 @@ public static DataStream read(StreamInput in) throws IOException { } else { settings = Settings.EMPTY; } + CompressedXContent mappings; + if (in.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) { + mappings = CompressedXContent.readCompressedString(in); + } else { + mappings = Mapping.EMPTY.toCompressedXContent(); + } return new DataStream( name, generation, metadata, settings, + mappings, hidden, replicated, system, @@ -380,8 +399,8 @@ public boolean rolloverOnWrite() { return backingIndices.rolloverOnWrite; } - public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) { - return getMatchingIndexTemplate(projectMetadata).mergeSettings(settings); + public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) throws IOException { + return getMatchingIndexTemplate(projectMetadata).mergeSettings(settings).mergeMappings(mappings); } public Settings getEffectiveSettings(ProjectMetadata projectMetadata) { @@ -390,6 +409,10 @@ public Settings getEffectiveSettings(ProjectMetadata projectMetadata) { return templateSettings.merge(settings); } + public CompressedXContent getEffectiveMappings(ProjectMetadata projectMetadata) throws IOException { + return getMatchingIndexTemplate(projectMetadata).mergeMappings(mappings).template().mappings(); + } + private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) { return lookupTemplateForDataStream(name, projectMetadata); } @@ -509,6 +532,10 @@ public Settings getSettings() { return settings; } + public CompressedXContent getMappings() { + return mappings; + } + @Override public boolean isHidden() { return hidden; @@ -1352,6 +1379,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS)) { settings.writeTo(out); } + if (out.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) { + mappings.writeTo(out); + } } public static final ParseField NAME_FIELD = new ParseField("name"); @@ -1374,6 +1404,7 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding"); public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options"); public static final ParseField SETTINGS_FIELD = new ParseField("settings"); + public static final ParseField MAPPINGS_FIELD = new ParseField("mappings"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( @@ -1383,6 +1414,7 @@ public void writeTo(StreamOutput out) throws IOException { (Long) args[2], (Map) args[3], args[17] == null ? Settings.EMPTY : (Settings) args[17], + args[18] == null ? Mapping.EMPTY.toCompressedXContent() : (CompressedXContent) args[18], args[4] != null && (boolean) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6], @@ -1454,6 +1486,18 @@ public void writeTo(StreamOutput out) throws IOException { DATA_STREAM_OPTIONS_FIELD ); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> { + XContentParser.Token token = p.currentToken(); + if (token == XContentParser.Token.VALUE_STRING) { + return new CompressedXContent(Base64.getDecoder().decode(p.text())); + } else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) { + return new CompressedXContent(p.binaryValue()); + } else if (token == XContentParser.Token.START_OBJECT) { + return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(p.mapOrdered()))); + } else { + throw new IllegalArgumentException("Unexpected token: " + token); + } + }, MAPPINGS_FIELD, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -1518,6 +1562,20 @@ public XContentBuilder toXContent( builder.startObject(SETTINGS_FIELD.getPreferredName()); this.settings.toXContent(builder, params); builder.endObject(); + + String context = params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API); + boolean binary = params.paramAsBoolean("binary", false); + if (Metadata.CONTEXT_MODE_API.equals(context) || binary == false) { + Map uncompressedMapping = XContentHelper.convertToMap(this.mappings.uncompressed(), true, XContentType.JSON) + .v2(); + if (uncompressedMapping.isEmpty() == false) { + builder.field(MAPPINGS_FIELD.getPreferredName()); + builder.map(uncompressedMapping); + } + } else { + builder.field(MAPPINGS_FIELD.getPreferredName(), mappings.compressed()); + } + builder.endObject(); return builder; } @@ -1862,6 +1920,7 @@ public static class Builder { @Nullable private Map metadata = null; private Settings settings = Settings.EMPTY; + private CompressedXContent mappings = Mapping.EMPTY.toCompressedXContent(); private boolean hidden = false; private boolean replicated = false; private boolean system = false; @@ -1890,6 +1949,7 @@ private Builder(DataStream dataStream) { generation = dataStream.generation; metadata = dataStream.metadata; settings = dataStream.settings; + mappings = dataStream.mappings; hidden = dataStream.hidden; replicated = dataStream.replicated; system = dataStream.system; @@ -1926,6 +1986,11 @@ public Builder setSettings(Settings settings) { return this; } + public Builder setMappings(CompressedXContent mappings) { + this.mappings = mappings; + return this; + } + public Builder setHidden(boolean hidden) { this.hidden = hidden; return this; @@ -1987,6 +2052,7 @@ public DataStream build() { generation, metadata, settings, + mappings, hidden, replicated, system, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index ba462a9416520..36e1463a07208 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; +import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.mapper.MetadataFieldMapper; import org.elasticsearch.indices.SystemDataStreamDescriptor; @@ -332,6 +333,7 @@ static ClusterState createDataStream( initialGeneration, template.metadata() != null ? Map.copyOf(template.metadata()) : null, Settings.EMPTY, + Mapping.EMPTY.toCompressedXContent(), hidden, false, isSystem, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index d3784253b536e..846d87d648299 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -59,6 +60,7 @@ public class MetadataDataStreamsService { private final MasterServiceTaskQueue setRolloverOnWriteTaskQueue; private final MasterServiceTaskQueue updateOptionsTaskQueue; private final MasterServiceTaskQueue updateSettingsTaskQueue; + private final MasterServiceTaskQueue updateMappingsTaskQueue; public MetadataDataStreamsService( ClusterService clusterService, @@ -177,6 +179,39 @@ public Tuple executeTask( Priority.NORMAL, updateSettingsExecutor ); + ClusterStateTaskExecutor updateMappingsExecutor = new SimpleBatchedAckListenerTaskExecutor<>() { + + @Override + public Tuple executeTask( + UpdateMappingsTask updateMappingsTask, + ClusterState clusterState + ) throws Exception { + + ProjectMetadata projectMetadata = clusterState.metadata().getProject(updateMappingsTask.projectId); + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); + Map dataStreamMap = projectMetadata.dataStreams(); + DataStream dataStream = dataStreamMap.get(updateMappingsTask.dataStreamName); + + final ComposableIndexTemplate template = lookupTemplateForDataStream(updateMappingsTask.dataStreamName, projectMetadata); + ComposableIndexTemplate mergedTemplate = template.mergeMappings(updateMappingsTask.mappingsOverrides); + MetadataIndexTemplateService.validateTemplate( + mergedTemplate.template().settings(), + mergedTemplate.template().mappings(), + indicesService + ); + DataStream.Builder dataStreamBuilder = dataStream.copy().setMappings(updateMappingsTask.mappingsOverrides); + projectMetadataBuilder.removeDataStream(updateMappingsTask.dataStreamName); + projectMetadataBuilder.put(dataStreamBuilder.build()); + ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build(); + + return new Tuple<>(updatedClusterState, updateMappingsTask); + } + }; + this.updateMappingsTaskQueue = clusterService.createTaskQueue( + "update-data-stream-settings", + Priority.NORMAL, + updateMappingsExecutor + ); } public void modifyDataStream( @@ -429,6 +464,15 @@ public void updateSettings( ); } + public void updateMappings(ProjectId projectId, TimeValue masterNodeTimeout, TimeValue ackTimeout, String dataStreamName, + CompressedXContent mappingsOverrides, ActionListener listener) { + updateMappingsTaskQueue.submitTask( + "updating mappings on data stream", + new UpdateMappingsTask(projectId, dataStreamName, mappingsOverrides, ackTimeout, listener), + masterNodeTimeout + ); + } + private static void addBackingIndex( ProjectMetadata project, ProjectMetadata.Builder builder, @@ -692,4 +736,23 @@ static class UpdateSettingsTask extends AckedBatchedClusterStateUpdateTask { this.settingsOverrides = settingsOverrides; } } + + static class UpdateMappingsTask extends AckedBatchedClusterStateUpdateTask { + final ProjectId projectId; + private final String dataStreamName; + private final CompressedXContent mappingsOverrides; + + UpdateMappingsTask( + ProjectId projectId, + String dataStreamName, + CompressedXContent mappingsOverrides, + TimeValue ackTimeout, + ActionListener listener + ) { + super(ackTimeout, listener); + this.projectId = projectId; + this.dataStreamName = dataStreamName; + this.mappingsOverrides = mappingsOverrides; + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 85b33e61ce8f4..95a57ae0d2a30 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -98,6 +98,7 @@ protected DataStream mutateInstance(DataStream instance) { var generation = instance.getGeneration(); var metadata = instance.getMetadata(); var settings = instance.getSettings(); + var mappings = instance.getMappings(); var isHidden = instance.isHidden(); var isReplicated = instance.isReplicated(); var isSystem = instance.isSystem(); @@ -110,7 +111,7 @@ protected DataStream mutateInstance(DataStream instance) { var autoShardingEvent = instance.getAutoShardingEvent(); var failureRolloverOnWrite = instance.getFailureComponent().isRolloverOnWrite(); var failureAutoShardingEvent = instance.getDataComponent().getAutoShardingEvent(); - switch (between(0, 16)) { + switch (between(0, 17)) { case 0 -> name = randomAlphaOfLength(10); case 1 -> indices = randomNonEmptyIndexInstances(); case 2 -> generation = instance.getGeneration() + randomIntBetween(1, 10); @@ -179,6 +180,7 @@ protected DataStream mutateInstance(DataStream instance) { ? null : new DataStreamAutoShardingEvent(indices.getLast().getName(), randomIntBetween(1, 10), randomMillisUpToYear9999()); case 16 -> settings = randomValueOtherThan(settings, DataStreamTestHelper::randomSettings); + case 17 -> mappings = randomValueOtherThan(mappings, ComponentTemplateTests::randomMappings); } return new DataStream( @@ -186,6 +188,7 @@ protected DataStream mutateInstance(DataStream instance) { generation, metadata, settings, + mappings, isHidden, isReplicated, isSystem, @@ -1953,6 +1956,7 @@ public void testXContentSerializationWithRolloverAndEffectiveRetention() throws generation, metadata, randomSettings(), + randomMappings(), isSystem, randomBoolean(), isSystem, @@ -2146,6 +2150,7 @@ public void testWriteFailureIndex() { randomNonNegativeInt(), null, randomSettings(), + randomMappings(), hidden, replicated, system, @@ -2165,6 +2170,7 @@ public void testWriteFailureIndex() { randomNonNegativeInt(), null, randomSettings(), + randomMappings(), hidden, replicated, system, @@ -2191,6 +2197,7 @@ public void testWriteFailureIndex() { randomNonNegativeInt(), null, randomSettings(), + randomMappings(), hidden, replicated, system, @@ -2216,6 +2223,7 @@ public void testIsFailureIndex() { randomNonNegativeInt(), null, randomSettings(), + randomMappings(), hidden, replicated, system, @@ -2239,6 +2247,7 @@ public void testIsFailureIndex() { randomNonNegativeInt(), null, randomSettings(), + randomMappings(), hidden, replicated, system, @@ -2271,6 +2280,7 @@ public void testIsFailureIndex() { randomNonNegativeInt(), null, randomSettings(), + randomMappings(), hidden, replicated, system, @@ -2674,4 +2684,13 @@ private static void createMetadataForIndices(Metadata.Builder builder, List dataStreams = List.of(generateRandomStringArray(5, 5, false, false)); return new DataStreamAlias( From 7c3188f0d1b6c703b6a8a031cb03fcf682d371e9 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 21 May 2025 22:32:11 +0000 Subject: [PATCH 02/18] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/action/ActionListener.java | 1 - .../action/datastreams/GetDataStreamAction.java | 7 +++++-- .../cluster/metadata/ComposableIndexTemplate.java | 6 +++++- .../org/elasticsearch/cluster/metadata/DataStream.java | 1 - .../cluster/metadata/MetadataDataStreamsService.java | 10 ++++++++-- .../cluster/metadata/DataStreamTests.java | 10 ++++++++-- .../cluster/metadata/DataStreamTestHelper.java | 1 - 7 files changed, 26 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index 13a066ae5d965..b7e8de1f4fa36 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -21,7 +21,6 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.transport.LeakTracker; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index 266bfc757ffe2..9a8b852971b77 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -431,8 +431,11 @@ public XContentBuilder toXContent( dataStream.getSettings().toXContent(builder, params); builder.endObject(); - Map uncompressedMapping = XContentHelper.convertToMap(dataStream.getMappings().uncompressed(), true, XContentType.JSON) - .v2(); + Map uncompressedMapping = XContentHelper.convertToMap( + dataStream.getMappings().uncompressed(), + true, + XContentType.JSON + ).v2(); if (uncompressedMapping.isEmpty() == false) { builder.field(MAPPINGS_FIELD.getPreferredName()); builder.map(uncompressedMapping); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java index fa270e06d91cd..e823f7d70542d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java @@ -369,7 +369,11 @@ private CompressedXContent merge(CompressedXContent originalMapping, CompressedX Map mappingAdditionMap = XContentHelper.convertToMap(mappingAddition.uncompressed(), true, XContentType.JSON).v2(); Map combinedMappingMap = new HashMap<>(); if (originalMapping != null) { - combinedMappingMap.putAll((Map) XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON).v2().get("_doc")); + combinedMappingMap.putAll( + (Map) XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON) + .v2() + .get("_doc") + ); } XContentHelper.update(combinedMappingMap, mappingAdditionMap, true); if (combinedMappingMap.isEmpty()) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 9d1d8368ff918..ab0e9a4498478 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -46,7 +46,6 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.xcontent.ConstructingObjectParser; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 846d87d648299..86e02f0042486 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -464,8 +464,14 @@ public void updateSettings( ); } - public void updateMappings(ProjectId projectId, TimeValue masterNodeTimeout, TimeValue ackTimeout, String dataStreamName, - CompressedXContent mappingsOverrides, ActionListener listener) { + public void updateMappings( + ProjectId projectId, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + String dataStreamName, + CompressedXContent mappingsOverrides, + ActionListener listener + ) { updateMappingsTaskQueue.submitTask( "updating mappings on data stream", new UpdateMappingsTask(projectId, dataStreamName, mappingsOverrides, ackTimeout, listener), diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 95a57ae0d2a30..bfb1766881b7e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -2690,7 +2690,13 @@ protected ToXContent.Params getToXContentParams() { if (randomBoolean()) { return ToXContent.EMPTY_PARAMS; } - return new ToXContent.MapParams(Map.of("binary", randomFrom("true", "false"), Metadata.CONTEXT_MODE_PARAM, - randomFrom(Metadata.XContentContext.values()).toString())); + return new ToXContent.MapParams( + Map.of( + "binary", + randomFrom("true", "false"), + Metadata.CONTEXT_MODE_PARAM, + randomFrom(Metadata.XContentContext.values()).toString() + ) + ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index e22198e6f1352..7c78c82f75f0e 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -55,7 +55,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.ToXContent; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; From 7cafce9c69f78494e668777cb41c184af76fa623 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 23 May 2025 15:45:47 -0500 Subject: [PATCH 03/18] some testing --- .../data_stream/250_data_stream_mappings.yml | 92 +++++++++++++++++++ .../api/indices.put_data_stream_mappings.json | 49 ++++++++++ .../datastreams/GetDataStreamAction.java | 18 ++-- .../GetDataStreamMappingsAction.java | 12 +-- .../cluster/metadata/DataStream.java | 23 ++++- .../MetadataCreateDataStreamService.java | 3 +- .../cluster/metadata/DataStreamTests.java | 6 +- 7 files changed, 174 insertions(+), 29 deletions(-) create mode 100644 modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml new file mode 100644 index 0000000000000..f7bd2ef1f467c --- /dev/null +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml @@ -0,0 +1,92 @@ +setup: + - skip: + features: allowed_warnings + +--- +"Test single data stream": + - requires: + cluster_features: [ "logs_stream" ] + reason: requires setting 'logs_stream' to get or set data stream settings + - do: + allowed_warnings: + - "index template [my-template] has index patterns [my-data-stream-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation" + indices.put_index_template: + name: my-template + body: + index_patterns: [ my-data-stream-* ] + data_stream: { } + template: + settings: + number_of_replicas: 0 + lifecycle.name: my-policy + + - do: + indices.create_data_stream: + name: my-data-stream-1 + + - do: + cluster.health: + index: "my-data-stream-1" + wait_for_status: green + + - do: + indices.get_data_stream_mappings: + name: my-data-stream-1 + - match: { data_streams.0.name: my-data-stream-1 } + - match: { data_streams.0.mappings: {} } + - match: { data_streams.0.effective_mappings: {} } + + - do: + indices.get_data_stream: + name: my-data-stream-1 + - match: { data_streams.0.name: my-data-stream-1 } + - match: { data_streams.0.mappings: {} } + - match: { data_streams.0.effective_mappings: null } + + - do: + indices.put_data_stream_mappings: + name: my-data-stream-1 + body: + properties: + name: + type: keyword + fields: + english: + type: text + - match: { data_streams.0.name: my-data-stream-1 } + - match: { data_streams.0.applied_to_data_stream: true } + - match: { data_streams.0.mappings.properties.name.type: "keyword" } + - match: { data_streams.0.effective_mappings.properties.name.type: "keyword" } + + - do: + indices.rollover: + alias: "my-data-stream-1" + + - do: + cluster.health: + index: "my-data-stream-1" + wait_for_status: green + + - do: + indices.get_data_stream_mappings: + name: my-data-stream-1 + - match: { data_streams.0.name: my-data-stream-1 } + - match: { data_streams.0.mappings.properties.name.type: "keyword" } + - match: { data_streams.0.effective_mappings.properties.name.type: "keyword" } + + - do: + indices.get_data_stream: + name: my-data-stream-1 + - match: { data_streams.0.name: my-data-stream-1 } + - match: { data_streams.0.mappings.properties.name.type: "keyword" } + - match: { data_streams.0.effective_mappings: null } + + - do: + indices.get_data_stream: + name: my-data-stream-1 + - set: { data_streams.0.indices.0.index_name: idx0name } + + - do: + indices.get_mapping: + index: my-data-stream-1 + - match: { .$idx0name.mappings.properties.name.type: "keyword" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json new file mode 100644 index 0000000000000..152a44b2f7914 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json @@ -0,0 +1,49 @@ +{ + "indices.put_data_stream_mappings":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Updates a data stream's mappings" + }, + "stability":"stable", + "visibility": "feature_flag", + "feature_flag": "logs_stream", + "headers":{ + "accept": [ "application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}/_mappings", + "methods":[ + "PUT" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream or data stream pattern" + } + } + } + ] + }, + "params":{ + "dry_run":{ + "type":"boolean", + "description":"Perform a dry run but do not actually change any mappings", + "default":false + }, + "timeout":{ + "type":"time", + "description":"Specify timeout for acknowledging the cluster state update" + }, + "master_timeout":{ + "type":"time", + "description":"Specify timeout for connection to master" + } + }, + "body":{ + "description":"The data stream mappings to be updated", + "required":true + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index 9a8b852971b77..e310e7928021b 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -425,20 +425,14 @@ public XContentBuilder toXContent( builder.startObject(SETTINGS_FIELD.getPreferredName()); dataStream.getSettings().toXContent(builder, params); builder.endObject(); - } - - builder.startObject(SETTINGS_FIELD.getPreferredName()); - dataStream.getSettings().toXContent(builder, params); - builder.endObject(); - Map uncompressedMapping = XContentHelper.convertToMap( - dataStream.getMappings().uncompressed(), - true, - XContentType.JSON - ).v2(); - if (uncompressedMapping.isEmpty() == false) { builder.field(MAPPINGS_FIELD.getPreferredName()); - builder.map(uncompressedMapping); + Map uncompressedMappings = XContentHelper.convertToMap( + dataStream.getMappings().uncompressed(), + true, + XContentType.JSON + ).v2(); + builder.map(uncompressedMappings); } builder.startObject(DataStream.FAILURE_STORE_FIELD.getPreferredName()); diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java index b70d450a0905f..9b5c60912a11f 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java @@ -123,19 +123,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field("name", dataStreamName); Map uncompressedMappings = XContentHelper.convertToMap(mappings.uncompressed(), true, XContentType.JSON).v2(); - if (uncompressedMappings.isEmpty() == false) { - builder.field("mappings"); - builder.map(uncompressedMappings); - } + builder.field("mappings"); + builder.map(uncompressedMappings); Map uncompressedEffectiveMappings = XContentHelper.convertToMap( effectiveMappings.uncompressed(), true, XContentType.JSON ).v2(); - if (uncompressedEffectiveMappings.isEmpty() == false) { - builder.field("effective_mappings"); - builder.map(uncompressedEffectiveMappings); - } + builder.field("effective_mappings"); + builder.map(uncompressedEffectiveMappings); builder.endObject(); return builder; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index ab0e9a4498478..800f05497a010 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -94,6 +94,16 @@ public final class DataStream implements SimpleDiffable, ToXContentO public static final String FAILURE_STORE_PREFIX = ".fs-"; public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd"); public static final String TIMESTAMP_FIELD_NAME = "@timestamp"; + + public static final CompressedXContent EMPTY_MAPPINGS; + static { + try { + EMPTY_MAPPINGS = new CompressedXContent(Map.of()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + // Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations public static final Comparator TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> { try { @@ -300,7 +310,7 @@ public static DataStream read(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) { mappings = CompressedXContent.readCompressedString(in); } else { - mappings = Mapping.EMPTY.toCompressedXContent(); + mappings = EMPTY_MAPPINGS; } return new DataStream( name, @@ -409,7 +419,12 @@ public Settings getEffectiveSettings(ProjectMetadata projectMetadata) { } public CompressedXContent getEffectiveMappings(ProjectMetadata projectMetadata) throws IOException { - return getMatchingIndexTemplate(projectMetadata).mergeMappings(mappings).template().mappings(); + CompressedXContent mergedMappings = getMatchingIndexTemplate(projectMetadata).mergeMappings(mappings).template().mappings(); + if (mergedMappings == null) { + return EMPTY_MAPPINGS; + } else { + return mergedMappings; + } } private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) { @@ -1413,7 +1428,7 @@ public void writeTo(StreamOutput out) throws IOException { (Long) args[2], (Map) args[3], args[17] == null ? Settings.EMPTY : (Settings) args[17], - args[18] == null ? Mapping.EMPTY.toCompressedXContent() : (CompressedXContent) args[18], + args[18] == null ? EMPTY_MAPPINGS : (CompressedXContent) args[18], args[4] != null && (boolean) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6], @@ -1919,7 +1934,7 @@ public static class Builder { @Nullable private Map metadata = null; private Settings settings = Settings.EMPTY; - private CompressedXContent mappings = Mapping.EMPTY.toCompressedXContent(); + private CompressedXContent mappings = EMPTY_MAPPINGS; private boolean hidden = false; private boolean replicated = false; private boolean system = false; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 36e1463a07208..794da816e980e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -32,7 +32,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; -import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.mapper.MetadataFieldMapper; import org.elasticsearch.indices.SystemDataStreamDescriptor; @@ -333,7 +332,7 @@ static ClusterState createDataStream( initialGeneration, template.metadata() != null ? Map.copyOf(template.metadata()) : null, Settings.EMPTY, - Mapping.EMPTY.toCompressedXContent(), + DataStream.EMPTY_MAPPINGS, hidden, false, isSystem, diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index bfb1766881b7e..b6854de3a2db9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -2562,7 +2562,7 @@ public void testGetEffectiveIndexTemplateNoMatchingTemplate() { assertThrows(IllegalArgumentException.class, () -> dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build())); } - public void testGetEffectiveIndexTemplateTemplateSettingsOnly() { + public void testGetEffectiveIndexTemplateTemplateSettingsOnly() throws IOException { // We only have settings from the template, so the effective template will just be the original template DataStream dataStream = createDataStream(Settings.EMPTY); Settings templateSettings = randomSettings(); @@ -2577,7 +2577,7 @@ public void testGetEffectiveIndexTemplateTemplateSettingsOnly() { assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(indexTemplate)); } - public void testGetEffectiveIndexTemplateDataStreamSettingsOnly() { + public void testGetEffectiveIndexTemplateDataStreamSettingsOnly() throws IOException { // We only have settings from the data stream, so we expect to get only those back in the effective template Settings dataStreamSettings = randomSettings(); DataStream dataStream = createDataStream(dataStreamSettings); @@ -2600,7 +2600,7 @@ public void testGetEffectiveIndexTemplateDataStreamSettingsOnly() { assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate)); } - public void testGetEffectiveIndexTemplate() { + public void testGetEffectiveIndexTemplate() throws IOException { // Here we have settings from both the template and the data stream, so we expect the data stream settings to take precedence Settings dataStreamSettings = Settings.builder() .put("index.setting1", "dataStreamValue") From 77104551e336dc9625654119a9e3772dd1acef6e Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 27 May 2025 16:20:44 -0500 Subject: [PATCH 04/18] fixing compilation error --- .../datastreams/UpdateTimeSeriesRangeServiceTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java index 0169b1b7da8cd..54232799a3c78 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java @@ -258,6 +258,7 @@ public void testUpdateTimeSeriesTemporalOneBadDataStream() { 2, ds2.getMetadata(), ds2.getSettings(), + ds2.getMappings(), ds2.isHidden(), ds2.isReplicated(), ds2.isSystem(), From 97ef9d61d601abc20796d5dbe7db476298c07ff4 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 27 May 2025 16:27:34 -0500 Subject: [PATCH 05/18] adding and fixing rest-api-spec files --- .../api/indices.get_data_stream_mappings.json | 36 +++++++++++++++++++ .../api/indices.put_data_stream_mappings.json | 8 ++--- 2 files changed, 40 insertions(+), 4 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_stream_mappings.json diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_stream_mappings.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_stream_mappings.json new file mode 100644 index 0000000000000..1677e4d677086 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_stream_mappings.json @@ -0,0 +1,36 @@ +{ + "indices.get_data_stream_mappings":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Gets a data stream's mappings" + }, + "stability":"stable", + "visibility": "feature_flag", + "feature_flag": "logs_stream", + "headers":{ + "accept": [ "application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}/_mappings", + "methods":[ + "GET" + ], + "parts":{ + "name":{ + "type":"string", + "description":"Comma-separated list of data streams or data stream patterns" + } + } + } + ] + }, + "params":{ + "master_timeout":{ + "type":"time", + "description":"Period to wait for a connection to the master node" + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json index 152a44b2f7914..d885f45c33b14 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json @@ -20,7 +20,7 @@ "parts":{ "name":{ "type":"string", - "description":"The name of the data stream or data stream pattern" + "description":"Comma-separated list of data streams or data stream patterns" } } } @@ -29,16 +29,16 @@ "params":{ "dry_run":{ "type":"boolean", - "description":"Perform a dry run but do not actually change any mappings", + "description":"Whether this request should only be a dry run rather than actually applying mappings", "default":false }, "timeout":{ "type":"time", - "description":"Specify timeout for acknowledging the cluster state update" + "description":"Period to wait for a response" }, "master_timeout":{ "type":"time", - "description":"Specify timeout for connection to master" + "description":"Period to wait for a connection to the master node" } }, "body":{ From 594110518052a61105a0e51c65b438bdeae598ca Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 27 May 2025 16:29:42 -0500 Subject: [PATCH 06/18] fixing OperatorPrivilegesIT --- .../org/elasticsearch/xpack/security/operator/Constants.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 891a0badfd741..f5875d1125f37 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -610,6 +610,7 @@ public class Constants { "indices:data/write/update", "indices:data/write/update/byquery", "indices:monitor/data_stream/settings/get", + "indices:monitor/data_stream/settings/update", "indices:monitor/data_stream/stats", "indices:monitor/field_usage_stats", "indices:monitor/fleet/global_checkpoints[s]", From fc67d9dc7654e45de483765de7384b45708355af Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 27 May 2025 17:28:12 -0500 Subject: [PATCH 07/18] fixing yaml rest test --- .../test/data_stream/250_data_stream_mappings.yml | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml index f7bd2ef1f467c..33cb8ac508c74 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml @@ -80,13 +80,11 @@ setup: - match: { data_streams.0.name: my-data-stream-1 } - match: { data_streams.0.mappings.properties.name.type: "keyword" } - match: { data_streams.0.effective_mappings: null } - - - do: - indices.get_data_stream: - name: my-data-stream-1 - - set: { data_streams.0.indices.0.index_name: idx0name } + - set: { data_streams.0.indices.0.index_name: oldIndexName } + - set: { data_streams.0.indices.1.index_name: newIndexName } - do: indices.get_mapping: index: my-data-stream-1 - - match: { .$idx0name.mappings.properties.name.type: "keyword" } + - match: { .$oldIndexName.mappings.properties.name: null } + - match: { .$newIndexName.mappings.properties.name.type: "keyword" } From 9f9e61704e45cd865b1b6961dda60c195aae86bf Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 28 May 2025 16:46:54 -0500 Subject: [PATCH 08/18] fixing tests --- .../data_stream/250_data_stream_mappings.yml | 7 ++++- .../cluster/metadata/DataStreamTests.java | 30 +++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml index 33cb8ac508c74..d64f5e6b94612 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml @@ -19,6 +19,10 @@ setup: settings: number_of_replicas: 0 lifecycle.name: my-policy + mappings: + properties: + field1: + type: keyword - do: indices.create_data_stream: @@ -34,7 +38,7 @@ setup: name: my-data-stream-1 - match: { data_streams.0.name: my-data-stream-1 } - match: { data_streams.0.mappings: {} } - - match: { data_streams.0.effective_mappings: {} } + - length: { data_streams.0.effective_mappings.properties: 1 } - do: indices.get_data_stream: @@ -71,6 +75,7 @@ setup: indices.get_data_stream_mappings: name: my-data-stream-1 - match: { data_streams.0.name: my-data-stream-1 } + - length: { data_streams.0.effective_mappings.properties: 2 } - match: { data_streams.0.mappings.properties.name.type: "keyword" } - match: { data_streams.0.effective_mappings.properties.name.type: "keyword" } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index b6854de3a2db9..614ad930fc66a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -53,7 +53,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureStoreName; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; @@ -2562,9 +2561,9 @@ public void testGetEffectiveIndexTemplateNoMatchingTemplate() { assertThrows(IllegalArgumentException.class, () -> dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build())); } - public void testGetEffectiveIndexTemplateTemplateSettingsOnly() throws IOException { - // We only have settings from the template, so the effective template will just be the original template - DataStream dataStream = createDataStream(Settings.EMPTY); + public void testGetEffectiveIndexTemplateTemplateNoOverrides() throws IOException { + // We only have settings and mappings from the template, so the effective template will just be the original template + DataStream dataStream = createDataStream(Settings.EMPTY, DataStream.EMPTY_MAPPINGS); Settings templateSettings = randomSettings(); Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(randomMappings()); ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder() @@ -2580,7 +2579,7 @@ public void testGetEffectiveIndexTemplateTemplateSettingsOnly() throws IOExcepti public void testGetEffectiveIndexTemplateDataStreamSettingsOnly() throws IOException { // We only have settings from the data stream, so we expect to get only those back in the effective template Settings dataStreamSettings = randomSettings(); - DataStream dataStream = createDataStream(dataStreamSettings); + DataStream dataStream = createDataStream(dataStreamSettings, DataStream.EMPTY_MAPPINGS); Settings templateSettings = Settings.EMPTY; CompressedXContent templateMappings = randomMappings(); Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings); @@ -2607,7 +2606,7 @@ public void testGetEffectiveIndexTemplate() throws IOException { .put("index.setting2", "dataStreamValue") .put("index.setting3", (String) null) // This one gets removed from the effective settings .build(); - DataStream dataStream = createDataStream(dataStreamSettings); + DataStream dataStream = createDataStream(dataStreamSettings, DataStream.EMPTY_MAPPINGS); Settings templateSettings = Settings.builder() .put("index.setting1", "templateValue") .put("index.setting3", "templateValue") @@ -2636,11 +2635,30 @@ public void testGetEffectiveIndexTemplate() throws IOException { assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate)); } + private static CompressedXContent randomMappings() { + try { + return new CompressedXContent("{\"_doc\": {\"properties\":{\"" + randomAlphaOfLength(5) + "\":{\"type\":\"keyword\"}}}}"); + } catch (IOException e) { + fail("got an IO exception creating fake mappings: " + e); + return null; + } + } + private DataStream createDataStream(Settings settings) { DataStream dataStream = createTestInstance(); return dataStream.copy().setSettings(settings).build(); } + private DataStream createDataStream(CompressedXContent mappings) { + DataStream dataStream = createTestInstance(); + return dataStream.copy().setMappings(mappings).build(); + } + + private DataStream createDataStream(Settings settings, CompressedXContent mappings) { + DataStream dataStream = createTestInstance(); + return dataStream.copy().setSettings(settings).setMappings(mappings).build(); + } + private record DataStreamMetadata(Long creationTimeInMillis, Long rolloverTimeInMillis, Long originationTimeInMillis) { public static DataStreamMetadata dataStreamMetadata(Long creationTimeInMillis, Long rolloverTimeInMillis) { return new DataStreamMetadata(creationTimeInMillis, rolloverTimeInMillis, null); From 9419f55a9d644bc409e765e95751fcddf7aa35ca Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 28 May 2025 16:49:36 -0500 Subject: [PATCH 09/18] fixing OperatorPrivilegesIT --- .../org/elasticsearch/xpack/security/operator/Constants.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index f5875d1125f37..20a8541701ec0 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -609,6 +609,8 @@ public class Constants { "indices:data/write/reindex", "indices:data/write/update", "indices:data/write/update/byquery", + "indices:monitor/data_stream/mappings/get", + "indices:monitor/data_stream/mappings/update", "indices:monitor/data_stream/settings/get", "indices:monitor/data_stream/settings/update", "indices:monitor/data_stream/stats", From 33d88653ac7f01a79aaca9707e3835c60a4a7634 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 28 May 2025 17:42:09 -0500 Subject: [PATCH 10/18] fixing OperatorPrivilegesIT --- .../org/elasticsearch/xpack/security/operator/Constants.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 20a8541701ec0..9f37d525816e3 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -610,9 +610,7 @@ public class Constants { "indices:data/write/update", "indices:data/write/update/byquery", "indices:monitor/data_stream/mappings/get", - "indices:monitor/data_stream/mappings/update", "indices:monitor/data_stream/settings/get", - "indices:monitor/data_stream/settings/update", "indices:monitor/data_stream/stats", "indices:monitor/field_usage_stats", "indices:monitor/fleet/global_checkpoints[s]", @@ -644,6 +642,7 @@ public class Constants { "indices:admin/data_stream/index/reindex", "indices:admin/data_stream/reindex", "indices:admin/data_stream/reindex_cancel", + "indices:admin/data_stream/mappings/update", "indices:admin/data_stream/settings/update", "indices:admin/index/create_from_source", "indices:admin/index/copy_lifecycle_index_metadata", From c35e8d57881c5ee06e4354b8d1518735135abb66 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 28 May 2025 17:42:32 -0500 Subject: [PATCH 11/18] testing --- .../datastreams/GetDataStreamActionTests.java | 6 + .../GetDataStreamMappingsActionTests.java | 119 ++++++++++++++++++ .../cluster/metadata/DataStreamTests.java | 48 ++++++- 3 files changed, 170 insertions(+), 3 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsActionTests.java diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java index 9aa1a2f2a2c07..c90669650a949 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings; import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomSettings; import static org.hamcrest.Matchers.equalTo; @@ -64,6 +66,8 @@ public void testDataStreamInfoToXContent() throws IOException { assertThat(lifecycleResult.get("retention_determined_by"), equalTo("max_global_retention")); Map> settingsMap = (Map>) resultMap.get("settings"); assertThat(Settings.builder().loadFromMap(settingsMap).build(), equalTo(dataStreamInfo.getDataStream().getSettings())); + Map mappingsMap = (Map) resultMap.get("mappings"); + assertThat(new CompressedXContent(mappingsMap), equalTo(dataStreamInfo.getDataStream().getMappings())); } } @@ -105,6 +109,7 @@ private static DataStream newDataStreamInstance(boolean isSystem, TimeValue rete List indices = List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))); DataStreamLifecycle lifecycle = DataStreamLifecycle.createDataLifecycle(true, retention, null); Settings settings = randomSettings(); + CompressedXContent mappings = randomMappings(); return DataStream.builder(randomAlphaOfLength(50), indices) .setGeneration(randomLongBetween(1, 1000)) .setMetadata(Map.of()) @@ -113,6 +118,7 @@ private static DataStream newDataStreamInstance(boolean isSystem, TimeValue rete .setReplicated(randomBoolean()) .setLifecycle(lifecycle) .setSettings(settings) + .setMappings(mappings) .build(); } } diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsActionTests.java new file mode 100644 index 0000000000000..1a580ddf1a08b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsActionTests.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class GetDataStreamMappingsActionTests extends ESTestCase { + + public void testResponseToXContentEmpty() throws IOException { + List responseList = new ArrayList<>(); + GetDataStreamMappingsAction.Response response = new GetDataStreamMappingsAction.Response(responseList); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.humanReadable(true); + response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> { + try { + xcontent.toXContent(builder, EMPTY_PARAMS); + } catch (IOException e) { + fail(e); + } + }); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + assertThat(xContentMap, equalTo(Map.of("data_streams", List.of()))); + } + } + + public void testResponseToXContent() throws IOException { + Map dataStream1Mappings = Map.of( + "properties", + Map.of("field2", Map.of("type", "text"), "field3", Map.of("type", "keyword")) + ); + Map dataStream1EffectiveMappings = Map.of( + "properties", + Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "text"), "field3", Map.of("type", "keyword")) + ); + Map dataStream2Mappings = Map.of( + "properties", + Map.of("field4", Map.of("type", "text"), "field5", Map.of("type", "keyword")) + ); + Map dataStream2EffectiveMappings = Map.of( + "properties", + Map.of("field4", Map.of("type", "text"), "field5", Map.of("type", "keyword"), "field6", Map.of("type", "keyword")) + ); + GetDataStreamMappingsAction.DataStreamMappingsResponse DataStreamMappingsResponse1 = + new GetDataStreamMappingsAction.DataStreamMappingsResponse( + "dataStream1", + new CompressedXContent(dataStream1Mappings), + new CompressedXContent(dataStream1EffectiveMappings) + ); + GetDataStreamMappingsAction.DataStreamMappingsResponse DataStreamMappingsResponse2 = + new GetDataStreamMappingsAction.DataStreamMappingsResponse( + "dataStream2", + new CompressedXContent(dataStream2Mappings), + new CompressedXContent(dataStream2EffectiveMappings) + ); + List responseList = List.of( + DataStreamMappingsResponse1, + DataStreamMappingsResponse2 + ); + GetDataStreamMappingsAction.Response response = new GetDataStreamMappingsAction.Response(responseList); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.humanReadable(true); + response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> { + try { + xcontent.toXContent(builder, EMPTY_PARAMS); + } catch (IOException e) { + fail(e); + } + }); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + assertThat( + xContentMap, + equalTo( + Map.of( + "data_streams", + List.of( + Map.of( + "name", + "dataStream1", + "mappings", + dataStream1Mappings, + "effective_mappings", + dataStream1EffectiveMappings + ), + Map.of( + "name", + "dataStream2", + "mappings", + dataStream2Mappings, + "effective_mappings", + dataStream2EffectiveMappings + ) + ) + ) + ) + ); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 614ad930fc66a..340bd8aae2949 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -2606,13 +2606,18 @@ public void testGetEffectiveIndexTemplate() throws IOException { .put("index.setting2", "dataStreamValue") .put("index.setting3", (String) null) // This one gets removed from the effective settings .build(); - DataStream dataStream = createDataStream(dataStreamSettings, DataStream.EMPTY_MAPPINGS); + CompressedXContent dataStreamMappings = new CompressedXContent( + Map.of("properties", Map.of("field2", Map.of("type", "text"), "field3", Map.of("type", "keyword"))) + ); + DataStream dataStream = createDataStream(dataStreamSettings, dataStreamMappings); Settings templateSettings = Settings.builder() .put("index.setting1", "templateValue") .put("index.setting3", "templateValue") .put("index.setting4", "templateValue") .build(); - CompressedXContent templateMappings = randomMappings(); + CompressedXContent templateMappings = new CompressedXContent( + Map.of("_doc", Map.of("properties", Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "keyword")))) + ); Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings); ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder() .indexPatterns(List.of(dataStream.getName())) @@ -2626,7 +2631,44 @@ public void testGetEffectiveIndexTemplate() throws IOException { .put("index.setting2", "dataStreamValue") .put("index.setting4", "templateValue") .build(); - Template.Builder expectedTemplateBuilder = Template.builder().settings(mergedSettings).mappings(templateMappings); + CompressedXContent mergedMappings = new CompressedXContent( + Map.of( + "properties", + Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "text"), "field3", Map.of("type", "keyword")) + ) + ); + Template.Builder expectedTemplateBuilder = Template.builder().settings(mergedSettings).mappings(mergedMappings); + ComposableIndexTemplate expectedEffectiveTemplate = ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStream.getName())) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .template(expectedTemplateBuilder) + .build(); + assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate)); + } + + public void testGetEffectiveMappingsNoMatchingTemplate() { + // No matching template, so we expect an IllegalArgumentException + DataStream dataStream = createTestInstance(); + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()); + assertThrows(IllegalArgumentException.class, () -> dataStream.getEffectiveMappings(projectMetadataBuilder.build())); + } + + public void testGetEffectiveIndexTemplateDataStreamMappingsOnly() throws IOException { + // We only have mappings from the data stream, so we expect to get only those back in the effective template + CompressedXContent dataStreamMappings = randomMappings(); + DataStream dataStream = createDataStream(Settings.EMPTY, dataStreamMappings); + Settings templateSettings = Settings.EMPTY; + CompressedXContent templateMappings = new CompressedXContent(Map.of("_doc", Map.of())); + ; + Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings); + ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStream.getName())) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .template(templateBuilder) + .build(); + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()) + .indexTemplates(Map.of(dataStream.getName(), indexTemplate)); + Template.Builder expectedTemplateBuilder = Template.builder().settings(templateSettings).mappings(dataStreamMappings); ComposableIndexTemplate expectedEffectiveTemplate = ComposableIndexTemplate.builder() .indexPatterns(List.of(dataStream.getName())) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) From 2367a1b6d954aea769e2d2e270c9ad530f85b3d9 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 29 May 2025 16:52:13 -0500 Subject: [PATCH 12/18] adding tests --- .../UpdateDataStreamMappingsAction.java | 4 + ...eDataStreamMappingsActionRequestTests.java | 78 ++++++++ ...DataStreamMappingsActionResponseTests.java | 170 ++++++++++++++++++ 3 files changed, 252 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionResponseTests.java diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java index 467ea9080e834..561e59f3eed36 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java @@ -72,6 +72,10 @@ public boolean includeDataStreams() { return true; } + public boolean isDryRun() { + return dryRun; + } + public Request(StreamInput in) throws IOException { super(in); this.dataStreamNames = in.readStringArray(); diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionRequestTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionRequestTests.java new file mode 100644 index 0000000000000..83305174fc507 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionRequestTests.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.cluster.metadata.ComponentTemplateTests; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.function.Supplier; + +import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings; + +public class UpdateDataStreamMappingsActionRequestTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return UpdateDataStreamMappingsAction.Request::new; + } + + @Override + protected UpdateDataStreamMappingsAction.Request createTestInstance() { + UpdateDataStreamMappingsAction.Request request = new UpdateDataStreamMappingsAction.Request( + randomMappings(), + randomBoolean(), + randomTimeValue(), + randomTimeValue() + ); + request.indices(randomIndices()); + return request; + } + + @Override + protected UpdateDataStreamMappingsAction.Request mutateInstance(UpdateDataStreamMappingsAction.Request instance) throws IOException { + String[] indices = instance.indices(); + CompressedXContent mappings = instance.getMappings(); + boolean dryRun = instance.isDryRun(); + TimeValue masterNodeTimeout = instance.masterNodeTimeout(); + TimeValue ackTimeout = instance.ackTimeout(); + switch (between(0, 4)) { + case 0 -> { + indices = randomArrayValueOtherThan(indices, this::randomIndices); + } + case 1 -> { + mappings = randomValueOtherThan(mappings, ComponentTemplateTests::randomMappings); + } + case 2 -> { + dryRun = dryRun == false; + } + case 3 -> { + masterNodeTimeout = randomValueOtherThan(masterNodeTimeout, ESTestCase::randomTimeValue); + } + case 4 -> { + ackTimeout = randomValueOtherThan(ackTimeout, ESTestCase::randomTimeValue); + } + default -> throw new AssertionError("Should not be here"); + } + return new UpdateDataStreamMappingsAction.Request(mappings, dryRun, masterNodeTimeout, ackTimeout).indices(indices); + } + + private String[] randomIndices() { + return randomList(10, () -> randomAlphaOfLength(20)).toArray(new String[0]); + } + + public static T[] randomArrayValueOtherThan(T[] input, Supplier randomSupplier) { + return randomValueOtherThanMany(v -> Arrays.equals(input, v), randomSupplier); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionResponseTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionResponseTests.java new file mode 100644 index 0000000000000..e7f4be8138a84 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionResponseTests.java @@ -0,0 +1,170 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings; +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class UpdateDataStreamMappingsActionResponseTests extends AbstractWireSerializingTestCase { + + public void testToXContent() throws IOException { + Map dataStream1Mappings = Map.of( + "properties", + Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "keyword")) + ); + Map dataStream1EffectiveMappings = Map.of( + "properties", + Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "keyword"), "field3", Map.of("type", "keyword")) + ); + Map dataStream2Mappings = Map.of( + "properties", + Map.of("field4", Map.of("type", "keyword"), "field5", Map.of("type", "keyword")) + ); + Map dataStream2EffectiveMappings = Map.of( + "properties", + Map.of("field4", Map.of("type", "keyword"), "field5", Map.of("type", "keyword"), "field6", Map.of("type", "keyword")) + ); + boolean dataStream1Succeeded = randomBoolean(); + String dataStream1Error = randomBoolean() ? null : randomAlphaOfLength(20); + boolean dataStream2Succeeded = randomBoolean(); + String dataStream2Error = randomBoolean() ? null : randomAlphaOfLength(20); + UpdateDataStreamMappingsAction.DataStreamMappingsResponse DataStreamMappingsResponse1 = + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + "dataStream1", + dataStream1Succeeded, + dataStream1Error, + new CompressedXContent(dataStream1Mappings), + new CompressedXContent(dataStream1EffectiveMappings) + ); + UpdateDataStreamMappingsAction.DataStreamMappingsResponse DataStreamMappingsResponse2 = + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + "dataStream2", + dataStream2Succeeded, + dataStream2Error, + new CompressedXContent(dataStream2Mappings), + new CompressedXContent(dataStream2EffectiveMappings) + ); + List responseList = List.of( + DataStreamMappingsResponse1, + DataStreamMappingsResponse2 + ); + UpdateDataStreamMappingsAction.Response response = new UpdateDataStreamMappingsAction.Response(responseList); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.humanReadable(true); + response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> { + try { + xcontent.toXContent(builder, EMPTY_PARAMS); + } catch (IOException e) { + fail(e); + } + }); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + assertThat( + xContentMap, + equalTo( + Map.of( + "data_streams", + List.of( + buildExpectedMap( + "dataStream1", + dataStream1Succeeded, + dataStream1Error, + dataStream1Mappings, + dataStream1EffectiveMappings + ), + buildExpectedMap( + "dataStream2", + dataStream2Succeeded, + dataStream2Error, + dataStream2Mappings, + dataStream2EffectiveMappings + ) + ) + ) + ) + ); + } + } + + @Override + protected Writeable.Reader instanceReader() { + return UpdateDataStreamMappingsAction.Response::new; + } + + @Override + protected UpdateDataStreamMappingsAction.Response createTestInstance() { + return new UpdateDataStreamMappingsAction.Response(randomList(10, this::randomDataStreamMappingsResponse)); + } + + @Override + protected UpdateDataStreamMappingsAction.Response mutateInstance(UpdateDataStreamMappingsAction.Response instance) throws IOException { + List responseList = instance.getDataStreamMappingsResponses(); + List mutatedResponseList = new ArrayList<>(responseList); + switch (between(0, 1)) { + case 0 -> { + if (responseList.isEmpty()) { + mutatedResponseList.add(randomDataStreamMappingsResponse()); + } else { + mutatedResponseList.remove(randomInt(responseList.size() - 1)); + } + } + case 1 -> { + mutatedResponseList.add(randomDataStreamMappingsResponse()); + } + default -> throw new AssertionError("Should not be here"); + } + return new UpdateDataStreamMappingsAction.Response(mutatedResponseList); + } + + private Map buildExpectedMap( + String name, + boolean succeeded, + String error, + Map mappings, + Map effectiveMappings + ) { + Map result = new HashMap<>(); + result.put("name", name); + result.put("applied_to_data_stream", succeeded); + if (error != null) { + result.put("error", error); + } + result.put("mappings", mappings); + result.put("effective_mappings", effectiveMappings); + Map indexSettingsResults = new HashMap<>(); + return result; + } + + private UpdateDataStreamMappingsAction.DataStreamMappingsResponse randomDataStreamMappingsResponse() { + return new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + "dataStream1", + randomBoolean(), + randomBoolean() ? null : randomAlphaOfLength(20), + randomMappings(), + randomMappings() + ); + } +} From f6fb5dbf3eb02bed77db1e235b4803317c89fed0 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 29 May 2025 17:22:47 -0500 Subject: [PATCH 13/18] adding dryrun --- ...ansportUpdateDataStreamMappingsAction.java | 129 ++++++++---------- .../metadata/MetadataDataStreamsService.java | 85 ++++++++---- 2 files changed, 116 insertions(+), 98 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java index 76db8a2214e17..5bb0b4d7d61a8 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java @@ -15,12 +15,10 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.CountDownActionListener; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; @@ -98,18 +96,15 @@ protected void masterOperation( IndicesOptions.DEFAULT, request.indices() ); - List dataStreamSettingsResponse = new ArrayList<>(); - CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size() + 1, new ActionListener<>() { - @Override - public void onResponse(Void unused) { - listener.onResponse(new UpdateDataStreamMappingsAction.Response(dataStreamSettingsResponse)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + List dataStreamMappingsResponse = new ArrayList<>(); + CountDownActionListener countDownListener = new CountDownActionListener( + dataStreamNames.size() + 1, + listener.delegateFailure( + (responseActionListener, unused) -> responseActionListener.onResponse( + new UpdateDataStreamMappingsAction.Response(dataStreamMappingsResponse) + ) + ) + ); countDownListener.onResponse(null); for (String dataStreamName : dataStreamNames) { updateSingleDataStream( @@ -117,27 +112,22 @@ public void onFailure(Exception e) { request.getMappings(), request.masterNodeTimeout(), request.ackTimeout(), - new ActionListener<>() { - @Override - public void onResponse(UpdateDataStreamMappingsAction.DataStreamMappingsResponse dataStreamResponse) { - dataStreamSettingsResponse.add(dataStreamResponse); - countDownListener.onResponse(null); - } - - @Override - public void onFailure(Exception e) { - dataStreamSettingsResponse.add( - new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( - dataStreamName, - false, - e.getMessage(), - Mapping.EMPTY.toCompressedXContent(), - Mapping.EMPTY.toCompressedXContent() - ) - ); - countDownListener.onResponse(null); - } - } + request.isDryRun(), + ActionListener.wrap(dataStreamResponse -> { + dataStreamMappingsResponse.add(dataStreamResponse); + countDownListener.onResponse(null); + }, e -> { + dataStreamMappingsResponse.add( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + e.getMessage(), + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + countDownListener.onResponse(null); + }) ); } } @@ -147,6 +137,7 @@ private void updateSingleDataStream( CompressedXContent mappingsOverrides, TimeValue masterNodeTimeout, TimeValue ackTimeout, + boolean dryRun, ActionListener listener ) { logger.debug("updating mappings for {}", dataStreamName); @@ -168,56 +159,44 @@ private void updateSingleDataStream( ackTimeout, dataStreamName, mappingsOverrides, - new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - if (acknowledgedResponse.isAcknowledged()) { - DataStream dataStream = clusterService.state() - .projectState(projectResolver.getProjectId()) - .metadata() - .dataStreams() - .get(dataStreamName); - try { - listener.onResponse( - new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( - dataStreamName, - true, - null, - mappingsOverrides, - dataStream.getEffectiveMappings( - clusterService.state().projectState(projectResolver.getProjectId()).metadata() - ) - ) - ); - } catch (IOException e) { - listener.onResponse( - new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( - dataStreamName, - false, - e.getMessage(), - Mapping.EMPTY.toCompressedXContent(), - Mapping.EMPTY.toCompressedXContent() + dryRun, + listener.delegateFailure((dataStreamMappingsResponseActionListener, dataStream) -> { + if (dataStream != null) { + try { + dataStreamMappingsResponseActionListener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + true, + null, + mappingsOverrides, + dataStream.getEffectiveMappings( + clusterService.state().projectState(projectResolver.getProjectId()).metadata() ) - ); - } - } else { - listener.onResponse( + ) + ); + } catch (IOException e) { + dataStreamMappingsResponseActionListener.onResponse( new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( dataStreamName, false, - "Updating mappings not accepted for unknown reasons", + e.getMessage(), Mapping.EMPTY.toCompressedXContent(), Mapping.EMPTY.toCompressedXContent() ) ); } + } else { + dataStreamMappingsResponseActionListener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + "Updating mappings not accepted for unknown reasons", + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - } + }) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 507b34743d085..487b6407519eb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -173,24 +173,17 @@ public Tuple executeTask( UpdateMappingsTask updateMappingsTask, ClusterState clusterState ) throws Exception { - + DataStream dataStream = createDataStreamForUpdatedDataStreamMappings( + updateMappingsTask.projectId, + updateMappingsTask.dataStreamName, + updateMappingsTask.mappingsOverrides, + clusterState + ); ProjectMetadata projectMetadata = clusterState.metadata().getProject(updateMappingsTask.projectId); ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); - Map dataStreamMap = projectMetadata.dataStreams(); - DataStream dataStream = dataStreamMap.get(updateMappingsTask.dataStreamName); - - final ComposableIndexTemplate template = lookupTemplateForDataStream(updateMappingsTask.dataStreamName, projectMetadata); - ComposableIndexTemplate mergedTemplate = template.mergeMappings(updateMappingsTask.mappingsOverrides); - MetadataIndexTemplateService.validateTemplate( - mergedTemplate.template().settings(), - mergedTemplate.template().mappings(), - indicesService - ); - DataStream.Builder dataStreamBuilder = dataStream.copy().setMappings(updateMappingsTask.mappingsOverrides); projectMetadataBuilder.removeDataStream(updateMappingsTask.dataStreamName); - projectMetadataBuilder.put(dataStreamBuilder.build()); + projectMetadataBuilder.put(dataStream); ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build(); - return new Tuple<>(updatedClusterState, updateMappingsTask); } }; @@ -497,7 +490,7 @@ private DataStream createDataStreamForUpdatedDataStreamSettings( ComposableIndexTemplate mergedTemplate = template.mergeSettings(mergedSettings); MetadataIndexTemplateService.validateTemplate( mergedTemplate.template().settings(), - mergedTemplate.template().mappings(), + dataStream.getEffectiveMappings(projectMetadata), indicesService ); @@ -505,19 +498,58 @@ private DataStream createDataStreamForUpdatedDataStreamSettings( return dataStream.copy().setSettings(mergedSettings).build(); } + private DataStream createDataStreamForUpdatedDataStreamMappings( + ProjectId projectId, + String dataStreamName, + CompressedXContent mappingsOverrides, + ClusterState clusterState + ) throws Exception { + ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId); + Map dataStreamMap = projectMetadata.dataStreams(); + DataStream dataStream = dataStreamMap.get(dataStreamName); + + final ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, projectMetadata); + ComposableIndexTemplate mergedTemplate = template.mergeMappings(mappingsOverrides); + MetadataIndexTemplateService.validateTemplate( + dataStream.getEffectiveSettings(projectMetadata), + mergedTemplate.template().mappings(), + indicesService + ); + return dataStream.copy().setMappings(mappingsOverrides).build(); + } + public void updateMappings( ProjectId projectId, TimeValue masterNodeTimeout, TimeValue ackTimeout, String dataStreamName, CompressedXContent mappingsOverrides, - ActionListener listener + boolean dryRun, + ActionListener listener ) { - updateMappingsTaskQueue.submitTask( - "updating mappings on data stream", - new UpdateMappingsTask(projectId, dataStreamName, mappingsOverrides, ackTimeout, listener), - masterNodeTimeout - ); + if (dryRun) { + /* + * If this is a dry run, we'll do the settings validation and apply the changes to the data stream locally, but we won't run + * the task that actually updates the cluster state. + */ + try { + DataStream updatedDataStream = createDataStreamForUpdatedDataStreamMappings( + projectId, + dataStreamName, + mappingsOverrides, + clusterService.state() + ); + listener.onResponse(updatedDataStream); + } catch (Exception e) { + listener.onFailure(e); + } + } else { + updateMappingsTaskQueue.submitTask( + "updating mappings on data stream", + new UpdateMappingsTask(projectId, dataStreamName, mappingsOverrides, clusterService, ackTimeout, listener), + masterNodeTimeout + ); + } } private static void addBackingIndex( @@ -800,10 +832,17 @@ static class UpdateMappingsTask extends AckedBatchedClusterStateUpdateTask { ProjectId projectId, String dataStreamName, CompressedXContent mappingsOverrides, + ClusterService clusterService, TimeValue ackTimeout, - ActionListener listener + ActionListener listener ) { - super(ackTimeout, listener); + super(ackTimeout, listener.safeMap(response -> { + if (response.isAcknowledged()) { + return clusterService.state().projectState(projectId).metadata().dataStreams().get(dataStreamName); + } else { + throw new ElasticsearchException("Updating mappings not accepted for unknown reasons"); + } + })); this.projectId = projectId; this.dataStreamName = dataStreamName; this.mappingsOverrides = mappingsOverrides; From 3455e4f1c9627fe20985929fd934cfcfafd572c2 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 30 May 2025 08:02:02 -0500 Subject: [PATCH 14/18] making it serverless-friendly --- .../rest-api-spec/test/data_stream/250_data_stream_mappings.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml index d64f5e6b94612..03cef1699d044 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml @@ -18,7 +18,6 @@ setup: template: settings: number_of_replicas: 0 - lifecycle.name: my-policy mappings: properties: field1: From 50b67eb60823634bee3014dba7298fc9e20641a4 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 30 May 2025 12:23:37 -0500 Subject: [PATCH 15/18] avoiding nulls --- .../cluster/metadata/ComposableIndexTemplate.java | 2 +- .../org/elasticsearch/cluster/metadata/DataStream.java | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java index e823f7d70542d..5fde008fbb599 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java @@ -346,7 +346,7 @@ public ComposableIndexTemplate mergeSettings(Settings settings) { public ComposableIndexTemplate mergeMappings(CompressedXContent mappings) throws IOException { Objects.requireNonNull(mappings); - if (Mapping.EMPTY.toCompressedXContent().equals(mappings)) { + if (Mapping.EMPTY.toCompressedXContent().equals(mappings) && this.template() != null && this.template().mappings() != null) { return this; } ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = this.toBuilder(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 800f05497a010..d6c989dd58efa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -419,12 +419,7 @@ public Settings getEffectiveSettings(ProjectMetadata projectMetadata) { } public CompressedXContent getEffectiveMappings(ProjectMetadata projectMetadata) throws IOException { - CompressedXContent mergedMappings = getMatchingIndexTemplate(projectMetadata).mergeMappings(mappings).template().mappings(); - if (mergedMappings == null) { - return EMPTY_MAPPINGS; - } else { - return mergedMappings; - } + return getMatchingIndexTemplate(projectMetadata).mergeMappings(mappings).template().mappings(); } private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) { From cce704eaa4d46578c1f13e493d754409fdd62dfd Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 30 May 2025 12:53:48 -0500 Subject: [PATCH 16/18] Adding tests showing what works and what does not --- .../ComposableIndexTemplateTests.java | 3 ++ .../cluster/metadata/DataStreamTests.java | 48 +++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplateTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplateTests.java index e75d8dd9fbcab..dd458847baeda 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplateTests.java @@ -325,12 +325,14 @@ public void testMergeSettings() { .put("index.setting3", "templateValue") .put("index.setting4", "templateValue") .build(); + List componentTemplates = List.of("component_template_1"); CompressedXContent templateMappings = randomMappings(randomDataStreamTemplate()); Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings); ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder() .indexPatterns(List.of(dataStreamName)) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .template(templateBuilder) + .componentTemplates(componentTemplates) .build(); Settings mergedSettings = Settings.builder() .put("index.setting1", "dataStreamValue") @@ -342,6 +344,7 @@ public void testMergeSettings() { .indexPatterns(List.of(dataStreamName)) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .template(expectedTemplateBuilder) + .componentTemplates(componentTemplates) .build(); assertThat(indexTemplate.mergeSettings(dataStreamSettings), equalTo(expectedEffectiveTemplate)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 85b33e61ce8f4..7aae28a6a7043 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -2534,13 +2534,31 @@ public void testGetEffectiveSettings() { .indexPatterns(List.of(dataStream.getName())) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .template(templateBuilder) + .componentTemplates(List.of("component-template-1")) .build(); ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()) - .indexTemplates(Map.of(dataStream.getName(), indexTemplate)); + .indexTemplates(Map.of(dataStream.getName(), indexTemplate)) + .componentTemplates( + Map.of( + "component-template-1", + new ComponentTemplate( + Template.builder() + .settings( + Settings.builder() + .put("index.setting1", "componentTemplateValue") + .put("index.setting5", "componentTemplateValue") + ) + .build(), + 1L, + Map.of() + ) + ) + ); Settings mergedSettings = Settings.builder() .put("index.setting1", "dataStreamValue") .put("index.setting2", "dataStreamValue") .put("index.setting4", "templateValue") + .put("index.setting5", "componentTemplateValue") .build(); assertThat(dataStream.getEffectiveSettings(projectMetadataBuilder.build()), equalTo(mergedSettings)); } @@ -2561,9 +2579,20 @@ public void testGetEffectiveIndexTemplateTemplateSettingsOnly() { .indexPatterns(List.of(dataStream.getName())) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .template(templateBuilder) + .componentTemplates(List.of("component-template-1")) .build(); ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()) - .indexTemplates(Map.of(dataStream.getName(), indexTemplate)); + .indexTemplates(Map.of(dataStream.getName(), indexTemplate)) + .componentTemplates( + Map.of( + "component-template-1", + new ComponentTemplate( + Template.builder().settings(Settings.builder().put("index.setting5", "componentTemplateValue")).build(), + 1L, + Map.of() + ) + ) + ); assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(indexTemplate)); } @@ -2574,6 +2603,7 @@ public void testGetEffectiveIndexTemplateDataStreamSettingsOnly() { Settings templateSettings = Settings.EMPTY; CompressedXContent templateMappings = randomMappings(); Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings); + ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder() .indexPatterns(List.of(dataStream.getName())) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) @@ -2609,13 +2639,25 @@ public void testGetEffectiveIndexTemplate() { .indexPatterns(List.of(dataStream.getName())) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .template(templateBuilder) + .componentTemplates(List.of("component-template-1")) .build(); ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()) - .indexTemplates(Map.of(dataStream.getName(), indexTemplate)); + .indexTemplates(Map.of(dataStream.getName(), indexTemplate)) + .componentTemplates( + Map.of( + "component-template-1", + new ComponentTemplate( + Template.builder().settings(Settings.builder().put("index.setting5", "componentTemplateValue")).build(), + 1L, + Map.of() + ) + ) + ); Settings mergedSettings = Settings.builder() .put("index.setting1", "dataStreamValue") .put("index.setting2", "dataStreamValue") .put("index.setting4", "templateValue") + .put("index.setting5", "componentTemplateValue") .build(); Template.Builder expectedTemplateBuilder = Template.builder().settings(mergedSettings).mappings(templateMappings); ComposableIndexTemplate expectedEffectiveTemplate = ComposableIndexTemplate.builder() From 967d9a8617e71a06093dc371b8a8e684ba671f18 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 30 May 2025 13:01:37 -0500 Subject: [PATCH 17/18] fixing test --- .../org/elasticsearch/cluster/metadata/DataStreamTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 7aae28a6a7043..9f7be331c0f81 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -2635,11 +2635,12 @@ public void testGetEffectiveIndexTemplate() { .build(); CompressedXContent templateMappings = randomMappings(); Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings); + List componentTemplates = List.of("component-template-1"); ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder() .indexPatterns(List.of(dataStream.getName())) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .template(templateBuilder) - .componentTemplates(List.of("component-template-1")) + .componentTemplates(componentTemplates) .build(); ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault()) .indexTemplates(Map.of(dataStream.getName(), indexTemplate)) @@ -2657,13 +2658,13 @@ public void testGetEffectiveIndexTemplate() { .put("index.setting1", "dataStreamValue") .put("index.setting2", "dataStreamValue") .put("index.setting4", "templateValue") - .put("index.setting5", "componentTemplateValue") .build(); Template.Builder expectedTemplateBuilder = Template.builder().settings(mergedSettings).mappings(templateMappings); ComposableIndexTemplate expectedEffectiveTemplate = ComposableIndexTemplate.builder() .indexPatterns(List.of(dataStream.getName())) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .template(expectedTemplateBuilder) + .componentTemplates(componentTemplates) .build(); assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate)); } From cc570b39528069b9e0b86a5501add30e495346fd Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 27 Jun 2025 15:25:08 -0500 Subject: [PATCH 18/18] removing extra line --- .../action/TransportUpdateDataStreamMappingsAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java index 72960450308ec..0fc176201ff27 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java @@ -42,7 +42,6 @@ public class TransportUpdateDataStreamMappingsAction extends TransportMasterNode UpdateDataStreamMappingsAction.Request, UpdateDataStreamMappingsAction.Response> { private static final Logger logger = LogManager.getLogger(TransportUpdateDataStreamMappingsAction.class); - private final IndexNameExpressionResolver indexNameExpressionResolver; private final MetadataDataStreamsService metadataDataStreamsService; private final IndexNameExpressionResolver indexNameExpressionResolver; private final SystemIndices systemIndices;