Skip to content

Commit c81cf43

Browse files
[refacto] Fix missing conversions of TypeSerializerSnapshot.resolveSchemaCompatibility()
1 parent e1f0eaa commit c81cf43

File tree

5 files changed

+29
-13
lines changed

5 files changed

+29
-13
lines changed

modules/flink-common-api/src/main/java/org/apache/flinkx/api/serializer/ScalaCaseClassSerializerSnapshot.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import static org.apache.flink.util.Preconditions.checkNotNull;
3333
import static org.apache.flink.util.Preconditions.checkState;
3434

35-
/** {@link TypeSerializerSnapshot} for {@link CaseClassSerializer}. */
35+
/**
36+
* {@link TypeSerializerSnapshot} for {@link CaseClassSerializer}.
37+
*/
3638
@Internal
3739
public final class ScalaCaseClassSerializerSnapshot<T extends scala.Product>
3840
extends CompositeTypeSerializerSnapshot<T, CaseClassSerializer<T>> {
@@ -42,13 +44,17 @@ public final class ScalaCaseClassSerializerSnapshot<T extends scala.Product>
4244
private Class<T> type;
4345
private boolean isCaseClassImmutable;
4446

45-
/** Used via reflection. */
47+
/**
48+
* Used via reflection.
49+
*/
4650
@SuppressWarnings("unused")
4751
public ScalaCaseClassSerializerSnapshot() {
4852
super(CaseClassSerializer.class);
4953
}
5054

51-
/** Used for the snapshot path. */
55+
/**
56+
* Used for the snapshot path.
57+
*/
5258
public ScalaCaseClassSerializerSnapshot(CaseClassSerializer<T> serializerInstance) {
5359
super(serializerInstance);
5460
this.type = checkNotNull(serializerInstance.getTupleClass(), "tuple class can not be NULL");
@@ -91,9 +97,13 @@ protected void readOuterSnapshot(
9197

9298
@Override
9399
protected CompositeTypeSerializerSnapshot.OuterSchemaCompatibility
94-
resolveOuterSchemaCompatibility(CaseClassSerializer<T> newSerializer) {
100+
resolveOuterSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot) {
101+
if (!(oldSerializerSnapshot instanceof ScalaCaseClassSerializerSnapshot)) {
102+
return OuterSchemaCompatibility.INCOMPATIBLE;
103+
}
104+
var caseClassSerializerSnapshot = (ScalaCaseClassSerializerSnapshot<T>) oldSerializerSnapshot;
95105
var currentTypeName = Optional.ofNullable(type).map(Class::getName);
96-
var newTypeName = Optional.ofNullable(newSerializer.getTupleClass()).map(Class::getName);
106+
var newTypeName = Optional.ofNullable(caseClassSerializerSnapshot.type).map(Class::getName);
97107
if (currentTypeName.equals(newTypeName)) {
98108
return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
99109
} else {

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,9 @@ object CoproductSerializer {
117117
})
118118
}
119119

120-
override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] =
120+
override def resolveSchemaCompatibility(
121+
oldSerializer: TypeSerializerSnapshot[T]
122+
): TypeSerializerSchemaCompatibility[T] =
121123
TypeSerializerSchemaCompatibility.compatibleAsIs()
122124

123125
override def restoreSerializer(): TypeSerializer[T] =

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ object MapSerializer {
9999
}
100100

101101
override def resolveSchemaCompatibility(
102-
newSerializer: TypeSerializer[Map[K, V]]
103-
): TypeSerializerSchemaCompatibility[Map[K, V]] = TypeSerializerSchemaCompatibility.compatibleAsIs()
102+
oldSerializer: TypeSerializerSnapshot[Map[K, V]]
103+
): TypeSerializerSchemaCompatibility[Map[K, V]] =
104+
TypeSerializerSchemaCompatibility.compatibleAsIs()
104105

105106
override def restoreSerializer(): TypeSerializer[Map[K, V]] = new MapSerializer(keySerializer, valueSerializer)
106107
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ object MappedSerializer {
9292
}
9393
}
9494

95-
override def resolveSchemaCompatibility(newSerializer: TypeSerializer[A]): TypeSerializerSchemaCompatibility[A] =
95+
override def resolveSchemaCompatibility(
96+
oldSerializer: TypeSerializerSnapshot[A]
97+
): TypeSerializerSchemaCompatibility[A] =
9698
TypeSerializerSchemaCompatibility.compatibleAsIs()
9799

98100
override def writeSnapshot(out: DataOutputView): Unit = {

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package org.apache.flinkx.api.serializer
22

3-
import org.apache.flinkx.api.serializer.ScalaCaseObjectSerializer.ScalaCaseObjectSerializerSnapshot
43
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
5-
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
64
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
75
import org.apache.flink.util.InstantiationUtil
6+
import org.apache.flinkx.api.serializer.ScalaCaseObjectSerializer.ScalaCaseObjectSerializerSnapshot
87

98
class ScalaCaseObjectSerializer[T](clazz: Class[T]) extends ImmutableSerializer[T] {
109
override def copy(source: DataInputView, target: DataOutputView): Unit = {}
@@ -32,8 +31,10 @@ object ScalaCaseObjectSerializer {
3231
out.writeUTF(clazz.getName)
3332
}
3433

35-
override def getCurrentVersion: Int = 1
36-
override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] =
34+
override def getCurrentVersion: Int = 1
35+
override def resolveSchemaCompatibility(
36+
oldSerializer: TypeSerializerSnapshot[T]
37+
): TypeSerializerSchemaCompatibility[T] =
3738
TypeSerializerSchemaCompatibility.compatibleAsIs()
3839

3940
override def restoreSerializer(): TypeSerializer[T] =

0 commit comments

Comments
 (0)