Skip to content
Merged
5 changes: 5 additions & 0 deletions docs/changelog/137455.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137455
summary: Preview index request
area: Transform
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9213000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
resolved_fields_caps,9212000
transform_preview_as_index_request,9213000
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public final class TransformField {

public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
public static final ParseField CHECK_FOR_DANGLING_TASKS = new ParseField("check_dangling_tasks");
public static final ParseField PREVIEW_AS_INDEX_REQUEST = new ParseField("as_index_request");
/**
* Fields for checkpointing
*/
Expand Down Expand Up @@ -102,6 +103,8 @@ public final class TransformField {

// internal document id
public static final String DOCUMENT_ID_FIELD = "_id";
// internal document source
public static final String DOCUMENT_SOURCE_FIELD = "_source";

public static final PersistentTasksCustomMetadata.Assignment AWAITING_UPGRADE = new PersistentTasksCustomMetadata.Assignment(
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
Expand All @@ -18,6 +20,7 @@
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -55,19 +58,24 @@ private PreviewTransformAction() {

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

static final TransportVersion PREVIEW_AS_INDEX_REQUEST = TransportVersion.fromName("transform_preview_as_index_request");
private final TransformConfig config;
private final boolean previewAsIndexRequest;

public Request(TransformConfig config, TimeValue timeout) {
public Request(TransformConfig config, TimeValue timeout, boolean previewAsIndexRequest) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, timeout);
this.config = config;
this.previewAsIndexRequest = previewAsIndexRequest;
}

public Request(StreamInput in) throws IOException {
super(in);
this.config = new TransformConfig(in);
this.previewAsIndexRequest = in.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST) ? in.readBoolean() : false;
}

public static Request fromXContent(final XContentParser parser, TimeValue timeout) throws IOException {
public static Request fromXContent(final XContentParser parser, TimeValue timeout, boolean previewAsIndexRequest)
throws IOException {
Map<String, Object> content = parser.map();
// dest.index is not required for _preview, so we just supply our own
Map<String, String> tempDestination = new HashMap<>();
Expand All @@ -89,7 +97,7 @@ public static Request fromXContent(final XContentParser parser, TimeValue timeou
XContentType.JSON
)
) {
return new Request(TransformConfig.fromXContent(newParser, null, false), timeout);
return new Request(TransformConfig.fromXContent(newParser, null, false), timeout, previewAsIndexRequest);
}
}

Expand All @@ -115,15 +123,29 @@ public TransformConfig getConfig() {
return config;
}

public boolean previewAsIndexRequest() {
return previewAsIndexRequest;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
this.config.writeTo(out);
if (out.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST)) {
out.writeBoolean(previewAsIndexRequest);
} else if (previewAsIndexRequest) {
throw new ElasticsearchStatusException(
"_preview with "
+ TransformField.PREVIEW_AS_INDEX_REQUEST.getPreferredName()
+ " set to true only works if all the nodes support it.",
RestStatus.FORBIDDEN
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest BAD_REQUEST

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure - in the past I've used 403 or 422 for this, since the request was understood and syntactically correct, but the server is not going to process it. 400 has more of a "I don't know what you just said to me" vibe.

);
}
}

@Override
public int hashCode() {
return Objects.hash(config);
return Objects.hash(config, previewAsIndexRequest);
}

@Override
Expand All @@ -135,7 +157,7 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(config, other.config);
return Objects.equals(config, other.config) && (previewAsIndexRequest == other.previewAsIndexRequest);
}

@Override
Expand Down Expand Up @@ -170,10 +192,6 @@ public List<Map<String, Object>> getDocs() {
return docs;
}

public TransformDestIndexSettings getGeneratedDestIndexSettings() {
return generatedDestIndexSettings;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(docs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request;
Expand All @@ -26,7 +30,9 @@

import java.io.IOException;
import java.util.Map;
import java.util.function.Predicate;

import static org.elasticsearch.test.BWCVersions.DEFAULT_BWC_VERSIONS;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -36,7 +42,7 @@ public class PreviewTransformActionRequestTests extends AbstractSerializingTrans

@Override
protected Request doParseInstance(XContentParser parser) throws IOException {
return Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
return Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
}

@Override
Expand All @@ -46,7 +52,11 @@ protected Writeable.Reader<Request> instanceReader() {

@Override
protected Request createTestInstance() {
TransformConfig config = new TransformConfig(
return new Request(randomTransformConfig(), randomTimeValue(), randomBoolean());
}

private static TransformConfig randomTransformConfig() {
return new TransformConfig(
"transform-preview",
randomSourceConfig(),
new DestConfig("unused-transform-preview-index", null, null),
Expand All @@ -62,12 +72,54 @@ protected Request createTestInstance() {
null,
null
);
return new Request(config, randomTimeValue());
}

@Override
protected Request createXContextTestInstance(XContentType xContentType) {
return new Request(randomTransformConfig(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
}

@Override
protected Request mutateInstance(Request instance) {
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
return randomBoolean()
? new Request(
randomValueOtherThan(instance.getConfig(), PreviewTransformActionRequestTests::randomTransformConfig),
instance.ackTimeout(),
instance.previewAsIndexRequest()
)
: new Request(instance.getConfig(), instance.ackTimeout(), instance.previewAsIndexRequest() == false);
}

public void testAsIndexRequestIsNotBackwardsCompatible() throws IOException {
var unsupportedVersions = DEFAULT_BWC_VERSIONS.stream()
.filter(Predicate.not(version -> version.supports(Request.PREVIEW_AS_INDEX_REQUEST)))
.toList();
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
var testInstance = createTestInstance();
for (var unsupportedVersion : unsupportedVersions) {
if (testInstance.previewAsIndexRequest()) {
var statusException = assertThrows(
ElasticsearchStatusException.class,
() -> copyWriteable(testInstance, getNamedWriteableRegistry(), instanceReader(), unsupportedVersion)
);
assertThat(statusException.status(), equalTo(RestStatus.FORBIDDEN));
assertThat(
statusException.getMessage(),
equalTo("_preview with as_index_request set to true only works if all the nodes support it.")
);
} else {
var deserializedInstance = copyWriteable(
testInstance,
getNamedWriteableRegistry(),
instanceReader(),
unsupportedVersion
);
assertNotSame(unsupportedVersion.toString(), deserializedInstance, testInstance);
assertEquals(unsupportedVersion.toString(), deserializedInstance, testInstance);
assertEquals(unsupportedVersion.toString(), deserializedInstance.hashCode(), testInstance.hashCode());
}
}
}
}

public void testParsingOverwritesIdField() throws IOException {
Expand Down Expand Up @@ -125,13 +177,13 @@ private void testParsingOverwrites(

try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry())
.withDeprecationHandler(DeprecationHandler.THROW_UNSUPPORTED_OPERATION),
json.streamInput()
)
) {

Request request = Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
Request request = Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
assertThat(request.getConfig().getId(), is(equalTo(expectedTransformId)));
assertThat(request.getConfig().getDestination().getIndex(), is(equalTo(expectedDestIndex)));
assertThat(request.getConfig().getDestination().getPipeline(), is(equalTo(expectedDestPipeline)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,29 @@ public void testLatestWithAggregateMetricDoubleAsUniqueKey() throws Exception {
}
}

@SuppressWarnings("unchecked")
public void testPreviewAsIndexRequest() throws IOException {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
var createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview?as_index_request", null);
createPreviewRequest.setJsonEntity(Strings.format("""
{
"source": {
"index": "%s"
},
"latest": {
"unique_key": [ "user_id" ],
"sort": "@timestamp"
}
}""", REVIEWS_INDEX_NAME));
var previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest));
var preview = (List<Map<String, Object>>) previewTransformResponse.get("preview");
preview.forEach(p -> {
assertNotNull(XContentMapValues.extractValue("_id", p));
assertNotNull(XContentMapValues.extractValue("_source.@timestamp", p));
assertNotNull(XContentMapValues.extractValue("_source.user_id", p));
});
}

public void testContinuousLatestWithFrom_NoDocs() throws Exception {
testContinuousLatestWithFrom("latest_from_no_docs", "reviews_from_no_docs", "2017-02-20", 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,54 @@ private List<String> previewWithOffset(String offset) throws IOException {
return preview.stream().map(p -> (String) p.get("by_week")).toList();
}

@SuppressWarnings("unchecked")
public void testPreviewAsIndexRequest() throws Exception {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
final Request createPreviewRequest = createRequestWithAuth(
"POST",
getTransformEndpoint() + "_preview?as_index_request",
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);

createPreviewRequest.setJsonEntity(Strings.format("""
{
"source": {
"index": "%s"
},
"pivot": {
"group_by": {
"user.id": {
"terms": {
"field": "user_id"
}
},
"by_day": {
"date_histogram": {
"fixed_interval": "1d",
"field": "timestamp"
}
}
},
"aggregations": {
"user.avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""", REVIEWS_INDEX_NAME));

var previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest));
var preview = (List<Map<String, Object>>) previewTransformResponse.get("preview");
preview.forEach(p -> {
assertNotNull(XContentMapValues.extractValue("_id", p));
assertNotNull(XContentMapValues.extractValue("_source.by_day", p));
assertNotNull(XContentMapValues.extractValue("_source.user.id", p));
assertNotNull(XContentMapValues.extractValue("_source.user.avg_rating", p));
});
}

public void testPivotWithMaxOnDateField() throws Exception {
String transformId = "simple_date_histogram_pivot_with_max_time";
String transformIndex = "pivot_reviews_via_date_histogram_with_max_time";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected Settings nodeSettings() {
public void testPreviewTransformWithRemoteIndex() {
String transformId = "transform-with-remote-index";
TransformConfig config = randomConfig(transformId, "remote_cluster:my-index");
PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
ElasticsearchStatusException e = expectThrows(
ElasticsearchStatusException.class,
() -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testGetTransform() {
public void testPreviewTransform() {
String transformId = "transform-1";
TransformConfig config = randomConfig(transformId);
PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
ElasticsearchStatusException e = expectThrows(
ElasticsearchStatusException.class,
() -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet()
Expand Down
Loading