Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/124841.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124841
summary: Pass `IndexReshardingMetadata` over the wire
area: Distributed
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexGraveyardTests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
Expand Down Expand Up @@ -586,13 +587,18 @@ public IndexMetadata randomCreate(String name) {
for (int i = 0; i < aliasCount; i++) {
builder.putAlias(randomAlias());
}
if (randomBoolean()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need any compatibility checks in this test ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also wondering if we have a cluster where the master node is on a different (older) version and the data node that initiates the reshard is on a newer version, how do we handle that ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also wondering if we have a cluster where the master node is on a different (older) version and the data node that initiates the reshard is on a newer version, how do we handle that ?

I think we should check at the top of an autoshard action if the cluster-wide minimum version is too low to parse resharding metadata, and fail immediately if so. I don't think it goes in this change though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need any compatibility checks in this test ?

I haven't found BWC tests for the other fields that are conditionally serialized, but it does seem like a good idea. I'll add something.

Copy link
Contributor Author

@bcully bcully Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in fa136c9

builder.reshardingMetadata(
IndexReshardingMetadata.newSplitByMultiple(builder.numberOfShards(), randomIntBetween(2, 5))
);
}
return builder.build();
}

@Override
public IndexMetadata randomChange(IndexMetadata part) {
IndexMetadata.Builder builder = IndexMetadata.builder(part);
switch (randomIntBetween(0, 3)) {
switch (randomIntBetween(0, 4)) {
case 0:
builder.settings(Settings.builder().put(part.getSettings()).put(randomSettings(Settings.EMPTY)));
break;
Expand All @@ -609,6 +615,15 @@ public IndexMetadata randomChange(IndexMetadata part) {
case 3:
builder.putInferenceFields(randomInferenceFields());
break;
case 4:
if (randomBoolean()) {
builder.reshardingMetadata(
IndexReshardingMetadata.newSplitByMultiple(builder.numberOfShards(), randomIntBetween(2, 5))
);
} else {
builder.reshardingMetadata(null);
}
break;
default:
throw new IllegalArgumentException("Shouldn't be here");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_THREAD_NAME_IN_DRIVER_PROFILE = def(9_027_0_00);
public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00);
public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_00_0);
public static final TransportVersion INDEX_RESHARDING_METADATA = def(9_030_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,11 @@ public IndexLongFieldRange getTimeSeriesTimestampRange(DateFieldMapper.DateField
}
}

@Nullable
public IndexReshardingMetadata getReshardingMetadata() {
return reshardingMetadata;
}

@Override
public boolean equals(Object o) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add a check for the IndexReshardingMetadata being equal here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is a good point. It did get me to look at which fields are included and there are some others that are serialized but not included in equality comparison (e.g., timestampRange/eventIngestedRange) and I'm not sure I understand whether that's by design or an oversight. But it does seem like resharding metadata should be included regardless. I'll update.

Copy link
Contributor Author

@bcully bcully Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in fa136c9

if (this == o) {
Expand Down Expand Up @@ -1558,6 +1563,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
private final IndexMetadataStats stats;
private final Double indexWriteLoadForecast;
private final Long shardSizeInBytesForecast;
private final IndexReshardingMetadata reshardingMetadata;

IndexMetadataDiff(IndexMetadata before, IndexMetadata after) {
index = after.index.getName();
Expand Down Expand Up @@ -1597,6 +1603,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
stats = after.stats;
indexWriteLoadForecast = after.writeLoadForecast;
shardSizeInBytesForecast = after.shardSizeInBytesForecast;
reshardingMetadata = after.reshardingMetadata;
}

private static final DiffableUtils.DiffableValueReader<String, AliasMetadata> ALIAS_METADATA_DIFF_VALUE_READER =
Expand Down Expand Up @@ -1669,6 +1676,11 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
} else {
eventIngestedRange = IndexLongFieldRange.UNKNOWN;
}
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARDING_METADATA)) {
reshardingMetadata = in.readOptionalWriteable(IndexReshardingMetadata::new);
} else {
reshardingMetadata = null;
}
}

@Override
Expand Down Expand Up @@ -1707,6 +1719,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalLong(shardSizeInBytesForecast);
}
eventIngestedRange.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARDING_METADATA)) {
out.writeOptionalWriteable(reshardingMetadata);
}
}

@Override
Expand Down Expand Up @@ -1739,6 +1754,7 @@ public IndexMetadata apply(IndexMetadata part) {
builder.stats(stats);
builder.indexWriteLoadForecast(indexWriteLoadForecast);
builder.shardSizeInBytesForecast(shardSizeInBytesForecast);
builder.reshardingMetadata(reshardingMetadata);
return builder.build(true);
}
}
Expand Down Expand Up @@ -1810,6 +1826,9 @@ public static IndexMetadata readFrom(StreamInput in, @Nullable Function<String,
builder.shardSizeInBytesForecast(in.readOptionalLong());
}
builder.eventIngestedRange(IndexLongFieldRange.readFrom(in));
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARDING_METADATA)) {
builder.reshardingMetadata(in.readOptionalWriteable(IndexReshardingMetadata::new));
}
return builder.build(true);
}

Expand Down Expand Up @@ -1859,6 +1878,9 @@ public void writeTo(StreamOutput out, boolean mappingsAsHash) throws IOException
out.writeOptionalLong(shardSizeInBytesForecast);
}
eventIngestedRange.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_RESHARDING_METADATA)) {
out.writeOptionalWriteable(reshardingMetadata);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public void testIndexMetadataSerialization() throws IOException {
assertEquals(metadata.getForecastedWriteLoad(), fromXContentMeta.getForecastedWriteLoad());
assertEquals(metadata.getForecastedShardSizeInBytes(), fromXContentMeta.getForecastedShardSizeInBytes());
assertEquals(metadata.getInferenceFields(), fromXContentMeta.getInferenceFields());
assertEquals(metadata.getReshardingMetadata(), fromXContentMeta.getReshardingMetadata());

final BytesStreamOutput out = new BytesStreamOutput();
metadata.writeTo(out);
Expand All @@ -192,6 +193,7 @@ public void testIndexMetadataSerialization() throws IOException {
assertEquals(metadata.getForecastedShardSizeInBytes(), deserialized.getForecastedShardSizeInBytes());
assertEquals(metadata.getInferenceFields(), deserialized.getInferenceFields());
assertEquals(metadata.getEventIngestedRange(), deserialized.getEventIngestedRange());
assertEquals(metadata.getReshardingMetadata(), deserialized.getReshardingMetadata());
}
}

Expand Down