Skip to content

Commit b245417

Browse files
committed
Backward compatible DatafeedConfig
1 parent d6f73d8 commit b245417

File tree

5 files changed

+50
-61
lines changed

5 files changed

+50
-61
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ static TransportVersion def(int id) {
149149
public static final TransportVersion AGGS_EXCLUDED_DELETED_DOCS = def(8_609_00_0);
150150
public static final TransportVersion ESQL_SERIALIZE_BIG_ARRAY = def(8_610_00_0);
151151
public static final TransportVersion AUTO_SHARDING_ROLLOVER_CONDITION = def(8_611_00_0);
152+
public static final TransportVersion ESQL_IN_DATAFEEDS = def(8_612_00_0);
152153

153154
/*
154155
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlConfigIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public final class MlConfigIndex {
2020
private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
2121

2222
public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000;
23-
public static final int CONFIG_INDEX_MAPPINGS_VERSION = 1;
23+
public static final int CONFIG_INDEX_MAPPINGS_VERSION = 2;
2424

2525
/**
2626
* The name of the index where job, datafeed and analytics configuration is stored

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

Lines changed: 31 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,8 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
233233

234234
private final List<String> indices;
235235
private final QueryProvider queryProvider;
236-
private final AggProvider aggProvider;
237236
private final String esqlQuery;
237+
private final AggProvider aggProvider;
238238
private final List<SearchSourceBuilder.ScriptField> scriptFields;
239239
private final Integer scrollSize;
240240
private final ChunkingConfig chunkingConfig;
@@ -251,8 +251,8 @@ private DatafeedConfig(
251251
TimeValue frequency,
252252
List<String> indices,
253253
QueryProvider queryProvider,
254-
AggProvider aggProvider,
255254
String esqlQuery,
255+
AggProvider aggProvider,
256256
List<SearchSourceBuilder.ScriptField> scriptFields,
257257
Integer scrollSize,
258258
ChunkingConfig chunkingConfig,
@@ -268,8 +268,8 @@ private DatafeedConfig(
268268
this.frequency = frequency;
269269
this.indices = indices == null ? null : Collections.unmodifiableList(indices);
270270
this.queryProvider = queryProvider == null ? null : new QueryProvider(queryProvider);
271-
this.aggProvider = aggProvider == null ? null : new AggProvider(aggProvider);
272271
this.esqlQuery = esqlQuery;
272+
this.aggProvider = aggProvider == null ? null : new AggProvider(aggProvider);
273273
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
274274
this.scrollSize = scrollSize;
275275
this.chunkingConfig = chunkingConfig;
@@ -286,17 +286,15 @@ public DatafeedConfig(StreamInput in) throws IOException {
286286
this.queryDelay = in.readOptionalTimeValue();
287287
this.frequency = in.readOptionalTimeValue();
288288
this.indices = in.readOptionalStringCollectionAsList();
289-
// each of these writables are version aware
290-
this.queryProvider = in.readOptionalWriteable(QueryProvider::fromStream);
291-
// This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method
292-
this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);
293-
this.esqlQuery = in.readOptionalString();
294-
295-
if (in.readBoolean()) {
296-
this.scriptFields = in.readCollectionAsImmutableList(SearchSourceBuilder.ScriptField::new);
289+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_IN_DATAFEEDS)) {
290+
this.queryProvider = in.readOptionalWriteable(QueryProvider::fromStream);
291+
this.esqlQuery = in.readOptionalString();
297292
} else {
298-
this.scriptFields = null;
293+
this.queryProvider = QueryProvider.fromStream(in);
294+
this.esqlQuery = null;
299295
}
296+
this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);
297+
this.scriptFields = in.readOptionalCollectionAsList(SearchSourceBuilder.ScriptField::new);
300298
this.scrollSize = in.readOptionalVInt();
301299
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
302300
this.headers = in.readImmutableMap(StreamInput::readString);
@@ -524,9 +522,13 @@ public void writeTo(StreamOutput out) throws IOException {
524522
out.writeOptionalTimeValue(queryDelay);
525523
out.writeOptionalTimeValue(frequency);
526524
out.writeOptionalStringCollection(indices);
527-
out.writeOptionalWriteable(queryProvider);
525+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_IN_DATAFEEDS)) {
526+
out.writeOptionalWriteable(queryProvider);
527+
out.writeOptionalString(esqlQuery);
528+
} else {
529+
queryProvider.writeTo(out);
530+
}
528531
out.writeOptionalWriteable(aggProvider);
529-
out.writeOptionalString(esqlQuery);
530532
out.writeOptionalCollection(scriptFields);
531533
out.writeOptionalVInt(scrollSize);
532534
out.writeOptionalWriteable(chunkingConfig);
@@ -778,8 +780,8 @@ public Builder(DatafeedConfig config) {
778780
this.frequency = config.frequency;
779781
this.indices = new ArrayList<>(config.indices);
780782
this.queryProvider = config.queryProvider == null ? null : new QueryProvider(config.queryProvider);
781-
this.aggProvider = config.aggProvider == null ? null : new AggProvider(config.aggProvider);
782783
this.esqlQuery = config.esqlQuery;
784+
this.aggProvider = config.aggProvider == null ? null : new AggProvider(config.aggProvider);
783785
this.scriptFields = config.scriptFields == null ? null : new ArrayList<>(config.scriptFields);
784786
this.scrollSize = config.scrollSize;
785787
this.chunkingConfig = config.chunkingConfig;
@@ -795,22 +797,17 @@ public Builder(StreamInput in) throws IOException {
795797
this.jobId = in.readOptionalString();
796798
this.queryDelay = in.readOptionalTimeValue();
797799
this.frequency = in.readOptionalTimeValue();
798-
if (in.readBoolean()) {
799-
this.indices = in.readCollectionAsImmutableList(StreamInput::readString);
800+
this.indices = in.readOptionalStringCollectionAsList();
801+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_IN_DATAFEEDS)) {
802+
this.queryProvider = in.readOptionalWriteable(QueryProvider::fromStream);
803+
this.esqlQuery = in.readOptionalString();
800804
} else {
801-
this.indices = null;
805+
this.queryProvider = QueryProvider.fromStream(in);
806+
this.esqlQuery = null;
802807
}
803-
// each of these writables are version aware
804-
this.queryProvider = in.readOptionalWriteable(QueryProvider::fromStream);
805808
// This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method
806809
this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);
807-
this.esqlQuery = in.readOptionalString();
808-
809-
if (in.readBoolean()) {
810-
this.scriptFields = in.readCollectionAsImmutableList(SearchSourceBuilder.ScriptField::new);
811-
} else {
812-
this.scriptFields = null;
813-
}
810+
this.scriptFields = in.readOptionalCollectionAsList(SearchSourceBuilder.ScriptField::new);
814811
this.scrollSize = in.readOptionalVInt();
815812
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
816813
this.headers = in.readImmutableMap(StreamInput::readString);
@@ -828,25 +825,15 @@ public void writeTo(StreamOutput out) throws IOException {
828825
out.writeOptionalString(jobId);
829826
out.writeOptionalTimeValue(queryDelay);
830827
out.writeOptionalTimeValue(frequency);
831-
if (indices != null) {
832-
out.writeBoolean(true);
833-
out.writeStringCollection(indices);
828+
out.writeOptionalStringCollection(indices);
829+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_IN_DATAFEEDS)) {
830+
out.writeOptionalWriteable(queryProvider);
831+
out.writeOptionalString(esqlQuery);
834832
} else {
835-
out.writeBoolean(false);
833+
queryProvider.writeTo(out);
836834
}
837-
838-
// Each of these writables are version aware
839-
out.writeOptionalWriteable(queryProvider);
840-
// This writes a boolean to the stream, if true, it sends the stream to the `writeTo` method
841835
out.writeOptionalWriteable(aggProvider);
842-
out.writeOptionalString(esqlQuery);
843-
844-
if (scriptFields != null) {
845-
out.writeBoolean(true);
846-
out.writeCollection(scriptFields);
847-
} else {
848-
out.writeBoolean(false);
849-
}
836+
out.writeOptionalCollection(scriptFields);
850837
out.writeOptionalVInt(scrollSize);
851838
out.writeOptionalWriteable(chunkingConfig);
852839
out.writeMap(headers, StreamOutput::writeString);
@@ -1082,8 +1069,8 @@ public DatafeedConfig build() {
10821069
frequency,
10831070
indices,
10841071
queryProvider,
1085-
aggProvider,
10861072
esqlQuery,
1073+
aggProvider,
10871074
scriptFields,
10881075
scrollSize,
10891076
chunkingConfig,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public final class Messages {
3131
public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed query is not parsable";
3232
public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed aggregations are not parsable";
3333
public static final String DATAFEED_CONFIG_INCOMPATIBLE_WITH_ESQL = "[{0}] and [esql] are incompatible";
34+
public static final String DATAFEED_CONFIG_ESQL_UNAVAILABLE = "[esql] not available on all nodes";
3435
public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
3536
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
3637
public static final String DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM = "A date_histogram (or histogram) aggregation is required";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.search.TransportSearchAction;
1415
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -61,6 +62,7 @@
6162
import static java.util.function.Predicate.not;
6263
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
6364
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
65+
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_ESQL_UNAVAILABLE;
6466
import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
6567

6668
/**
@@ -305,6 +307,11 @@ private void putDatafeed(
305307
ClusterState clusterState,
306308
ActionListener<PutDatafeedAction.Response> listener
307309
) {
310+
if (clusterState.getMinTransportVersion().before(TransportVersions.ESQL_IN_DATAFEEDS)
311+
&& request.getDatafeed().getEsqlQuery() != null) {
312+
listener.onFailure(ExceptionsHelper.badRequestException(DATAFEED_CONFIG_ESQL_UNAVAILABLE));
313+
}
314+
308315
DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations(xContentRegistry));
309316

310317
CheckedConsumer<Boolean, Exception> mappingsUpdated = ok -> datafeedConfigProvider.putDatafeedConfig(
@@ -313,22 +320,15 @@ private void putDatafeed(
313320
listener.delegateFailureAndWrap((l, response) -> l.onResponse(new PutDatafeedAction.Response(response.v1())))
314321
);
315322

316-
CheckedConsumer<Boolean, Exception> validationOk = ok -> {
317-
if (clusterState == null) {
318-
logger.warn("Cannot update doc mapping because clusterState == null");
319-
mappingsUpdated.accept(false);
320-
return;
321-
}
322-
ElasticsearchMappings.addDocMappingIfMissing(
323-
MlConfigIndex.indexName(),
324-
MlConfigIndex::mapping,
325-
client,
326-
clusterState,
327-
request.masterNodeTimeout(),
328-
ActionListener.wrap(mappingsUpdated, listener::onFailure),
329-
MlConfigIndex.CONFIG_INDEX_MAPPINGS_VERSION
330-
);
331-
};
323+
CheckedConsumer<Boolean, Exception> validationOk = ok -> ElasticsearchMappings.addDocMappingIfMissing(
324+
MlConfigIndex.indexName(),
325+
MlConfigIndex::mapping,
326+
client,
327+
clusterState,
328+
request.masterNodeTimeout(),
329+
ActionListener.wrap(mappingsUpdated, listener::onFailure),
330+
MlConfigIndex.CONFIG_INDEX_MAPPINGS_VERSION
331+
);
332332

333333
CheckedConsumer<Boolean, Exception> jobOk = ok -> jobConfigProvider.validateDatafeedJob(
334334
request.getDatafeed(),

0 commit comments

Comments
 (0)