Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ static TransportVersion def(int id) {
public static final TransportVersion V_7_9_0 = def(7_09_00_99);
public static final TransportVersion V_7_10_0 = def(7_10_00_99);
public static final TransportVersion V_8_0_0 = def(8_00_00_99);
public static final TransportVersion V_8_6_0 = def(8_06_00_99);
public static final TransportVersion V_8_6_1 = def(8_06_01_99);
public static final TransportVersion V_8_7_0 = def(8_07_00_99);
public static final TransportVersion V_8_7_1 = def(8_07_01_99);
public static final TransportVersion V_8_8_0 = def(8_08_00_99);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
this.nodeInfo = new NodeInfo(in);
this.nodeStats = new NodeStats(in);
this.shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
searchUsageStats = new SearchUsageStats(in);
} else {
searchUsageStats = new SearchUsageStats();
}
searchUsageStats = new SearchUsageStats(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
repositoryUsageStats = RepositoryUsageStats.readFrom(in);
searchCcsMetrics = new CCSTelemetrySnapshot(in);
Expand Down Expand Up @@ -120,9 +116,7 @@ public void writeTo(StreamOutput out) throws IOException {
nodeInfo.writeTo(out);
nodeStats.writeTo(out);
out.writeArray(shardsStats);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
searchUsageStats.writeTo(out);
}
searchUsageStats.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
repositoryUsageStats.writeTo(out);
searchCcsMetrics.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ChannelActionListener;
Expand Down Expand Up @@ -92,8 +91,6 @@ public class PublicationTransportHandler {
TransportRequestOptions.Type.STATE
);

public static final TransportVersion INCLUDES_LAST_COMMITTED_DATA_VERSION = TransportVersions.V_8_6_0;

private final SerializationStatsTracker serializationStatsTracker = new SerializationStatsTracker();

public PublicationTransportHandler(
Expand Down Expand Up @@ -187,29 +184,20 @@ private ClusterState deserializeAndApplyDiff(BytesTransportRequest request, Stre
ClusterState incomingState;
try {
final Diff<ClusterState> diff;
final boolean includesLastCommittedData = request.version().onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION);
final boolean clusterUuidCommitted;
final CoordinationMetadata.VotingConfiguration lastCommittedConfiguration;

// Close stream early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
diff = ClusterState.readDiffFrom(input, currentState.nodes().getLocalNode());
if (includesLastCommittedData) {
clusterUuidCommitted = in.readBoolean();
lastCommittedConfiguration = new CoordinationMetadata.VotingConfiguration(in);
} else {
clusterUuidCommitted = false;
lastCommittedConfiguration = null;
}
clusterUuidCommitted = in.readBoolean();
lastCommittedConfiguration = new CoordinationMetadata.VotingConfiguration(in);
assert input.read() == -1;
}
incomingState = diff.apply(currentState); // might throw IncompatibleClusterStateVersionException
if (includesLastCommittedData) {
final var adjustedMetadata = incomingState.metadata()
.withLastCommittedValues(clusterUuidCommitted, lastCommittedConfiguration);
if (adjustedMetadata != incomingState.metadata()) {
incomingState = ClusterState.builder(incomingState).metadata(adjustedMetadata).build();
}
final var adjustedMetadata = incomingState.metadata().withLastCommittedValues(clusterUuidCommitted, lastCommittedConfiguration);
if (adjustedMetadata != incomingState.metadata()) {
incomingState = ClusterState.builder(incomingState).metadata(adjustedMetadata).build();
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
Expand Down Expand Up @@ -305,10 +293,8 @@ private ReleasableBytesReference serializeDiffClusterState(
stream.setTransportVersion(version);
stream.writeBoolean(false);
diff.writeTo(stream);
if (version.onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION)) {
stream.writeBoolean(newState.metadata().clusterUUIDCommitted());
newState.getLastCommittedConfiguration().writeTo(stream);
}
stream.writeBoolean(newState.metadata().clusterUUIDCommitted());
newState.getLastCommittedConfiguration().writeTo(stream);
uncompressedBytes = stream.position();
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,6 @@ public Iterator<Setting<?>> settings() {

public static final String INDEX_STATE_FILE_PREFIX = "state-";

static final TransportVersion STATS_AND_FORECAST_ADDED = TransportVersions.V_8_6_0;

private final int routingNumShards;
private final int routingFactor;
private final int routingPartitionSize;
Expand Down Expand Up @@ -1726,15 +1724,9 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
}
isSystem = in.readBoolean();
timestampRange = IndexLongFieldRange.readFrom(in);
if (in.getTransportVersion().onOrAfter(STATS_AND_FORECAST_ADDED)) {
stats = in.readOptionalWriteable(IndexMetadataStats::new);
indexWriteLoadForecast = in.readOptionalDouble();
shardSizeInBytesForecast = in.readOptionalLong();
} else {
stats = null;
indexWriteLoadForecast = null;
shardSizeInBytesForecast = null;
}
stats = in.readOptionalWriteable(IndexMetadataStats::new);
indexWriteLoadForecast = in.readOptionalDouble();
shardSizeInBytesForecast = in.readOptionalLong();
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
eventIngestedRange = IndexLongFieldRange.readFrom(in);
} else {
Expand Down Expand Up @@ -1773,11 +1765,9 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeBoolean(isSystem);
timestampRange.writeTo(out);
if (out.getTransportVersion().onOrAfter(STATS_AND_FORECAST_ADDED)) {
out.writeOptionalWriteable(stats);
out.writeOptionalDouble(indexWriteLoadForecast);
out.writeOptionalLong(shardSizeInBytesForecast);
}
out.writeOptionalWriteable(stats);
out.writeOptionalDouble(indexWriteLoadForecast);
out.writeOptionalLong(shardSizeInBytesForecast);
eventIngestedRange.writeTo(out);
if (out.getTransportVersion().supports(INDEX_RESHARDING_METADATA)) {
out.writeOptionalWriteable(reshardingMetadata);
Expand Down Expand Up @@ -1880,11 +1870,9 @@ public static IndexMetadata readFrom(StreamInput in, @Nullable Function<String,
builder.system(in.readBoolean());
builder.timestampRange(IndexLongFieldRange.readFrom(in));

if (in.getTransportVersion().onOrAfter(STATS_AND_FORECAST_ADDED)) {
builder.stats(in.readOptionalWriteable(IndexMetadataStats::new));
builder.indexWriteLoadForecast(in.readOptionalDouble());
builder.shardSizeInBytesForecast(in.readOptionalLong());
}
builder.stats(in.readOptionalWriteable(IndexMetadataStats::new));
builder.indexWriteLoadForecast(in.readOptionalDouble());
builder.shardSizeInBytesForecast(in.readOptionalLong());
builder.eventIngestedRange(IndexLongFieldRange.readFrom(in));
if (in.getTransportVersion().supports(INDEX_RESHARDING_METADATA)) {
builder.reshardingMetadata(in.readOptionalWriteable(IndexReshardingMetadata::new));
Expand Down Expand Up @@ -1932,11 +1920,9 @@ public void writeTo(StreamOutput out, boolean mappingsAsHash) throws IOException
}
out.writeBoolean(isSystem);
timestampRange.writeTo(out);
if (out.getTransportVersion().onOrAfter(STATS_AND_FORECAST_ADDED)) {
out.writeOptionalWriteable(stats);
out.writeOptionalDouble(writeLoadForecast);
out.writeOptionalLong(shardSizeInBytesForecast);
}
out.writeOptionalWriteable(stats);
out.writeOptionalDouble(writeLoadForecast);
out.writeOptionalLong(shardSizeInBytesForecast);
eventIngestedRange.writeTo(out);
if (out.getTransportVersion().supports(INDEX_RESHARDING_METADATA)) {
out.writeOptionalWriteable(reshardingMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
Expand Down Expand Up @@ -42,7 +41,6 @@ public final class ShardRouting implements Writeable, ToXContentObject {
* Used if shard size is not available
*/
public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1;
private static final TransportVersion RELOCATION_FAILURE_INFO_VERSION = TransportVersions.V_8_6_0;

private final ShardId shardId;
private final String currentNodeId;
Expand Down Expand Up @@ -342,11 +340,7 @@ public ShardRouting(ShardId shardId, StreamInput in) throws IOException {
recoverySource = null;
}
unassignedInfo = in.readOptionalWriteable(UnassignedInfo::fromStreamInput);
if (in.getTransportVersion().onOrAfter(RELOCATION_FAILURE_INFO_VERSION)) {
relocationFailureInfo = RelocationFailureInfo.readFrom(in);
} else {
relocationFailureInfo = RelocationFailureInfo.NO_FAILURES;
}
relocationFailureInfo = RelocationFailureInfo.readFrom(in);
allocationId = in.readOptionalWriteable(AllocationId::new);
if (state == ShardRoutingState.RELOCATING || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.STARTED) {
expectedShardSize = in.readLong();
Expand Down Expand Up @@ -380,9 +374,7 @@ public void writeToThin(StreamOutput out) throws IOException {
recoverySource.writeTo(out);
}
out.writeOptionalWriteable(unassignedInfo);
if (out.getTransportVersion().onOrAfter(RELOCATION_FAILURE_INFO_VERSION)) {
relocationFailureInfo.writeTo(out);
}
relocationFailureInfo.writeTo(out);
out.writeOptionalWriteable(allocationId);
if (state == ShardRoutingState.RELOCATING || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.STARTED) {
out.writeLong(expectedShardSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public static class Stats implements Writeable, ToXContentFragment {
private static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = TransportVersion.fromName(
"index_stats_and_metadata_include_peak_write_load"
);
private static final TransportVersion WRITE_LOAD_AVG_SUPPORTED_VERSION = TransportVersions.V_8_6_0;
private static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = TransportVersion.fromName(
"write_load_includes_buffer_writes"
);
Expand Down Expand Up @@ -74,10 +73,8 @@ public Stats(StreamInput in) throws IOException {
noopUpdateCount = in.readVLong();
isThrottled = in.readBoolean();
throttleTimeInMillis = in.readLong();
if (in.getTransportVersion().onOrAfter(WRITE_LOAD_AVG_SUPPORTED_VERSION)) {
totalIndexingTimeSinceShardStartedInNanos = in.readLong();
totalActiveTimeInNanos = in.readLong();
}
totalIndexingTimeSinceShardStartedInNanos = in.readLong();
totalActiveTimeInNanos = in.readLong();
if (in.getTransportVersion().supports(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) {
recentIndexingLoad = in.readDouble();
} else {
Expand Down Expand Up @@ -312,10 +309,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(noopUpdateCount);
out.writeBoolean(isThrottled);
out.writeLong(throttleTimeInMillis);
if (out.getTransportVersion().onOrAfter(WRITE_LOAD_AVG_SUPPORTED_VERSION)) {
out.writeLong(totalIndexingTimeSinceShardStartedInNanos);
out.writeLong(totalActiveTimeInNanos);
}
out.writeLong(totalIndexingTimeSinceShardStartedInNanos);
out.writeLong(totalActiveTimeInNanos);
if (out.getTransportVersion().supports(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) {
out.writeDouble(recentIndexingLoad);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ public DfsSearchResult(StreamInput in) throws IOException {
DfsKnnResults results = in.readOptionalWriteable(DfsKnnResults::new);
knnResults = results != null ? List.of(results) : List.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
searchProfileDfsPhaseResult = in.readOptionalWriteable(SearchProfileDfsPhaseResult::new);
}
searchProfileDfsPhaseResult = in.readOptionalWriteable(SearchProfileDfsPhaseResult::new);
}

public DfsSearchResult(ShardSearchContextId contextId, SearchShardTarget shardTarget, ShardSearchRequest shardSearchRequest) {
Expand Down Expand Up @@ -146,9 +144,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeOptionalWriteable(knnResults == null || knnResults.isEmpty() ? null : knnResults.get(0));
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
out.writeOptionalWriteable(searchProfileDfsPhaseResult);
}
out.writeOptionalWriteable(searchProfileDfsPhaseResult);
}

public static void writeFieldStats(StreamOutput out, Map<String, CollectionStatistics> fieldStatistics) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.search.profile;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -43,9 +42,7 @@ public SearchProfileQueryPhaseResult(
}

public SearchProfileQueryPhaseResult(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
searchProfileDfsPhaseResult = in.readOptionalWriteable(SearchProfileDfsPhaseResult::new);
}
searchProfileDfsPhaseResult = in.readOptionalWriteable(SearchProfileDfsPhaseResult::new);
int profileSize = in.readVInt();
List<QueryProfileShardResult> queryProfileResults = new ArrayList<>(profileSize);
for (int i = 0; i < profileSize; i++) {
Expand All @@ -58,9 +55,7 @@ public SearchProfileQueryPhaseResult(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
out.writeOptionalWriteable(searchProfileDfsPhaseResult);
}
out.writeOptionalWriteable(searchProfileDfsPhaseResult);
out.writeVInt(queryProfileResults.size());
for (QueryProfileShardResult queryShardResult : queryProfileResults) {
queryShardResult.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,22 +278,14 @@ public Response(StreamInput in) throws IOException {
super(in);

// Multiple results added in 8.6.1
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_6_1)) {
results = in.readNamedWriteableCollectionAsList(InferenceResults.class);
} else {
results = List.of(in.readNamedWriteable(InferenceResults.class));
}
results = in.readNamedWriteableCollectionAsList(InferenceResults.class);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_6_1)) {
out.writeNamedWriteableCollection(results);
} else {
out.writeNamedWriteable(results.get(0));
}
out.writeNamedWriteableCollection(results);
}

public List<InferenceResults> getResults() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,7 @@ public Request(StreamInput in) throws IOException {
threadsPerAllocation = in.readVInt();
queueCapacity = in.readVInt();
this.cacheSize = in.readOptionalWriteable(ByteSizeValue::readFrom);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
this.priority = in.readEnum(Priority.class);
} else {
this.priority = Priority.NORMAL;
}
this.priority = in.readEnum(Priority.class);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
this.deploymentId = in.readString();
} else {
Expand Down Expand Up @@ -302,9 +298,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(threadsPerAllocation);
out.writeVInt(queueCapacity);
out.writeOptionalWriteable(cacheSize);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
out.writeEnum(priority);
}
out.writeEnum(priority);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
out.writeString(deploymentId);
}
Expand Down Expand Up @@ -574,11 +568,7 @@ public TaskParams(StreamInput in) throws IOException {
this.numberOfAllocations = in.readVInt();
this.queueCapacity = in.readVInt();
this.cacheSize = in.readOptionalWriteable(ByteSizeValue::readFrom);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
this.priority = in.readEnum(Priority.class);
} else {
this.priority = Priority.NORMAL;
}
this.priority = in.readEnum(Priority.class);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
this.deploymentId = in.readString();
} else {
Expand Down Expand Up @@ -638,9 +628,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numberOfAllocations);
out.writeVInt(queueCapacity);
out.writeOptionalWriteable(cacheSize);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_6_0)) {
out.writeEnum(priority);
}
out.writeEnum(priority);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
out.writeString(deploymentId);
}
Expand Down
Loading