Skip to content

Commit 7108963

Browse files
authored
[ML][Transforms] fix bwc serialization with 7.3 (#48021) (#48052)
1 parent 30eba04 commit 7108963

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public DataFrameTransformStats(StreamInput in) throws IOException {
123123
DataFrameTransformState transformState = new DataFrameTransformState(in);
124124
this.state = State.fromComponents(transformState.getTaskState(), transformState.getIndexerState());
125125
this.reason = transformState.getReason();
126-
this.node = null;
126+
this.node = transformState.getNode();
127127
this.indexerStats = new DataFrameIndexerTransformStats(in);
128128
this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
129129
}
@@ -171,8 +171,8 @@ public void writeTo(StreamOutput out) throws IOException {
171171
checkpointingInfo.getNext().getPosition(),
172172
checkpointingInfo.getLast().getCheckpoint(),
173173
reason,
174-
checkpointingInfo.getNext().getCheckpointProgress()).writeTo(out);
175-
out.writeBoolean(false);
174+
checkpointingInfo.getNext().getCheckpointProgress(),
175+
node).writeTo(out);
176176
indexerStats.writeTo(out);
177177
checkpointingInfo.writeTo(out);
178178
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,19 @@
66

77
package org.elasticsearch.xpack.core.dataframe.transforms;
88

9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
11+
import org.elasticsearch.common.io.stream.StreamInput;
912
import org.elasticsearch.common.io.stream.Writeable.Reader;
1013
import org.elasticsearch.common.xcontent.XContentParser;
1114
import org.elasticsearch.test.AbstractSerializingTestCase;
1215

1316
import java.io.IOException;
1417
import java.util.function.Predicate;
1518

19+
import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats.State.STARTED;
20+
import static org.hamcrest.Matchers.equalTo;
21+
1622
public class DataFrameTransformStatsTests extends AbstractSerializingTestCase<DataFrameTransformStats> {
1723

1824
public static DataFrameTransformStats randomDataFrameTransformStats() {
@@ -53,4 +59,28 @@ protected String[] getShuffleFieldsExceptions() {
5359
protected Predicate<String> getRandomFieldsExcludeFilter() {
5460
return field -> !field.isEmpty();
5561
}
62+
63+
public void testBwcWith73() throws IOException {
64+
for(int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
65+
DataFrameTransformStats stats = new DataFrameTransformStats("bwc-id",
66+
STARTED,
67+
randomBoolean() ? null : randomAlphaOfLength(100),
68+
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
69+
new DataFrameIndexerTransformStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
70+
new DataFrameTransformCheckpointingInfo(
71+
new DataFrameTransformCheckpointStats(0, null, null, 10, 100),
72+
new DataFrameTransformCheckpointStats(0, null, null, 100, 1000),
73+
// changesLastDetectedAt aren't serialized back
74+
100, null));
75+
try (BytesStreamOutput output = new BytesStreamOutput()) {
76+
output.setVersion(Version.V_7_3_0);
77+
stats.writeTo(output);
78+
try (StreamInput in = output.bytes().streamInput()) {
79+
in.setVersion(Version.V_7_3_0);
80+
DataFrameTransformStats statsFromOld = new DataFrameTransformStats(in);
81+
assertThat(statsFromOld, equalTo(stats));
82+
}
83+
}
84+
}
85+
}
5686
}

0 commit comments

Comments
 (0)