Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -131,6 +133,13 @@ public void writeTo(StreamOutput out) throws IOException {
this.config.writeTo(out);
if (out.getTransportVersion().supports(PREVIEW_AS_INDEX_REQUEST)) {
out.writeBoolean(previewAsIndexRequest);
} else if (previewAsIndexRequest) {
throw new ElasticsearchStatusException(
"_preview with "
+ TransformField.PREVIEW_AS_INDEX_REQUEST.getPreferredName()
+ " set to true only works if all the nodes support it.",
RestStatus.FORBIDDEN
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest BAD_REQUEST

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure - in the past I've used 403 or 422 for this, since the request was understood and syntactically correct, but the server is not going to process it. 400 has more of a "I don't know what you just said to me" vibe.

);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
package org.elasticsearch.xpack.core.transform;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.test.AbstractBWCSerializationTestCase;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -27,12 +30,14 @@
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import static java.util.Collections.emptyList;

public abstract class AbstractSerializingTransformTestCase<T extends ToXContent & Writeable> extends AbstractBWCSerializationTestCase<T> {
public abstract class AbstractSerializingTransformTestCase<T extends ToXContent & Writeable> extends AbstractXContentSerializingTestCase<
T> {

protected static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(TransformField.FOR_INTERNAL_STORAGE, "true")
Expand Down Expand Up @@ -106,9 +111,22 @@ protected NamedXContentRegistry xContentRegistry() {
return namedXContentRegistry;
}

@Override
protected T mutateInstanceForVersion(T instance, TransportVersion version) {
return instance;
protected <X extends Writeable, Y extends Writeable> Y writeAndReadBWCObject(
X original,
NamedWriteableRegistry registry,
Writeable.Writer<X> writer,
Writeable.Reader<Y> reader,
TransportVersion version
) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setTransportVersion(version);
original.writeTo(output);

try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), getNamedWriteableRegistry())) {
in.setTransportVersion(version);
return reader.read(in);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand All @@ -29,7 +30,9 @@

import java.io.IOException;
import java.util.Map;
import java.util.function.Predicate;

import static org.elasticsearch.test.BWCVersions.DEFAULT_BWC_VERSIONS;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -87,12 +90,35 @@ protected Request mutateInstance(Request instance) {
: new Request(instance.getConfig(), instance.ackTimeout(), instance.previewAsIndexRequest() == false);
}

@Override
protected Request mutateInstanceForVersion(Request instance, TransportVersion version) {
if (version.supports(Request.PREVIEW_AS_INDEX_REQUEST)) {
return instance;
} else {
return new Request(instance.getConfig(), instance.ackTimeout(), false);
public void testAsIndexRequestIsNotBackwardsCompatible() throws IOException {
var unsupportedVersions = DEFAULT_BWC_VERSIONS.stream()
.filter(Predicate.not(version -> version.supports(Request.PREVIEW_AS_INDEX_REQUEST)))
.toList();
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
var testInstance = createTestInstance();
for (var unsupportedVersion : unsupportedVersions) {
if (testInstance.previewAsIndexRequest()) {
var statusException = assertThrows(
ElasticsearchStatusException.class,
() -> copyWriteable(testInstance, getNamedWriteableRegistry(), instanceReader(), unsupportedVersion)
);
assertThat(statusException.status(), equalTo(RestStatus.FORBIDDEN));
assertThat(
statusException.getMessage(),
equalTo("_preview with as_index_request set to true only works if all the nodes support it.")
);
} else {
var deserializedInstance = copyWriteable(
testInstance,
getNamedWriteableRegistry(),
instanceReader(),
unsupportedVersion
);
assertNotSame(unsupportedVersion.toString(), deserializedInstance, testInstance);
assertEquals(unsupportedVersion.toString(), deserializedInstance, testInstance);
assertEquals(unsupportedVersion.toString(), deserializedInstance.hashCode(), testInstance.hashCode());
}
}
}
}

Expand Down