From d265feccd39fad4731676126802648088c5b35ba Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 30 Oct 2025 11:57:10 -0400 Subject: [PATCH 1/5] [Transform] Preview index request Preview currently shows the source document as the index request. There are some internal components that want both the source document and the id of the generated entities, so we've added a query param that controls the output format. `as_index_request` defaults to false to show the existing structure, and true will show the response that the transform would use to index the generated entities. --- .../transform_preview_as_index_request.csv | 1 + .../resources/transport/upper_bounds/9.3.csv | 2 +- .../xpack/core/transform/TransformField.java | 3 + .../action/PreviewTransformAction.java | 27 ++- .../AbstractSerializingTransformTestCase.java | 28 +-- .../PreviewTransformActionRequestTests.java | 40 ++++- .../integration/TransformLatestRestIT.java | 23 +++ .../integration/TransformPivotRestIT.java | 48 ++++++ .../TransformNoRemoteClusterClientNodeIT.java | 2 +- .../TransformNoTransformNodeIT.java | 2 +- .../TransportPreviewTransformAction.java | 162 ++++++++++-------- .../action/RestPreviewTransformAction.java | 14 +- .../common/AbstractCompositeAggFunction.java | 28 +-- .../transforms/pivot/PivotTests.java | 33 +++- 14 files changed, 277 insertions(+), 136 deletions(-) create mode 100644 server/src/main/resources/transport/definitions/referable/transform_preview_as_index_request.csv diff --git a/server/src/main/resources/transport/definitions/referable/transform_preview_as_index_request.csv b/server/src/main/resources/transport/definitions/referable/transform_preview_as_index_request.csv new file mode 100644 index 0000000000000..9b32a5cf8b643 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/transform_preview_as_index_request.csv @@ -0,0 +1 @@ +9211000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 88b7f13fd8f9f..f1bbf715d6e41 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -add_sample_method_downsample_ilm,9210000 +transform_preview_as_index_request,9211000 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index d7f6a46eada3a..cd24ebde92c49 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -60,6 +60,7 @@ public final class TransformField { public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); public static final ParseField CHECK_FOR_DANGLING_TASKS = new ParseField("check_dangling_tasks"); + public static final ParseField PREVIEW_AS_INDEX_REQUEST = new ParseField("as_index_request"); /** * Fields for checkpointing */ @@ -102,6 +103,8 @@ public final class TransformField { // internal document id public static final String DOCUMENT_ID_FIELD = "_id"; + // internal document source + public static final String DOCUMENT_SOURCE_FIELD = "_source"; public static final PersistentTasksCustomMetadata.Assignment AWAITING_UPGRADE = new PersistentTasksCustomMetadata.Assignment( null, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java index adebbba651f16..d8706d7bd9b67 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; @@ -55,19 +56,24 @@ private PreviewTransformAction() { public static class Request extends AcknowledgedRequest implements ToXContentObject { + static final TransportVersion PREVIEW_AS_INDEX_REQUEST = TransportVersion.fromName("transform_preview_as_index_request"); private final TransformConfig config; + private final boolean previewAsIndexRequest; - public Request(TransformConfig config, TimeValue timeout) { + public Request(TransformConfig config, TimeValue timeout, boolean previewAsIndexRequest) { super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, timeout); this.config = config; + this.previewAsIndexRequest = previewAsIndexRequest; } public Request(StreamInput in) throws IOException { super(in); this.config = new TransformConfig(in); + this.previewAsIndexRequest = in.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST) ? in.readBoolean() : false; } - public static Request fromXContent(final XContentParser parser, TimeValue timeout) throws IOException { + public static Request fromXContent(final XContentParser parser, TimeValue timeout, boolean previewAsIndexRequest) + throws IOException { Map content = parser.map(); // dest.index is not required for _preview, so we just supply our own Map tempDestination = new HashMap<>(); @@ -89,7 +95,7 @@ public static Request fromXContent(final XContentParser parser, TimeValue timeou XContentType.JSON ) ) { - return new Request(TransformConfig.fromXContent(newParser, null, false), timeout); + return new Request(TransformConfig.fromXContent(newParser, null, false), timeout, previewAsIndexRequest); } } @@ -115,15 +121,22 @@ public TransformConfig getConfig() { return config; } + public boolean previewAsIndexRequest() { + return previewAsIndexRequest; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); this.config.writeTo(out); + if (out.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST)) { + out.writeBoolean(previewAsIndexRequest); + } } @Override public int hashCode() { - return Objects.hash(config); + return Objects.hash(config, previewAsIndexRequest); } @Override @@ -135,7 +148,7 @@ public boolean equals(Object obj) { return false; } Request other = (Request) obj; - return Objects.equals(config, other.config); + return Objects.equals(config, other.config) && (previewAsIndexRequest == other.previewAsIndexRequest); } @Override @@ -170,10 +183,6 @@ public List> getDocs() { return docs; } - public TransformDestIndexSettings getGeneratedDestIndexSettings() { - return generatedDestIndexSettings; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeInt(docs.size()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/AbstractSerializingTransformTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/AbstractSerializingTransformTestCase.java index 0a4f6ea33eac6..440639d386f95 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/AbstractSerializingTransformTestCase.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/AbstractSerializingTransformTestCase.java @@ -8,17 +8,14 @@ package org.elasticsearch.xpack.core.transform; import org.elasticsearch.TransportVersion; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.BaseAggregationBuilder; -import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.test.AbstractBWCSerializationTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; @@ -30,14 +27,12 @@ import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.junit.Before; -import java.io.IOException; import java.util.Collections; import java.util.List; import static java.util.Collections.emptyList; -public abstract class AbstractSerializingTransformTestCase extends AbstractXContentSerializingTestCase< - T> { +public abstract class AbstractSerializingTransformTestCase extends AbstractBWCSerializationTestCase { protected static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams( Collections.singletonMap(TransformField.FOR_INTERNAL_STORAGE, "true") @@ -111,22 +106,9 @@ protected NamedXContentRegistry xContentRegistry() { return namedXContentRegistry; } - protected Y writeAndReadBWCObject( - X original, - NamedWriteableRegistry registry, - Writeable.Writer writer, - Writeable.Reader reader, - TransportVersion version - ) throws IOException { - try (BytesStreamOutput output = new BytesStreamOutput()) { - output.setTransportVersion(version); - original.writeTo(output); - - try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), getNamedWriteableRegistry())) { - in.setTransportVersion(version); - return reader.read(in); - } - } + @Override + protected T mutateInstanceForVersion(T instance, TransportVersion version) { + return instance; } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java index a200258d7ce1d..4220df68e22f0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.Writeable; @@ -16,6 +17,8 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.DeprecationHandler; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request; @@ -36,7 +39,7 @@ public class PreviewTransformActionRequestTests extends AbstractSerializingTrans @Override protected Request doParseInstance(XContentParser parser) throws IOException { - return Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + return Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); } @Override @@ -46,7 +49,11 @@ protected Writeable.Reader instanceReader() { @Override protected Request createTestInstance() { - TransformConfig config = new TransformConfig( + return new Request(randomTransformConfig(), randomTimeValue(), randomBoolean()); + } + + private static TransformConfig randomTransformConfig() { + return new TransformConfig( "transform-preview", randomSourceConfig(), new DestConfig("unused-transform-preview-index", null, null), @@ -62,12 +69,31 @@ protected Request createTestInstance() { null, null ); - return new Request(config, randomTimeValue()); + } + + @Override + protected Request createXContextTestInstance(XContentType xContentType) { + return new Request(randomTransformConfig(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); } @Override protected Request mutateInstance(Request instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 + return randomBoolean() + ? new Request( + randomValueOtherThan(instance.getConfig(), PreviewTransformActionRequestTests::randomTransformConfig), + instance.ackTimeout(), + instance.previewAsIndexRequest() + ) + : new Request(instance.getConfig(), instance.ackTimeout(), instance.previewAsIndexRequest() == false); + } + + @Override + protected Request mutateInstanceForVersion(Request instance, TransportVersion version) { + if (version.supports(Request.PREVIEW_AS_INDEX_REQUEST)) { + return instance; + } else { + return new Request(instance.getConfig(), instance.ackTimeout(), false); + } } public void testParsingOverwritesIdField() throws IOException { @@ -125,13 +151,13 @@ private void testParsingOverwrites( try ( XContentParser parser = JsonXContent.jsonXContent.createParser( - xContentRegistry(), - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry()) + .withDeprecationHandler(DeprecationHandler.THROW_UNSUPPORTED_OPERATION), json.streamInput() ) ) { - Request request = Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + Request request = Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); assertThat(request.getConfig().getId(), is(equalTo(expectedTransformId))); assertThat(request.getConfig().getDestination().getIndex(), is(equalTo(expectedDestIndex))); assertThat(request.getConfig().getDestination().getPipeline(), is(equalTo(expectedDestPipeline))); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java index af5fe9f180a4e..ed78655e22820 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java @@ -148,6 +148,29 @@ public void testLatestWithAggregateMetricDoubleAsUniqueKey() throws Exception { } } + @SuppressWarnings("unchecked") + public void testPreviewAsIndexRequest() throws IOException { + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); + var createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview?as_index_request", null); + createPreviewRequest.setJsonEntity(Strings.format(""" + { + "source": { + "index": "%s" + }, + "latest": { + "unique_key": [ "user_id" ], + "sort": "@timestamp" + } + }""", REVIEWS_INDEX_NAME)); + var previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest)); + var preview = (List>) previewTransformResponse.get("preview"); + preview.forEach(p -> { + assertNotNull(XContentMapValues.extractValue("_id", p)); + assertNotNull(XContentMapValues.extractValue("_source.@timestamp", p)); + assertNotNull(XContentMapValues.extractValue("_source.user_id", p)); + }); + } + public void testContinuousLatestWithFrom_NoDocs() throws Exception { testContinuousLatestWithFrom("latest_from_no_docs", "reviews_from_no_docs", "2017-02-20", 0); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index 83f731e298159..2be53c21b2587 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -1411,6 +1411,54 @@ private List previewWithOffset(String offset) throws IOException { return preview.stream().map(p -> (String) p.get("by_week")).toList(); } + @SuppressWarnings("unchecked") + public void testPreviewAsIndexRequest() throws Exception { + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); + final Request createPreviewRequest = createRequestWithAuth( + "POST", + getTransformEndpoint() + "_preview?as_index_request", + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS + ); + + createPreviewRequest.setJsonEntity(Strings.format(""" + { + "source": { + "index": "%s" + }, + "pivot": { + "group_by": { + "user.id": { + "terms": { + "field": "user_id" + } + }, + "by_day": { + "date_histogram": { + "fixed_interval": "1d", + "field": "timestamp" + } + } + }, + "aggregations": { + "user.avg_rating": { + "avg": { + "field": "stars" + } + } + } + } + }""", REVIEWS_INDEX_NAME)); + + var previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest)); + var preview = (List>) previewTransformResponse.get("preview"); + preview.forEach(p -> { + assertNotNull(XContentMapValues.extractValue("_id", p)); + assertNotNull(XContentMapValues.extractValue("_source.by_day", p)); + assertNotNull(XContentMapValues.extractValue("_source.user.id", p)); + assertNotNull(XContentMapValues.extractValue("_source.user.avg_rating", p)); + }); + } + public void testPivotWithMaxOnDateField() throws Exception { String transformId = "simple_date_histogram_pivot_with_max_time"; String transformIndex = "pivot_reviews_via_date_histogram_with_max_time"; diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java index 5090a00211ff4..295204c03e975 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java @@ -41,7 +41,7 @@ protected Settings nodeSettings() { public void testPreviewTransformWithRemoteIndex() { String transformId = "transform-with-remote-index"; TransformConfig config = randomConfig(transformId, "remote_cluster:my-index"); - PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, () -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet() diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java index 1ce2299df33fb..170dcb3e38123 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java @@ -63,7 +63,7 @@ public void testGetTransform() { public void testPreviewTransform() { String transformId = "transform-1"; TransformConfig config = randomConfig(transformId); - PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, () -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet() diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index 60f00da195974..32cc93d739adb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -47,12 +47,10 @@ import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Response; -import org.elasticsearch.xpack.core.transform.transforms.DestAlias; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.SyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; -import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings; import org.elasticsearch.xpack.core.transform.transforms.TransformEffectiveSettings; import org.elasticsearch.xpack.transform.TransformExtensionHolder; import org.elasticsearch.xpack.transform.persistence.TransformIndex; @@ -61,12 +59,11 @@ import org.elasticsearch.xpack.transform.transforms.TransformNodes; import org.elasticsearch.xpack.transform.utils.SourceDestValidations; +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; @@ -155,9 +152,9 @@ protected void doExecute(Task task, Request request, ActionListener li config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), - config.getDestination().getAliases(), config.getSyncConfig(), config.getSettings(), + request.previewAsIndexRequest(), listener ), listener::onFailure @@ -211,89 +208,46 @@ private void getPreview( SourceConfig source, String pipeline, String dest, - List aliases, SyncConfig syncConfig, SettingsConfig settingsConfig, + boolean previewAsIndexRequest, ActionListener listener ) { - Client parentTaskClient = new ParentTaskAssigningClient(client, parentTaskId); + var parentTaskClient = new ParentTaskAssigningClient(client, parentTaskId); - final SetOnce> mappings = new SetOnce<>(); + final var mappings = new SetOnce>(); - final Map filteredHeaders = getSecurityHeadersPreferringSecondary( - threadPool, - securityContext, - clusterService.state() - ); + final var filteredHeaders = getSecurityHeadersPreferringSecondary(threadPool, securityContext, clusterService.state()); - ActionListener pipelineResponseActionListener = ActionListener.wrap(simulatePipelineResponse -> { - List> docs = new ArrayList<>(simulatePipelineResponse.getResults().size()); - List> errors = new ArrayList<>(); - for (var simulateDocumentResult : simulatePipelineResponse.getResults()) { - try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { - XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); - Map tempMap = XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2(); - Map doc = (Map) XContentMapValues.extractValue("doc._source", tempMap); - if (doc != null) { - docs.add(doc); - } - Map error = (Map) XContentMapValues.extractValue("error", tempMap); - if (error != null) { - errors.add(error); - } - } - } - if (errors.isEmpty() == false) { - HeaderWarning.addWarning("Pipeline returned " + errors.size() + " errors, first error: " + errors.get(0)); - } - TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( + ActionListener>> responseDocsListener = listener.delegateFailureAndWrap((l, docs) -> { + var generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( destIndexSettings, mappings.get(), transformId, Clock.systemUTC() ); + TransformConfigLinter.getWarnings(function, source, syncConfig).forEach(HeaderWarning::addWarning); + if (previewAsIndexRequest) { + l.onResponse(new Response(docs, generatedDestIndexSettings)); + } else { + l.onResponse( + new Response( + docs.stream().map(doc -> (Map) doc.get(TransformField.DOCUMENT_SOURCE_FIELD)).toList(), + generatedDestIndexSettings + ) + ); + } + }); - List warnings = TransformConfigLinter.getWarnings(function, source, syncConfig); - warnings.forEach(HeaderWarning::addWarning); - listener.onResponse(new Response(docs, generatedDestIndexSettings)); - }, listener::onFailure); - - ActionListener>> previewListener = ActionListener.wrap(docs -> { + ActionListener>> previewListener = responseDocsListener.delegateFailureAndWrap((l, docs) -> { if (pipeline == null) { - TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( - destIndexSettings, - mappings.get(), - transformId, - Clock.systemUTC() - ); - List warnings = TransformConfigLinter.getWarnings(function, source, syncConfig); - warnings.forEach(HeaderWarning::addWarning); - listener.onResponse(new Response(docs, generatedDestIndexSettings)); + l.onResponse(docs); } else { - List> results = docs.stream().map(doc -> { - Map src = new HashMap<>(); - String id = (String) doc.get(TransformField.DOCUMENT_ID_FIELD); - src.put("_source", doc); - src.put("_id", id); - src.put("_index", dest); - return src; - }).collect(Collectors.toList()); - - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject(); - builder.field("docs", results); - builder.endObject(); - var pipelineRequest = new SimulatePipelineRequest( - ReleasableBytesReference.wrap(BytesReference.bytes(builder)), - XContentType.JSON - ); - pipelineRequest.setId(pipeline); - parentTaskClient.execute(SimulatePipelineAction.INSTANCE, pipelineRequest, pipelineResponseActionListener); - } + simulatePipeline(docs, pipeline, dest, parentTaskClient, l); } - }, listener::onFailure); + }); - ActionListener> deduceMappingsListener = ActionListener.wrap(deducedMappings -> { + ActionListener> deduceMappingsListener = previewListener.delegateFailureAndWrap((l, deducedMappings) -> { if (TransformEffectiveSettings.isDeduceMappingsDisabled(settingsConfig)) { mappings.set(emptyMap()); } else { @@ -307,10 +261,72 @@ private void getPreview( // Use deduced mappings for generating preview even if "settings.deduce_mappings" is set to false deducedMappings, NUMBER_OF_PREVIEW_BUCKETS, - previewListener + l ); - }, listener::onFailure); + }); function.deduceMappings(parentTaskClient, filteredHeaders, transformId, source, deduceMappingsListener); } + + @SuppressWarnings("unchecked") + private void simulatePipeline( + List> previewDocs, + String pipeline, + String dest, + Client client, + ActionListener>> listener + ) throws IOException { + ActionListener pipelineResponseActionListener = listener.delegateFailureAndWrap( + (l, simulatePipelineResponse) -> { + List> docs = new ArrayList<>(simulatePipelineResponse.getResults().size()); + List> errors = new ArrayList<>(); + for (var simulateDocumentResult : simulatePipelineResponse.getResults()) { + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + Map tempMap = XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON) + .v2(); + Map sourceDoc = (Map) XContentMapValues.extractValue("doc._source", tempMap); + if (sourceDoc != null) { + docs.add( + Map.ofEntries( + Map.entry(TransformField.DOCUMENT_ID_FIELD, XContentMapValues.extractValue("doc._id", tempMap)), + Map.entry(TransformField.DOCUMENT_SOURCE_FIELD, sourceDoc) + ) + ); + } + var error = (Map) XContentMapValues.extractValue("error", tempMap); + if (error != null) { + errors.add(error); + } + } + } + if (errors.isEmpty() == false) { + HeaderWarning.addWarning("Pipeline returned " + errors.size() + " errors, first error: " + errors.getFirst()); + } + l.onResponse(docs); + } + ); + + var results = previewDocs.stream() + .map( + doc -> Map.ofEntries( + Map.entry(TransformField.DOCUMENT_SOURCE_FIELD, doc.get(TransformField.DOCUMENT_SOURCE_FIELD)), + Map.entry(TransformField.DOCUMENT_ID_FIELD, doc.get(TransformField.DOCUMENT_ID_FIELD)), + Map.entry("_index", dest) + ) + ) + .toList(); + + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.field("docs", results); + builder.endObject(); + var pipelineRequest = new SimulatePipelineRequest( + ReleasableBytesReference.wrap(BytesReference.bytes(builder)), + XContentType.JSON + ); + pipelineRequest.setId(pipeline); + client.execute(SimulatePipelineAction.INSTANCE, pipelineRequest, pipelineResponseActionListener); + } + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java index 7aa45d37a3b41..5fedd3688375c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java @@ -13,7 +13,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; @@ -69,12 +68,15 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient ); } - TimeValue timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + var timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + var previewAsIndexRequest = restRequest.paramAsBoolean(TransformField.PREVIEW_AS_INDEX_REQUEST.getPreferredName(), false); SetOnce previewRequestHolder = new SetOnce<>(); if (Strings.isNullOrEmpty(transformId)) { - previewRequestHolder.set(PreviewTransformAction.Request.fromXContent(restRequest.contentOrSourceParamParser(), timeout)); + previewRequestHolder.set( + PreviewTransformAction.Request.fromXContent(restRequest.contentOrSourceParamParser(), timeout, previewAsIndexRequest) + ); } Client client = new RestCancellableNodeClient(nodeClient, restRequest.getHttpChannel()); @@ -97,7 +99,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient ) ); } else { - PreviewTransformAction.Request previewRequest = new PreviewTransformAction.Request(transforms.get(0), timeout); + PreviewTransformAction.Request previewRequest = new PreviewTransformAction.Request( + transforms.getFirst(), + timeout, + previewAsIndexRequest + ); client.execute(PreviewTransformAction.INSTANCE, previewRequest, listener); } }, listener::onFailure)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java index 2de810b2b902d..97e6357c1da03 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java @@ -38,7 +38,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; @@ -82,32 +81,37 @@ public void preview( client, TransportSearchAction.TYPE, buildSearchRequestForValidation("preview", sourceConfig, timeout, numberOfBuckets), - ActionListener.wrap(r -> { + listener.delegateFailureAndWrap((l, r) -> { try { final InternalAggregations aggregations = r.getAggregations(); if (aggregations == null) { - listener.onFailure( + l.onFailure( new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST) ); return; } final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); if (agg == null || agg.getBuckets().isEmpty()) { - listener.onResponse(Collections.emptyList()); + l.onResponse(Collections.emptyList()); return; } - TransformIndexerStats stats = new TransformIndexerStats(); - TransformProgress progress = new TransformProgress(); - List> docs = extractResults(agg, fieldTypeMap, stats, progress).map( - this::documentTransformationFunction - ).collect(Collectors.toList()); + var stats = new TransformIndexerStats(); + var progress = new TransformProgress(); + var docs = extractResults(agg, fieldTypeMap, stats, progress).map(doc -> { + var docId = (String) doc.get(TransformField.DOCUMENT_ID_FIELD); + doc = documentTransformationFunction(doc); + return Map.ofEntries( + Map.entry(TransformField.DOCUMENT_ID_FIELD, docId), + Map.entry(TransformField.DOCUMENT_SOURCE_FIELD, doc) + ); + }).toList(); - listener.onResponse(docs); + l.onResponse(docs); } catch (AggregationResultUtils.AggregationExtractionException extractionException) { - listener.onFailure(new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST)); + l.onFailure(new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST)); } - }, listener::onFailure) + }) ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java index 37817bc261be5..1608ba2d840c2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.InternalComposite; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; @@ -39,6 +40,7 @@ import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.TransformDeprecations; +import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfigTests; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; @@ -71,9 +73,11 @@ import java.util.stream.Stream; import static java.util.Collections.emptyMap; +import static org.elasticsearch.xpack.transform.transforms.common.AbstractCompositeAggFunction.COMPOSITE_AGGREGATION_NAME; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -329,7 +333,16 @@ public void testPreviewForCompositeAggregation() throws Exception { SettingsConfigTests.randomSettingsConfig(), TransformConfigVersion.CURRENT, Collections.emptySet() - ); + ) { + protected Stream> extractResults( + CompositeAggregation agg, + Map fieldTypeMap, + TransformIndexerStats transformIndexerStats, + TransformProgress transformProgress + ) { + return Stream.of(Map.ofEntries(Map.entry(TransformField.DOCUMENT_ID_FIELD, "1234"), Map.entry("testKey", "testValue"))); + } + }; CountDownLatch latch = new CountDownLatch(1); final AtomicReference exceptionHolder = new AtomicReference<>(); @@ -348,7 +361,15 @@ public void testPreviewForCompositeAggregation() throws Exception { } assertThat(exceptionHolder.get(), is(nullValue())); - assertThat(responseHolder.get(), is(empty())); + assertThat( + responseHolder.get(), + containsInAnyOrder( + Map.ofEntries( + Map.entry(TransformField.DOCUMENT_ID_FIELD, "1234"), + Map.entry(TransformField.DOCUMENT_SOURCE_FIELD, Map.of("testKey", "testValue")) + ) + ) + ); } private static SearchResponse searchResponseFromAggs(InternalAggregations aggs) { @@ -425,10 +446,12 @@ protected void Request request, ActionListener listener ) { - SearchResponse response = mock(SearchResponse.class); - InternalComposite compositeAggregation = mock(InternalComposite.class); + SearchResponse response = mock(); + InternalComposite compositeAggregation = mock(); + when(compositeAggregation.getName()).thenReturn(COMPOSITE_AGGREGATION_NAME); + InternalComposite.InternalBucket bucket = mock(); + when(compositeAggregation.getBuckets()).thenReturn(List.of(bucket)); when(response.getAggregations()).thenReturn(InternalAggregations.from(List.of(compositeAggregation))); - when(compositeAggregation.getBuckets()).thenReturn(new ArrayList<>()); listener.onResponse((Response) response); } } From c08e1ac97115e694bd35ab355e352e904499e1b3 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Fri, 31 Oct 2025 09:45:01 -0400 Subject: [PATCH 2/5] Update docs/changelog/137455.yaml --- docs/changelog/137455.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/137455.yaml diff --git a/docs/changelog/137455.yaml b/docs/changelog/137455.yaml new file mode 100644 index 0000000000000..4732c0f1b276f --- /dev/null +++ b/docs/changelog/137455.yaml @@ -0,0 +1,5 @@ +pr: 137455 +summary: Preview index request +area: Transform +type: enhancement +issues: [] From 799c2ea40a32586a600c829fcbe8d52a8004d157 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 5 Nov 2025 11:22:42 -0500 Subject: [PATCH 3/5] throw error when trying to write to an unsupported node --- .../action/PreviewTransformAction.java | 9 +++++ .../AbstractSerializingTransformTestCase.java | 28 ++++++++++--- .../PreviewTransformActionRequestTests.java | 40 +++++++++++++++---- 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java index d8706d7bd9b67..1fd75b44407e0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; @@ -19,6 +20,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -131,6 +133,13 @@ public void writeTo(StreamOutput out) throws IOException { this.config.writeTo(out); if (out.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST)) { out.writeBoolean(previewAsIndexRequest); + } else if (previewAsIndexRequest) { + throw new ElasticsearchStatusException( + "_preview with " + + TransformField.PREVIEW_AS_INDEX_REQUEST.getPreferredName() + + " set to true only works if all the nodes support it.", + RestStatus.FORBIDDEN + ); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/AbstractSerializingTransformTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/AbstractSerializingTransformTestCase.java index 440639d386f95..0a4f6ea33eac6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/AbstractSerializingTransformTestCase.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/AbstractSerializingTransformTestCase.java @@ -8,14 +8,17 @@ package org.elasticsearch.xpack.core.transform; import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.BaseAggregationBuilder; -import org.elasticsearch.test.AbstractBWCSerializationTestCase; +import org.elasticsearch.test.AbstractXContentSerializingTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; @@ -27,12 +30,14 @@ import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.junit.Before; +import java.io.IOException; import java.util.Collections; import java.util.List; import static java.util.Collections.emptyList; -public abstract class AbstractSerializingTransformTestCase extends AbstractBWCSerializationTestCase { +public abstract class AbstractSerializingTransformTestCase extends AbstractXContentSerializingTestCase< + T> { protected static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams( Collections.singletonMap(TransformField.FOR_INTERNAL_STORAGE, "true") @@ -106,9 +111,22 @@ protected NamedXContentRegistry xContentRegistry() { return namedXContentRegistry; } - @Override - protected T mutateInstanceForVersion(T instance, TransportVersion version) { - return instance; + protected Y writeAndReadBWCObject( + X original, + NamedWriteableRegistry registry, + Writeable.Writer writer, + Writeable.Reader reader, + TransportVersion version + ) throws IOException { + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setTransportVersion(version); + original.writeTo(output); + + try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), getNamedWriteableRegistry())) { + in.setTransportVersion(version); + return reader.read(in); + } + } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java index 4220df68e22f0..ded2e40aa367b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java @@ -7,11 +7,12 @@ package org.elasticsearch.xpack.core.transform.action; -import org.elasticsearch.TransportVersion; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Strings; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -29,7 +30,9 @@ import java.io.IOException; import java.util.Map; +import java.util.function.Predicate; +import static org.elasticsearch.test.BWCVersions.DEFAULT_BWC_VERSIONS; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -87,12 +90,35 @@ protected Request mutateInstance(Request instance) { : new Request(instance.getConfig(), instance.ackTimeout(), instance.previewAsIndexRequest() == false); } - @Override - protected Request mutateInstanceForVersion(Request instance, TransportVersion version) { - if (version.supports(Request.PREVIEW_AS_INDEX_REQUEST)) { - return instance; - } else { - return new Request(instance.getConfig(), instance.ackTimeout(), false); + public void testAsIndexRequestIsNotBackwardsCompatible() throws IOException { + var unsupportedVersions = DEFAULT_BWC_VERSIONS.stream() + .filter(Predicate.not(version -> version.supports(Request.PREVIEW_AS_INDEX_REQUEST))) + .toList(); + for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) { + var testInstance = createTestInstance(); + for (var unsupportedVersion : unsupportedVersions) { + if (testInstance.previewAsIndexRequest()) { + var statusException = assertThrows( + ElasticsearchStatusException.class, + () -> copyWriteable(testInstance, getNamedWriteableRegistry(), instanceReader(), unsupportedVersion) + ); + assertThat(statusException.status(), equalTo(RestStatus.FORBIDDEN)); + assertThat( + statusException.getMessage(), + equalTo("_preview with as_index_request set to true only works if all the nodes support it.") + ); + } else { + var deserializedInstance = copyWriteable( + testInstance, + getNamedWriteableRegistry(), + instanceReader(), + unsupportedVersion + ); + assertNotSame(unsupportedVersion.toString(), deserializedInstance, testInstance); + assertEquals(unsupportedVersion.toString(), deserializedInstance, testInstance); + assertEquals(unsupportedVersion.toString(), deserializedInstance.hashCode(), testInstance.hashCode()); + } + } } } From 99a952c19588f96da2d325cc83fb423636297173 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 6 Nov 2025 09:30:04 -0500 Subject: [PATCH 4/5] change error message to be more actionable --- .../xpack/core/transform/action/PreviewTransformAction.java | 4 ++-- .../transform/action/PreviewTransformActionRequestTests.java | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java index 1fd75b44407e0..62cb2e67030bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java @@ -135,9 +135,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(previewAsIndexRequest); } else if (previewAsIndexRequest) { throw new ElasticsearchStatusException( - "_preview with " + "Cannot send a _preview request with " + TransformField.PREVIEW_AS_INDEX_REQUEST.getPreferredName() - + " set to true only works if all the nodes support it.", + + " to an outdated node. Please upgrade the node to 9.3.0+ and try again.", RestStatus.FORBIDDEN ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java index ded2e40aa367b..792baa6358816 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java @@ -105,7 +105,10 @@ public void testAsIndexRequestIsNotBackwardsCompatible() throws IOException { assertThat(statusException.status(), equalTo(RestStatus.FORBIDDEN)); assertThat( statusException.getMessage(), - equalTo("_preview with as_index_request set to true only works if all the nodes support it.") + equalTo( + "Cannot send a _preview request with as_index_request to an outdated node. " + + "Please upgrade the node to 9.3.0+ and try again." + ) ); } else { var deserializedInstance = copyWriteable( From 078cff9adadabdb362d925d026037fe3ae84fed7 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 6 Nov 2025 12:14:37 -0500 Subject: [PATCH 5/5] 403 -> 400 --- .../xpack/core/transform/action/PreviewTransformAction.java | 2 +- .../transform/action/PreviewTransformActionRequestTests.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java index 62cb2e67030bd..a084bd5443547 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java @@ -138,7 +138,7 @@ public void writeTo(StreamOutput out) throws IOException { "Cannot send a _preview request with " + TransformField.PREVIEW_AS_INDEX_REQUEST.getPreferredName() + " to an outdated node. Please upgrade the node to 9.3.0+ and try again.", - RestStatus.FORBIDDEN + RestStatus.BAD_REQUEST ); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java index 792baa6358816..afb38a7266b46 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java @@ -102,7 +102,7 @@ public void testAsIndexRequestIsNotBackwardsCompatible() throws IOException { ElasticsearchStatusException.class, () -> copyWriteable(testInstance, getNamedWriteableRegistry(), instanceReader(), unsupportedVersion) ); - assertThat(statusException.status(), equalTo(RestStatus.FORBIDDEN)); + assertThat(statusException.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat( statusException.getMessage(), equalTo(