diff --git a/docs/changelog/137455.yaml b/docs/changelog/137455.yaml new file mode 100644 index 0000000000000..4732c0f1b276f --- /dev/null +++ b/docs/changelog/137455.yaml @@ -0,0 +1,5 @@ +pr: 137455 +summary: Preview index request +area: Transform +type: enhancement +issues: [] diff --git a/server/src/main/resources/transport/definitions/referable/transform_preview_as_index_request.csv b/server/src/main/resources/transport/definitions/referable/transform_preview_as_index_request.csv new file mode 100644 index 0000000000000..f81f2fd8c49c7 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/transform_preview_as_index_request.csv @@ -0,0 +1 @@ +9214000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 61602dea24d29..781de9c6e1a78 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -batched_response_might_include_reduction_failure,9213000 +transform_preview_as_index_request,9214000 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index d7f6a46eada3a..cd24ebde92c49 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -60,6 +60,7 @@ public final class TransformField { public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); public static final ParseField CHECK_FOR_DANGLING_TASKS = new ParseField("check_dangling_tasks"); + public static final ParseField PREVIEW_AS_INDEX_REQUEST = new ParseField("as_index_request"); /** * Fields for checkpointing */ @@ -102,6 +103,8 @@ public final class TransformField { // internal document id public static final String DOCUMENT_ID_FIELD = "_id"; + // internal document source + public static final String DOCUMENT_SOURCE_FIELD = "_source"; public static final PersistentTasksCustomMetadata.Assignment AWAITING_UPGRADE = new PersistentTasksCustomMetadata.Assignment( null, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java index adebbba651f16..a084bd5443547 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; @@ -18,6 +20,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -55,19 +58,24 @@ private PreviewTransformAction() { public static class Request extends AcknowledgedRequest implements ToXContentObject { + static final TransportVersion PREVIEW_AS_INDEX_REQUEST = TransportVersion.fromName("transform_preview_as_index_request"); private final TransformConfig config; + private final boolean previewAsIndexRequest; - public Request(TransformConfig config, TimeValue timeout) { + public Request(TransformConfig config, TimeValue timeout, boolean previewAsIndexRequest) { super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, timeout); this.config = config; + this.previewAsIndexRequest = previewAsIndexRequest; } public Request(StreamInput in) throws IOException { super(in); this.config = new TransformConfig(in); + this.previewAsIndexRequest = in.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST) ? in.readBoolean() : false; } - public static Request fromXContent(final XContentParser parser, TimeValue timeout) throws IOException { + public static Request fromXContent(final XContentParser parser, TimeValue timeout, boolean previewAsIndexRequest) + throws IOException { Map content = parser.map(); // dest.index is not required for _preview, so we just supply our own Map tempDestination = new HashMap<>(); @@ -89,7 +97,7 @@ public static Request fromXContent(final XContentParser parser, TimeValue timeou XContentType.JSON ) ) { - return new Request(TransformConfig.fromXContent(newParser, null, false), timeout); + return new Request(TransformConfig.fromXContent(newParser, null, false), timeout, previewAsIndexRequest); } } @@ -115,15 +123,29 @@ public TransformConfig getConfig() { return config; } + public boolean previewAsIndexRequest() { + return previewAsIndexRequest; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); this.config.writeTo(out); + if (out.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST)) { + out.writeBoolean(previewAsIndexRequest); + } else if (previewAsIndexRequest) { + throw new ElasticsearchStatusException( + "Cannot send a _preview request with " + + TransformField.PREVIEW_AS_INDEX_REQUEST.getPreferredName() + + " to an outdated node. Please upgrade the node to 9.3.0+ and try again.", + RestStatus.BAD_REQUEST + ); + } } @Override public int hashCode() { - return Objects.hash(config); + return Objects.hash(config, previewAsIndexRequest); } @Override @@ -135,7 +157,7 @@ public boolean equals(Object obj) { return false; } Request other = (Request) obj; - return Objects.equals(config, other.config); + return Objects.equals(config, other.config) && (previewAsIndexRequest == other.previewAsIndexRequest); } @Override @@ -170,10 +192,6 @@ public List> getDocs() { return docs; } - public TransformDestIndexSettings getGeneratedDestIndexSettings() { - return generatedDestIndexSettings; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeInt(docs.size()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java index a200258d7ce1d..afb38a7266b46 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java @@ -7,15 +7,19 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Strings; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.DeprecationHandler; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request; @@ -26,7 +30,9 @@ import java.io.IOException; import java.util.Map; +import java.util.function.Predicate; +import static org.elasticsearch.test.BWCVersions.DEFAULT_BWC_VERSIONS; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -36,7 +42,7 @@ public class PreviewTransformActionRequestTests extends AbstractSerializingTrans @Override protected Request doParseInstance(XContentParser parser) throws IOException { - return Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + return Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); } @Override @@ -46,7 +52,11 @@ protected Writeable.Reader instanceReader() { @Override protected Request createTestInstance() { - TransformConfig config = new TransformConfig( + return new Request(randomTransformConfig(), randomTimeValue(), randomBoolean()); + } + + private static TransformConfig randomTransformConfig() { + return new TransformConfig( "transform-preview", randomSourceConfig(), new DestConfig("unused-transform-preview-index", null, null), @@ -62,12 +72,57 @@ protected Request createTestInstance() { null, null ); - return new Request(config, randomTimeValue()); + } + + @Override + protected Request createXContextTestInstance(XContentType xContentType) { + return new Request(randomTransformConfig(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); } @Override protected Request mutateInstance(Request instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 + return randomBoolean() + ? new Request( + randomValueOtherThan(instance.getConfig(), PreviewTransformActionRequestTests::randomTransformConfig), + instance.ackTimeout(), + instance.previewAsIndexRequest() + ) + : new Request(instance.getConfig(), instance.ackTimeout(), instance.previewAsIndexRequest() == false); + } + + public void testAsIndexRequestIsNotBackwardsCompatible() throws IOException { + var unsupportedVersions = DEFAULT_BWC_VERSIONS.stream() + .filter(Predicate.not(version -> version.supports(Request.PREVIEW_AS_INDEX_REQUEST))) + .toList(); + for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) { + var testInstance = createTestInstance(); + for (var unsupportedVersion : unsupportedVersions) { + if (testInstance.previewAsIndexRequest()) { + var statusException = assertThrows( + ElasticsearchStatusException.class, + () -> copyWriteable(testInstance, getNamedWriteableRegistry(), instanceReader(), unsupportedVersion) + ); + assertThat(statusException.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat( + statusException.getMessage(), + equalTo( + "Cannot send a _preview request with as_index_request to an outdated node. " + + "Please upgrade the node to 9.3.0+ and try again." + ) + ); + } else { + var deserializedInstance = copyWriteable( + testInstance, + getNamedWriteableRegistry(), + instanceReader(), + unsupportedVersion + ); + assertNotSame(unsupportedVersion.toString(), deserializedInstance, testInstance); + assertEquals(unsupportedVersion.toString(), deserializedInstance, testInstance); + assertEquals(unsupportedVersion.toString(), deserializedInstance.hashCode(), testInstance.hashCode()); + } + } + } } public void testParsingOverwritesIdField() throws IOException { @@ -125,13 +180,13 @@ private void testParsingOverwrites( try ( XContentParser parser = JsonXContent.jsonXContent.createParser( - xContentRegistry(), - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry()) + .withDeprecationHandler(DeprecationHandler.THROW_UNSUPPORTED_OPERATION), json.streamInput() ) ) { - Request request = Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + Request request = Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); assertThat(request.getConfig().getId(), is(equalTo(expectedTransformId))); assertThat(request.getConfig().getDestination().getIndex(), is(equalTo(expectedDestIndex))); assertThat(request.getConfig().getDestination().getPipeline(), is(equalTo(expectedDestPipeline))); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java index af5fe9f180a4e..ed78655e22820 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java @@ -148,6 +148,29 @@ public void testLatestWithAggregateMetricDoubleAsUniqueKey() throws Exception { } } + @SuppressWarnings("unchecked") + public void testPreviewAsIndexRequest() throws IOException { + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); + var createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview?as_index_request", null); + createPreviewRequest.setJsonEntity(Strings.format(""" + { + "source": { + "index": "%s" + }, + "latest": { + "unique_key": [ "user_id" ], + "sort": "@timestamp" + } + }""", REVIEWS_INDEX_NAME)); + var previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest)); + var preview = (List>) previewTransformResponse.get("preview"); + preview.forEach(p -> { + assertNotNull(XContentMapValues.extractValue("_id", p)); + assertNotNull(XContentMapValues.extractValue("_source.@timestamp", p)); + assertNotNull(XContentMapValues.extractValue("_source.user_id", p)); + }); + } + public void testContinuousLatestWithFrom_NoDocs() throws Exception { testContinuousLatestWithFrom("latest_from_no_docs", "reviews_from_no_docs", "2017-02-20", 0); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index 5444dc435a0d0..7828989cdae2e 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -1413,6 +1413,54 @@ private List previewWithOffset(String offset) throws IOException { return preview.stream().map(p -> (String) p.get("by_week")).toList(); } + @SuppressWarnings("unchecked") + public void testPreviewAsIndexRequest() throws Exception { + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); + final Request createPreviewRequest = createRequestWithAuth( + "POST", + getTransformEndpoint() + "_preview?as_index_request", + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS + ); + + createPreviewRequest.setJsonEntity(Strings.format(""" + { + "source": { + "index": "%s" + }, + "pivot": { + "group_by": { + "user.id": { + "terms": { + "field": "user_id" + } + }, + "by_day": { + "date_histogram": { + "fixed_interval": "1d", + "field": "timestamp" + } + } + }, + "aggregations": { + "user.avg_rating": { + "avg": { + "field": "stars" + } + } + } + } + }""", REVIEWS_INDEX_NAME)); + + var previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest)); + var preview = (List>) previewTransformResponse.get("preview"); + preview.forEach(p -> { + assertNotNull(XContentMapValues.extractValue("_id", p)); + assertNotNull(XContentMapValues.extractValue("_source.by_day", p)); + assertNotNull(XContentMapValues.extractValue("_source.user.id", p)); + assertNotNull(XContentMapValues.extractValue("_source.user.avg_rating", p)); + }); + } + public void testPivotWithMaxOnDateField() throws Exception { String transformId = "simple_date_histogram_pivot_with_max_time"; String transformIndex = "pivot_reviews_via_date_histogram_with_max_time"; diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java index 5090a00211ff4..295204c03e975 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java @@ -41,7 +41,7 @@ protected Settings nodeSettings() { public void testPreviewTransformWithRemoteIndex() { String transformId = "transform-with-remote-index"; TransformConfig config = randomConfig(transformId, "remote_cluster:my-index"); - PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, () -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet() diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java index 1ce2299df33fb..170dcb3e38123 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java @@ -63,7 +63,7 @@ public void testGetTransform() { public void testPreviewTransform() { String transformId = "transform-1"; TransformConfig config = randomConfig(transformId); - PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, () -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet() diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index 60f00da195974..32cc93d739adb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -47,12 +47,10 @@ import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Response; -import org.elasticsearch.xpack.core.transform.transforms.DestAlias; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.SyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; -import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings; import org.elasticsearch.xpack.core.transform.transforms.TransformEffectiveSettings; import org.elasticsearch.xpack.transform.TransformExtensionHolder; import org.elasticsearch.xpack.transform.persistence.TransformIndex; @@ -61,12 +59,11 @@ import org.elasticsearch.xpack.transform.transforms.TransformNodes; import org.elasticsearch.xpack.transform.utils.SourceDestValidations; +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; @@ -155,9 +152,9 @@ protected void doExecute(Task task, Request request, ActionListener li config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), - config.getDestination().getAliases(), config.getSyncConfig(), config.getSettings(), + request.previewAsIndexRequest(), listener ), listener::onFailure @@ -211,89 +208,46 @@ private void getPreview( SourceConfig source, String pipeline, String dest, - List aliases, SyncConfig syncConfig, SettingsConfig settingsConfig, + boolean previewAsIndexRequest, ActionListener listener ) { - Client parentTaskClient = new ParentTaskAssigningClient(client, parentTaskId); + var parentTaskClient = new ParentTaskAssigningClient(client, parentTaskId); - final SetOnce> mappings = new SetOnce<>(); + final var mappings = new SetOnce>(); - final Map filteredHeaders = getSecurityHeadersPreferringSecondary( - threadPool, - securityContext, - clusterService.state() - ); + final var filteredHeaders = getSecurityHeadersPreferringSecondary(threadPool, securityContext, clusterService.state()); - ActionListener pipelineResponseActionListener = ActionListener.wrap(simulatePipelineResponse -> { - List> docs = new ArrayList<>(simulatePipelineResponse.getResults().size()); - List> errors = new ArrayList<>(); - for (var simulateDocumentResult : simulatePipelineResponse.getResults()) { - try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { - XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); - Map tempMap = XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2(); - Map doc = (Map) XContentMapValues.extractValue("doc._source", tempMap); - if (doc != null) { - docs.add(doc); - } - Map error = (Map) XContentMapValues.extractValue("error", tempMap); - if (error != null) { - errors.add(error); - } - } - } - if (errors.isEmpty() == false) { - HeaderWarning.addWarning("Pipeline returned " + errors.size() + " errors, first error: " + errors.get(0)); - } - TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( + ActionListener>> responseDocsListener = listener.delegateFailureAndWrap((l, docs) -> { + var generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( destIndexSettings, mappings.get(), transformId, Clock.systemUTC() ); + TransformConfigLinter.getWarnings(function, source, syncConfig).forEach(HeaderWarning::addWarning); + if (previewAsIndexRequest) { + l.onResponse(new Response(docs, generatedDestIndexSettings)); + } else { + l.onResponse( + new Response( + docs.stream().map(doc -> (Map) doc.get(TransformField.DOCUMENT_SOURCE_FIELD)).toList(), + generatedDestIndexSettings + ) + ); + } + }); - List warnings = TransformConfigLinter.getWarnings(function, source, syncConfig); - warnings.forEach(HeaderWarning::addWarning); - listener.onResponse(new Response(docs, generatedDestIndexSettings)); - }, listener::onFailure); - - ActionListener>> previewListener = ActionListener.wrap(docs -> { + ActionListener>> previewListener = responseDocsListener.delegateFailureAndWrap((l, docs) -> { if (pipeline == null) { - TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( - destIndexSettings, - mappings.get(), - transformId, - Clock.systemUTC() - ); - List warnings = TransformConfigLinter.getWarnings(function, source, syncConfig); - warnings.forEach(HeaderWarning::addWarning); - listener.onResponse(new Response(docs, generatedDestIndexSettings)); + l.onResponse(docs); } else { - List> results = docs.stream().map(doc -> { - Map src = new HashMap<>(); - String id = (String) doc.get(TransformField.DOCUMENT_ID_FIELD); - src.put("_source", doc); - src.put("_id", id); - src.put("_index", dest); - return src; - }).collect(Collectors.toList()); - - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject(); - builder.field("docs", results); - builder.endObject(); - var pipelineRequest = new SimulatePipelineRequest( - ReleasableBytesReference.wrap(BytesReference.bytes(builder)), - XContentType.JSON - ); - pipelineRequest.setId(pipeline); - parentTaskClient.execute(SimulatePipelineAction.INSTANCE, pipelineRequest, pipelineResponseActionListener); - } + simulatePipeline(docs, pipeline, dest, parentTaskClient, l); } - }, listener::onFailure); + }); - ActionListener> deduceMappingsListener = ActionListener.wrap(deducedMappings -> { + ActionListener> deduceMappingsListener = previewListener.delegateFailureAndWrap((l, deducedMappings) -> { if (TransformEffectiveSettings.isDeduceMappingsDisabled(settingsConfig)) { mappings.set(emptyMap()); } else { @@ -307,10 +261,72 @@ private void getPreview( // Use deduced mappings for generating preview even if "settings.deduce_mappings" is set to false deducedMappings, NUMBER_OF_PREVIEW_BUCKETS, - previewListener + l ); - }, listener::onFailure); + }); function.deduceMappings(parentTaskClient, filteredHeaders, transformId, source, deduceMappingsListener); } + + @SuppressWarnings("unchecked") + private void simulatePipeline( + List> previewDocs, + String pipeline, + String dest, + Client client, + ActionListener>> listener + ) throws IOException { + ActionListener pipelineResponseActionListener = listener.delegateFailureAndWrap( + (l, simulatePipelineResponse) -> { + List> docs = new ArrayList<>(simulatePipelineResponse.getResults().size()); + List> errors = new ArrayList<>(); + for (var simulateDocumentResult : simulatePipelineResponse.getResults()) { + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + Map tempMap = XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON) + .v2(); + Map sourceDoc = (Map) XContentMapValues.extractValue("doc._source", tempMap); + if (sourceDoc != null) { + docs.add( + Map.ofEntries( + Map.entry(TransformField.DOCUMENT_ID_FIELD, XContentMapValues.extractValue("doc._id", tempMap)), + Map.entry(TransformField.DOCUMENT_SOURCE_FIELD, sourceDoc) + ) + ); + } + var error = (Map) XContentMapValues.extractValue("error", tempMap); + if (error != null) { + errors.add(error); + } + } + } + if (errors.isEmpty() == false) { + HeaderWarning.addWarning("Pipeline returned " + errors.size() + " errors, first error: " + errors.getFirst()); + } + l.onResponse(docs); + } + ); + + var results = previewDocs.stream() + .map( + doc -> Map.ofEntries( + Map.entry(TransformField.DOCUMENT_SOURCE_FIELD, doc.get(TransformField.DOCUMENT_SOURCE_FIELD)), + Map.entry(TransformField.DOCUMENT_ID_FIELD, doc.get(TransformField.DOCUMENT_ID_FIELD)), + Map.entry("_index", dest) + ) + ) + .toList(); + + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.field("docs", results); + builder.endObject(); + var pipelineRequest = new SimulatePipelineRequest( + ReleasableBytesReference.wrap(BytesReference.bytes(builder)), + XContentType.JSON + ); + pipelineRequest.setId(pipeline); + client.execute(SimulatePipelineAction.INSTANCE, pipelineRequest, pipelineResponseActionListener); + } + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java index 7aa45d37a3b41..5fedd3688375c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java @@ -13,7 +13,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; @@ -69,12 +68,15 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient ); } - TimeValue timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + var timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + var previewAsIndexRequest = restRequest.paramAsBoolean(TransformField.PREVIEW_AS_INDEX_REQUEST.getPreferredName(), false); SetOnce previewRequestHolder = new SetOnce<>(); if (Strings.isNullOrEmpty(transformId)) { - previewRequestHolder.set(PreviewTransformAction.Request.fromXContent(restRequest.contentOrSourceParamParser(), timeout)); + previewRequestHolder.set( + PreviewTransformAction.Request.fromXContent(restRequest.contentOrSourceParamParser(), timeout, previewAsIndexRequest) + ); } Client client = new RestCancellableNodeClient(nodeClient, restRequest.getHttpChannel()); @@ -97,7 +99,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient ) ); } else { - PreviewTransformAction.Request previewRequest = new PreviewTransformAction.Request(transforms.get(0), timeout); + PreviewTransformAction.Request previewRequest = new PreviewTransformAction.Request( + transforms.getFirst(), + timeout, + previewAsIndexRequest + ); client.execute(PreviewTransformAction.INSTANCE, previewRequest, listener); } }, listener::onFailure)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java index 2de810b2b902d..97e6357c1da03 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java @@ -38,7 +38,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; @@ -82,32 +81,37 @@ public void preview( client, TransportSearchAction.TYPE, buildSearchRequestForValidation("preview", sourceConfig, timeout, numberOfBuckets), - ActionListener.wrap(r -> { + listener.delegateFailureAndWrap((l, r) -> { try { final InternalAggregations aggregations = r.getAggregations(); if (aggregations == null) { - listener.onFailure( + l.onFailure( new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST) ); return; } final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); if (agg == null || agg.getBuckets().isEmpty()) { - listener.onResponse(Collections.emptyList()); + l.onResponse(Collections.emptyList()); return; } - TransformIndexerStats stats = new TransformIndexerStats(); - TransformProgress progress = new TransformProgress(); - List> docs = extractResults(agg, fieldTypeMap, stats, progress).map( - this::documentTransformationFunction - ).collect(Collectors.toList()); + var stats = new TransformIndexerStats(); + var progress = new TransformProgress(); + var docs = extractResults(agg, fieldTypeMap, stats, progress).map(doc -> { + var docId = (String) doc.get(TransformField.DOCUMENT_ID_FIELD); + doc = documentTransformationFunction(doc); + return Map.ofEntries( + Map.entry(TransformField.DOCUMENT_ID_FIELD, docId), + Map.entry(TransformField.DOCUMENT_SOURCE_FIELD, doc) + ); + }).toList(); - listener.onResponse(docs); + l.onResponse(docs); } catch (AggregationResultUtils.AggregationExtractionException extractionException) { - listener.onFailure(new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST)); + l.onFailure(new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST)); } - }, listener::onFailure) + }) ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java index 37817bc261be5..1608ba2d840c2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.InternalComposite; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; @@ -39,6 +40,7 @@ import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.TransformDeprecations; +import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfigTests; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; @@ -71,9 +73,11 @@ import java.util.stream.Stream; import static java.util.Collections.emptyMap; +import static org.elasticsearch.xpack.transform.transforms.common.AbstractCompositeAggFunction.COMPOSITE_AGGREGATION_NAME; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -329,7 +333,16 @@ public void testPreviewForCompositeAggregation() throws Exception { SettingsConfigTests.randomSettingsConfig(), TransformConfigVersion.CURRENT, Collections.emptySet() - ); + ) { + protected Stream> extractResults( + CompositeAggregation agg, + Map fieldTypeMap, + TransformIndexerStats transformIndexerStats, + TransformProgress transformProgress + ) { + return Stream.of(Map.ofEntries(Map.entry(TransformField.DOCUMENT_ID_FIELD, "1234"), Map.entry("testKey", "testValue"))); + } + }; CountDownLatch latch = new CountDownLatch(1); final AtomicReference exceptionHolder = new AtomicReference<>(); @@ -348,7 +361,15 @@ public void testPreviewForCompositeAggregation() throws Exception { } assertThat(exceptionHolder.get(), is(nullValue())); - assertThat(responseHolder.get(), is(empty())); + assertThat( + responseHolder.get(), + containsInAnyOrder( + Map.ofEntries( + Map.entry(TransformField.DOCUMENT_ID_FIELD, "1234"), + Map.entry(TransformField.DOCUMENT_SOURCE_FIELD, Map.of("testKey", "testValue")) + ) + ) + ); } private static SearchResponse searchResponseFromAggs(InternalAggregations aggs) { @@ -425,10 +446,12 @@ protected void Request request, ActionListener listener ) { - SearchResponse response = mock(SearchResponse.class); - InternalComposite compositeAggregation = mock(InternalComposite.class); + SearchResponse response = mock(); + InternalComposite compositeAggregation = mock(); + when(compositeAggregation.getName()).thenReturn(COMPOSITE_AGGREGATION_NAME); + InternalComposite.InternalBucket bucket = mock(); + when(compositeAggregation.getBuckets()).thenReturn(List.of(bucket)); when(response.getAggregations()).thenReturn(InternalAggregations.from(List.of(compositeAggregation))); - when(compositeAggregation.getBuckets()).thenReturn(new ArrayList<>()); listener.onResponse((Response) response); } }