Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_DEEPSEEK_8_19 = def(8_841_0_09);
public static final TransportVersion ESQL_SERIALIZE_BLOCK_TYPE_CODE_8_19 = def(8_841_0_10);
public static final TransportVersion ESQL_FAILURE_FROM_REMOTE_8_19 = def(8_841_0_11);
public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA_8_19 = def(8_841_0_13);
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0 = def(9_000_0_00);
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_000_0_01);
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90 = def(9_000_0_02);
Expand Down
17 changes: 16 additions & 1 deletion server/src/main/java/org/elasticsearch/cluster/NamedDiff.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,23 @@
*/
public interface NamedDiff<T extends Diffable<T>> extends Diff<T>, NamedWriteable {
/**
* The minimal version of the recipient this custom object can be sent to
* The minimal version of the recipient this object can be sent to.
* See {@link #supportsVersion(TransportVersion)} for the default serialization check.
*/
TransportVersion getMinimalSupportedVersion();

/**
* Determines whether this instance should be serialized based on the provided transport version.
*
* The default implementation returns {@code true} if the given transport version is
* equal to or newer than {@link #getMinimalSupportedVersion()}.
* Subclasses may override this method to define custom serialization logic.
*
* @param version the transport version of the receiving node
* @return {@code true} if the instance should be serialized, {@code false} otherwise
*/
default boolean supportsVersion(TransportVersion version) {
return version.onOrAfter(getMinimalSupportedVersion());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public T read(StreamInput in, String key) throws IOException {

@Override
public boolean supportsVersion(Diff<T> value, TransportVersion version) {
return version.onOrAfter(((NamedDiff<?>) value).getMinimalSupportedVersion());
return ((NamedDiff<?>) value).supportsVersion(version);
}

@Override
public boolean supportsVersion(T value, TransportVersion version) {
return version.onOrAfter(value.getMinimalSupportedVersion());
return value.supportsVersion(version);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,27 @@ public interface VersionedNamedWriteable extends NamedWriteable {
String getWriteableName();

/**
* The minimal version of the recipient this object can be sent to
* The minimal version of the recipient this object can be sent to.
* See {@link #supportsVersion(TransportVersion)} for the default serialization check.
*/
TransportVersion getMinimalSupportedVersion();

/**
* Tests whether or not the custom should be serialized. The criteria is the output stream must be at least the minimum supported
* version of the custom. That is, we only serialize customs to clients than can understand the custom based on the version of the
* client.
* Determines whether this instance should be serialized based on the provided transport version.
*
* @param out the output stream
* @param custom the custom to serialize
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
* The default implementation returns {@code true} if the given transport version is
* equal to or newer than {@link #getMinimalSupportedVersion()}.
* Subclasses may override this method to define custom serialization logic.
*
* @param version the transport version of the receiving node
* @return {@code true} if the instance should be serialized, {@code false} otherwise
*/
static <T extends VersionedNamedWriteable> boolean shouldSerialize(final StreamOutput out, final T custom) {
return out.getTransportVersion().onOrAfter(custom.getMinimalSupportedVersion());
default boolean supportsVersion(TransportVersion version) {
return version.onOrAfter(getMinimalSupportedVersion());
}

/**
* Writes all those values in the given map to {@code out} that pass the version check in {@link #shouldSerialize} as a list.
* Writes all those values in the given map to {@code out} that pass the version check in {@link VersionedNamedWriteable#supportsVersion} as a list.
*
* @param out stream to write to
* @param customs map of customs
Expand All @@ -58,13 +59,13 @@ static void writeVersionedWriteables(StreamOutput out, Iterable<? extends Versio
// filter out objects not supported by the stream version
int numberOfCompatibleValues = 0;
for (var value : writeables) {
if (shouldSerialize(out, value)) {
if (value.supportsVersion(out.getTransportVersion())) {
numberOfCompatibleValues++;
}
}
out.writeVInt(numberOfCompatibleValues);
for (var value : writeables) {
if (shouldSerialize(out, value)) {
if (value.supportsVersion(out.getTransportVersion())) {
out.writeNamedWriteable(value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.INFERENCE_MODEL_REGISTRY_METADATA;
import static org.elasticsearch.TransportVersions.INFERENCE_MODEL_REGISTRY_METADATA_8_19;
import static org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper.ElementType;
import static org.elasticsearch.inference.TaskType.CHAT_COMPLETION;
import static org.elasticsearch.inference.TaskType.COMPLETION;
Expand Down Expand Up @@ -161,7 +163,12 @@ public String getWriteableName() {

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.INFERENCE_MODEL_REGISTRY_METADATA;
return TransportVersions.INFERENCE_MODEL_REGISTRY_METADATA_8_19;
}

@Override
public boolean supportsVersion(TransportVersion version) {
return version.isPatchFrom(INFERENCE_MODEL_REGISTRY_METADATA_8_19) || version.onOrAfter(INFERENCE_MODEL_REGISTRY_METADATA);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
Expand Down Expand Up @@ -102,7 +101,7 @@ default long getNumberOfTasksOnNode(String nodeId, String taskName) {
default void doWriteTo(StreamOutput out) throws IOException {
out.writeLong(getLastAllocationId());
Map<String, PersistentTask<?>> filteredTasks = tasks().stream()
.filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams()))
.filter(t -> t.getParams().supportsVersion(out.getTransportVersion()))
.collect(Collectors.toMap(PersistentTask::getId, Function.identity()));
out.writeMap(filteredTasks, StreamOutput::writeWriteable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -33,7 +32,7 @@ public static OperatorStatus readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(operator);
out.writeOptionalNamedWriteable(status != null && VersionedNamedWriteable.shouldSerialize(out, status) ? status : null);
out.writeOptionalNamedWriteable(status != null && status.supportsVersion(out.getTransportVersion()) ? status : null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.TransportVersions.INFERENCE_MODEL_REGISTRY_METADATA;
import static org.elasticsearch.TransportVersions.INFERENCE_MODEL_REGISTRY_METADATA_8_19;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -234,7 +236,12 @@ public String getWriteableName() {

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.INFERENCE_MODEL_REGISTRY_METADATA;
return INFERENCE_MODEL_REGISTRY_METADATA_8_19;
}

@Override
public boolean supportsVersion(TransportVersion version) {
return shouldSerialize(version);
}

@Override
Expand Down Expand Up @@ -300,7 +307,12 @@ public String getWriteableName() {

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.INFERENCE_MODEL_REGISTRY_METADATA;
return INFERENCE_MODEL_REGISTRY_METADATA_8_19;
}

@Override
public boolean supportsVersion(TransportVersion version) {
return shouldSerialize(version);
}

@Override
Expand All @@ -313,4 +325,8 @@ public Metadata.ProjectCustom apply(Metadata.ProjectCustom part) {
}
}
}

static boolean shouldSerialize(TransportVersion version) {
return version.isPatchFrom(INFERENCE_MODEL_REGISTRY_METADATA_8_19) || version.onOrAfter(INFERENCE_MODEL_REGISTRY_METADATA);
}
}