Skip to content
Merged
5 changes: 5 additions & 0 deletions docs/changelog/137455.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137455
summary: Preview index request
area: Transform
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9213000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
resolved_fields_caps,9212000
transform_preview_as_index_request,9213000
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public final class TransformField {

public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
public static final ParseField CHECK_FOR_DANGLING_TASKS = new ParseField("check_dangling_tasks");
public static final ParseField PREVIEW_AS_INDEX_REQUEST = new ParseField("as_index_request");
/**
* Fields for checkpointing
*/
Expand Down Expand Up @@ -102,6 +103,8 @@ public final class TransformField {

// internal document id
public static final String DOCUMENT_ID_FIELD = "_id";
// internal document source
public static final String DOCUMENT_SOURCE_FIELD = "_source";

public static final PersistentTasksCustomMetadata.Assignment AWAITING_UPGRADE = new PersistentTasksCustomMetadata.Assignment(
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
Expand Down Expand Up @@ -55,19 +56,24 @@ private PreviewTransformAction() {

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

static final TransportVersion PREVIEW_AS_INDEX_REQUEST = TransportVersion.fromName("transform_preview_as_index_request");
private final TransformConfig config;
private final boolean previewAsIndexRequest;

public Request(TransformConfig config, TimeValue timeout) {
public Request(TransformConfig config, TimeValue timeout, boolean previewAsIndexRequest) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, timeout);
this.config = config;
this.previewAsIndexRequest = previewAsIndexRequest;
}

public Request(StreamInput in) throws IOException {
super(in);
this.config = new TransformConfig(in);
this.previewAsIndexRequest = in.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST) ? in.readBoolean() : false;
}

public static Request fromXContent(final XContentParser parser, TimeValue timeout) throws IOException {
public static Request fromXContent(final XContentParser parser, TimeValue timeout, boolean previewAsIndexRequest)
throws IOException {
Map<String, Object> content = parser.map();
// dest.index is not required for _preview, so we just supply our own
Map<String, String> tempDestination = new HashMap<>();
Expand All @@ -89,7 +95,7 @@ public static Request fromXContent(final XContentParser parser, TimeValue timeou
XContentType.JSON
)
) {
return new Request(TransformConfig.fromXContent(newParser, null, false), timeout);
return new Request(TransformConfig.fromXContent(newParser, null, false), timeout, previewAsIndexRequest);
}
}

Expand All @@ -115,15 +121,22 @@ public TransformConfig getConfig() {
return config;
}

public boolean previewAsIndexRequest() {
return previewAsIndexRequest;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
this.config.writeTo(out);
if (out.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST)) {
out.writeBoolean(previewAsIndexRequest);
}
Comment on lines 134 to 143
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's throw if somebody specifically requested true, but the transport version doesn't support it. This way they don't think they ran into a bug, but instead their cluster just isn't fully upgraded yet.

Copy link
Member Author

@prwhelan prwhelan Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an example of this kind of thing elsewhere? This would effectively break the API until they're on the same version - would it be easier/better to use NodeFeature to instead disable the feature until every node is on the latest version?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would effectively break the API until they're on the same version - would it be easier/better to use NodeFeature to instead disable the feature until every node is on the latest version?

No it wouldn't. Folks can still use false which is the default value. Why would we want to silently fail the request if somebody specifically asked for a new feature that we cannot provide?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be easier/better to use NodeFeature

This I do not know, but that has additional guarantees and costs that we don't really need.

I think transport version is perfectly fine. I am saying that the default behavior of the API should still be usable, but if somebody sends a parameter that is not supported by nodes that will receive the message, we should not silently ignore it and instead tell the user "Hey, this feature you want to use isn't available yet"

This has been done before by other APIs in the past, I don't have any examples handy.

}

@Override
public int hashCode() {
return Objects.hash(config);
return Objects.hash(config, previewAsIndexRequest);
}

@Override
Expand All @@ -135,7 +148,7 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(config, other.config);
return Objects.equals(config, other.config) && (previewAsIndexRequest == other.previewAsIndexRequest);
}

@Override
Expand Down Expand Up @@ -170,10 +183,6 @@ public List<Map<String, Object>> getDocs() {
return docs;
}

public TransformDestIndexSettings getGeneratedDestIndexSettings() {
return generatedDestIndexSettings;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(docs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@
package org.elasticsearch.xpack.core.transform;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.test.AbstractBWCSerializationTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -30,14 +27,12 @@
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import static java.util.Collections.emptyList;

public abstract class AbstractSerializingTransformTestCase<T extends ToXContent & Writeable> extends AbstractXContentSerializingTestCase<
T> {
public abstract class AbstractSerializingTransformTestCase<T extends ToXContent & Writeable> extends AbstractBWCSerializationTestCase<T> {

protected static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(TransformField.FOR_INTERNAL_STORAGE, "true")
Expand Down Expand Up @@ -111,22 +106,9 @@ protected NamedXContentRegistry xContentRegistry() {
return namedXContentRegistry;
}

protected <X extends Writeable, Y extends Writeable> Y writeAndReadBWCObject(
X original,
NamedWriteableRegistry registry,
Writeable.Writer<X> writer,
Writeable.Reader<Y> reader,
TransportVersion version
) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setTransportVersion(version);
original.writeTo(output);

try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), getNamedWriteableRegistry())) {
in.setTransportVersion(version);
return reader.read(in);
}
}
@Override
protected T mutateInstanceForVersion(T instance, TransportVersion version) {
return instance;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -16,6 +17,8 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request;
Expand All @@ -36,7 +39,7 @@ public class PreviewTransformActionRequestTests extends AbstractSerializingTrans

@Override
protected Request doParseInstance(XContentParser parser) throws IOException {
return Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
return Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
}

@Override
Expand All @@ -46,7 +49,11 @@ protected Writeable.Reader<Request> instanceReader() {

@Override
protected Request createTestInstance() {
TransformConfig config = new TransformConfig(
return new Request(randomTransformConfig(), randomTimeValue(), randomBoolean());
}

private static TransformConfig randomTransformConfig() {
return new TransformConfig(
"transform-preview",
randomSourceConfig(),
new DestConfig("unused-transform-preview-index", null, null),
Expand All @@ -62,12 +69,31 @@ protected Request createTestInstance() {
null,
null
);
return new Request(config, randomTimeValue());
}

@Override
protected Request createXContextTestInstance(XContentType xContentType) {
return new Request(randomTransformConfig(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
}

@Override
protected Request mutateInstance(Request instance) {
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
return randomBoolean()
? new Request(
randomValueOtherThan(instance.getConfig(), PreviewTransformActionRequestTests::randomTransformConfig),
instance.ackTimeout(),
instance.previewAsIndexRequest()
)
: new Request(instance.getConfig(), instance.ackTimeout(), instance.previewAsIndexRequest() == false);
}

@Override
protected Request mutateInstanceForVersion(Request instance, TransportVersion version) {
if (version.supports(Request.PREVIEW_AS_INDEX_REQUEST)) {
return instance;
} else {
return new Request(instance.getConfig(), instance.ackTimeout(), false);
}
}

public void testParsingOverwritesIdField() throws IOException {
Expand Down Expand Up @@ -125,13 +151,13 @@ private void testParsingOverwrites(

try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry())
.withDeprecationHandler(DeprecationHandler.THROW_UNSUPPORTED_OPERATION),
json.streamInput()
)
) {

Request request = Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
Request request = Request.fromXContent(parser, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
assertThat(request.getConfig().getId(), is(equalTo(expectedTransformId)));
assertThat(request.getConfig().getDestination().getIndex(), is(equalTo(expectedDestIndex)));
assertThat(request.getConfig().getDestination().getPipeline(), is(equalTo(expectedDestPipeline)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,29 @@ public void testLatestWithAggregateMetricDoubleAsUniqueKey() throws Exception {
}
}

@SuppressWarnings("unchecked")
public void testPreviewAsIndexRequest() throws IOException {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
var createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview?as_index_request", null);
createPreviewRequest.setJsonEntity(Strings.format("""
{
"source": {
"index": "%s"
},
"latest": {
"unique_key": [ "user_id" ],
"sort": "@timestamp"
}
}""", REVIEWS_INDEX_NAME));
var previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest));
var preview = (List<Map<String, Object>>) previewTransformResponse.get("preview");
preview.forEach(p -> {
assertNotNull(XContentMapValues.extractValue("_id", p));
assertNotNull(XContentMapValues.extractValue("_source.@timestamp", p));
assertNotNull(XContentMapValues.extractValue("_source.user_id", p));
});
}

public void testContinuousLatestWithFrom_NoDocs() throws Exception {
testContinuousLatestWithFrom("latest_from_no_docs", "reviews_from_no_docs", "2017-02-20", 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,54 @@ private List<String> previewWithOffset(String offset) throws IOException {
return preview.stream().map(p -> (String) p.get("by_week")).toList();
}

@SuppressWarnings("unchecked")
public void testPreviewAsIndexRequest() throws Exception {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
final Request createPreviewRequest = createRequestWithAuth(
"POST",
getTransformEndpoint() + "_preview?as_index_request",
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);

createPreviewRequest.setJsonEntity(Strings.format("""
{
"source": {
"index": "%s"
},
"pivot": {
"group_by": {
"user.id": {
"terms": {
"field": "user_id"
}
},
"by_day": {
"date_histogram": {
"fixed_interval": "1d",
"field": "timestamp"
}
}
},
"aggregations": {
"user.avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""", REVIEWS_INDEX_NAME));

var previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest));
var preview = (List<Map<String, Object>>) previewTransformResponse.get("preview");
preview.forEach(p -> {
assertNotNull(XContentMapValues.extractValue("_id", p));
assertNotNull(XContentMapValues.extractValue("_source.by_day", p));
assertNotNull(XContentMapValues.extractValue("_source.user.id", p));
assertNotNull(XContentMapValues.extractValue("_source.user.avg_rating", p));
});
}

public void testPivotWithMaxOnDateField() throws Exception {
String transformId = "simple_date_histogram_pivot_with_max_time";
String transformIndex = "pivot_reviews_via_date_histogram_with_max_time";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected Settings nodeSettings() {
public void testPreviewTransformWithRemoteIndex() {
String transformId = "transform-with-remote-index";
TransformConfig config = randomConfig(transformId, "remote_cluster:my-index");
PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
ElasticsearchStatusException e = expectThrows(
ElasticsearchStatusException.class,
() -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testGetTransform() {
public void testPreviewTransform() {
String transformId = "transform-1";
TransformConfig config = randomConfig(transformId);
PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
PreviewTransformAction.Request request = new PreviewTransformAction.Request(config, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false);
ElasticsearchStatusException e = expectThrows(
ElasticsearchStatusException.class,
() -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet()
Expand Down
Loading