Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/124841.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124841
summary: Pass `IndexReshardingMetadata` over the wire
area: Distributed
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexGraveyardTests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
Expand Down Expand Up @@ -586,13 +587,18 @@ public IndexMetadata randomCreate(String name) {
for (int i = 0; i < aliasCount; i++) {
builder.putAlias(randomAlias());
}
if (randomBoolean()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need any compatibility checks in this test ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also wondering if we have a cluster where the master node is on a different (older) version and the data node that initiates the reshard is on a newer version, how do we handle that ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also wondering if we have a cluster where the master node is on a different (older) version and the data node that initiates the reshard is on a newer version, how do we handle that ?

I think we should check at the top of an autoshard action if the cluster-wide minimum version is too low to parse resharding metadata, and fail immediately if so. I don't think it goes in this change though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need any compatibility checks in this test ?

I haven't found BWC tests for the other fields that are conditionally serialized, but it does seem like a good idea. I'll add something.

Copy link
Contributor Author

@bcully bcully Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in fa136c9

builder.reshardingMetadata(
IndexReshardingMetadata.newSplitByMultiple(builder.numberOfShards(), randomIntBetween(2, 5))
);
}
return builder.build();
}

@Override
public IndexMetadata randomChange(IndexMetadata part) {
IndexMetadata.Builder builder = IndexMetadata.builder(part);
switch (randomIntBetween(0, 3)) {
switch (randomIntBetween(0, 4)) {
case 0:
builder.settings(Settings.builder().put(part.getSettings()).put(randomSettings(Settings.EMPTY)));
break;
Expand All @@ -609,6 +615,15 @@ public IndexMetadata randomChange(IndexMetadata part) {
case 3:
builder.putInferenceFields(randomInferenceFields());
break;
case 4:
if (randomBoolean()) {
builder.reshardingMetadata(
IndexReshardingMetadata.newSplitByMultiple(builder.numberOfShards(), randomIntBetween(2, 5))
);
} else {
builder.reshardingMetadata(null);
}
break;
default:
throw new IllegalArgumentException("Shouldn't be here");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00);
public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_00_0);
public static final TransportVersion ESQL_FAILURE_FROM_REMOTE = def(9_030_00_0);
public static final TransportVersion INDEX_RESHARDING_METADATA = def(9_031_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,11 @@ public IndexLongFieldRange getTimeSeriesTimestampRange(DateFieldMapper.DateField
}
}

@Nullable
public IndexReshardingMetadata getReshardingMetadata() {
return reshardingMetadata;
}

@Override
public boolean equals(Object o) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add a check for the IndexReshardingMetadata being equal here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is a good point. It did get me to look at which fields are included and there are some others that are serialized but not included in equality comparison (e.g., timestampRange/eventIngestedRange) and I'm not sure I understand whether that's by design or an oversight. But it does seem like resharding metadata should be included regardless. I'll update.

Copy link
Contributor Author

@bcully bcully Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in fa136c9

if (this == o) {
Expand Down Expand Up @@ -1478,6 +1483,12 @@ public boolean equals(Object o) {
if (isSystem != that.isSystem) {
return false;
}
if (reshardingMetadata == null && that.reshardingMetadata != null) {
return false;
}
if (reshardingMetadata != null && reshardingMetadata.equals(that.reshardingMetadata) == false) {
return false;
}
return true;
}

Expand All @@ -1497,6 +1508,9 @@ public int hashCode() {
result = 31 * result + rolloverInfos.hashCode();
result = 31 * result + inferenceFields.hashCode();
result = 31 * result + Boolean.hashCode(isSystem);
if (reshardingMetadata != null) {
result = 31 * result + reshardingMetadata.hashCode();
}
return result;
}

Expand Down Expand Up @@ -1558,6 +1572,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
private final IndexMetadataStats stats;
private final Double indexWriteLoadForecast;
private final Long shardSizeInBytesForecast;
private final IndexReshardingMetadata reshardingMetadata;

IndexMetadataDiff(IndexMetadata before, IndexMetadata after) {
index = after.index.getName();
Expand Down Expand Up @@ -1597,6 +1612,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
stats = after.stats;
indexWriteLoadForecast = after.writeLoadForecast;
shardSizeInBytesForecast = after.shardSizeInBytesForecast;
reshardingMetadata = after.reshardingMetadata;
}

private static final DiffableUtils.DiffableValueReader<String, AliasMetadata> ALIAS_METADATA_DIFF_VALUE_READER =
Expand Down Expand Up @@ -1669,6 +1685,11 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
} else {
eventIngestedRange = IndexLongFieldRange.UNKNOWN;
}
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARDING_METADATA)) {
reshardingMetadata = in.readOptionalWriteable(IndexReshardingMetadata::new);
} else {
reshardingMetadata = null;
}
}

@Override
Expand Down Expand Up @@ -1707,6 +1728,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalLong(shardSizeInBytesForecast);
}
eventIngestedRange.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARDING_METADATA)) {
out.writeOptionalWriteable(reshardingMetadata);
}
}

@Override
Expand Down Expand Up @@ -1739,6 +1763,7 @@ public IndexMetadata apply(IndexMetadata part) {
builder.stats(stats);
builder.indexWriteLoadForecast(indexWriteLoadForecast);
builder.shardSizeInBytesForecast(shardSizeInBytesForecast);
builder.reshardingMetadata(reshardingMetadata);
return builder.build(true);
}
}
Expand Down Expand Up @@ -1810,6 +1835,9 @@ public static IndexMetadata readFrom(StreamInput in, @Nullable Function<String,
builder.shardSizeInBytesForecast(in.readOptionalLong());
}
builder.eventIngestedRange(IndexLongFieldRange.readFrom(in));
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARDING_METADATA)) {
builder.reshardingMetadata(in.readOptionalWriteable(IndexReshardingMetadata::new));
}
return builder.build(true);
}

Expand Down Expand Up @@ -1859,6 +1887,9 @@ public void writeTo(StreamOutput out, boolean mappingsAsHash) throws IOException
out.writeOptionalLong(shardSizeInBytesForecast);
}
eventIngestedRange.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARDING_METADATA)) {
out.writeOptionalWriteable(reshardingMetadata);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxPrimaryShardDocsCondition;
Expand Down Expand Up @@ -166,6 +168,7 @@ public void testIndexMetadataSerialization() throws IOException {
assertEquals(metadata.getForecastedWriteLoad(), fromXContentMeta.getForecastedWriteLoad());
assertEquals(metadata.getForecastedShardSizeInBytes(), fromXContentMeta.getForecastedShardSizeInBytes());
assertEquals(metadata.getInferenceFields(), fromXContentMeta.getInferenceFields());
assertEquals(metadata.getReshardingMetadata(), fromXContentMeta.getReshardingMetadata());

final BytesStreamOutput out = new BytesStreamOutput();
metadata.writeTo(out);
Expand All @@ -192,6 +195,7 @@ public void testIndexMetadataSerialization() throws IOException {
assertEquals(metadata.getForecastedShardSizeInBytes(), deserialized.getForecastedShardSizeInBytes());
assertEquals(metadata.getInferenceFields(), deserialized.getInferenceFields());
assertEquals(metadata.getEventIngestedRange(), deserialized.getEventIngestedRange());
assertEquals(metadata.getReshardingMetadata(), deserialized.getReshardingMetadata());
}
}

Expand Down Expand Up @@ -676,6 +680,34 @@ public void testInferenceFieldMetadata() {
assertThat(idxMeta2.getInferenceFields(), equalTo(dynamicFields));
}

public void testReshardingBWCSerialization() throws IOException {
final int numShards = randomIntBetween(1, 8);
final var settings = indexSettings(IndexVersion.current(), numShards, 0);
final var reshardingMetadata = IndexReshardingMetadata.newSplitByMultiple(numShards, randomIntBetween(2, 5));
IndexMetadata idx = IndexMetadata.builder("test").settings(settings).reshardingMetadata(reshardingMetadata).build();

// the version prior to TransportVersions.INDEX_RESHARDING_METADATA
final var version = TransportVersions.ESQL_FAILURE_FROM_REMOTE;
// should round trip
final var deserialized = roundTripWithVersion(idx, version);

// but be missing resharding metadata
assertNull(deserialized.getReshardingMetadata());
// but otherwise be equal
assertEquals(idx, IndexMetadata.builder(deserialized).reshardingMetadata(reshardingMetadata).build());
}

private IndexMetadata roundTripWithVersion(IndexMetadata indexMetadata, TransportVersion version) throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setTransportVersion(version);
indexMetadata.writeTo(out);
try (StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), writableRegistry())) {
in.setTransportVersion(version);
return IndexMetadata.readFrom(in);
}
}
}

private static Settings indexSettingsWithDataTier(String dataTier) {
return indexSettings(IndexVersion.current(), 1, 0).put(DataTier.TIER_PREFERENCE, dataTier).build();
}
Expand Down