Skip to content

Commit ac2f3a4

Browse files
fix a missing copy of the arity field in direct-memory copy() (#272)
1 parent e89ae23 commit ac2f3a4

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class CaseClassSerializer[T <: Product](
4343
private val nullPadding: Array[Byte] = if (super.getLength > 0) new Array(super.getLength) else EmptyByteArray
4444

4545
override val isImmutableType: Boolean = isCaseClassImmutable && fieldSerializers.forall(_.isImmutableType)
46-
val isImmutableSerializer: Boolean = fieldSerializers.forall(s => s.duplicate().eq(s))
46+
val isImmutableSerializer: Boolean = fieldSerializers.forall(s => s.duplicate().eq(s))
4747

4848
// In Flink, serializers & serializer snapshotters have strict ser/de requirements.
4949
// Both need to be capable of creating one another.
@@ -131,6 +131,17 @@ class CaseClassSerializer[T <: Product](
131131
}
132132
}
133133

134+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
135+
val sourceArity = source.readInt()
136+
target.writeInt(sourceArity)
137+
if (sourceArity == -1) {
138+
source.skipBytesToRead(nullPadding.length)
139+
target.skipBytesToWrite(nullPadding.length)
140+
} else {
141+
super.copy(source, target)
142+
}
143+
}
144+
134145
override def createInstance(fields: Array[AnyRef]): T = {
135146
constructor(fields)
136147
}

modules/flink-common-api/src/test/scala/org/apache/flinkx/api/serializer/CaseClassSerializerTest.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer
22

33
import org.apache.flink.api.common.typeutils.base.StringSerializer
44
import org.apache.flink.api.java.typeutils.runtime.RowSerializer
5+
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}
56
import org.apache.flinkx.api.serializer.CaseClassSerializerTest.{Immutable, Mutable, OuterImmutable, OuterMutable}
67
import org.scalatest.flatspec.AnyFlatSpec
78
import org.scalatest.matchers.should.Matchers
@@ -90,6 +91,22 @@ class CaseClassSerializerTest extends AnyFlatSpec with Matchers {
9091
resultData shouldEqual expectedData
9192
}
9293

94+
it should "copy the serialized stream" in {
95+
val serializer = new CaseClassSerializer[Immutable](classOf[Immutable], Array(StringSerializer.INSTANCE), true)
96+
val outerSerializer = new CaseClassSerializer[OuterMutable](classOf[OuterMutable], Array(serializer), true)
97+
val expectedData = OuterMutable(Immutable("a"))
98+
99+
val output = new DataOutputSerializer(1024)
100+
outerSerializer.serialize(expectedData, output)
101+
val input = new DataInputDeserializer(output.getSharedBuffer)
102+
val newOutput = new DataOutputSerializer(1024)
103+
outerSerializer.copy(input, newOutput)
104+
val newInput = new DataInputDeserializer(newOutput.getSharedBuffer)
105+
val resultData = outerSerializer.deserialize(newInput)
106+
107+
resultData shouldEqual expectedData
108+
}
109+
93110
"duplicate" should "return itself when the serializer is immutable" in {
94111
val serializer = new CaseClassSerializer[Mutable](classOf[Mutable], Array(StringSerializer.INSTANCE), false)
95112
serializer.duplicate() should be theSameInstanceAs serializer

0 commit comments

Comments
 (0)