Skip to content

Commit 58ad376

Browse files
feat(#286): add serializers for DurationSerializer, FiniteDurationSerializer, Ordering and SortedSet
1 parent 855ae96 commit 58ad376

File tree

9 files changed

+794
-11
lines changed

9 files changed

+794
-11
lines changed

README.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,50 @@ implicit val mapper2: TypeMapper[WrappedString, String] = new TypeMapper[Wrapped
352352
}
353353
```
354354

355+
### Ordering
356+
357+
`SortedSet` requires type-information for its elements and also for the ordering of those elements. Type-information for the default orderings is not implicitly available in the context, because we cannot assume whether the user wants the natural ordering or a custom one.
358+
359+
Type-information for the default orderings is available in `org.apache.flinkx.api.serializer.OrderingTypeInfo` and can be used as follows:
360+
```scala mdoc:reset-object
361+
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

// SortedSet is not available by default in Scala: it has to be imported explicitly
import scala.collection.immutable.SortedSet

case class Foo(bars: SortedSet[String])

object Foo {
  implicit val fooInfo: TypeInformation[Foo] = {
    // The type-information of the Ordering needs to be explicitly put in the implicit context
    implicit val orderingStringInfo: TypeInformation[Ordering[String]] =
      OrderingTypeInfo.DefaultStringOrderingInfo
    deriveTypeInformation
  }
}
374+
```
375+
376+
It is also possible to derive the type-information of a custom ordering, provided that the ordering is an ADT:
377+
```scala mdoc:reset-object
378+
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

// SortedSet is not available by default in Scala: it has to be imported explicitly
import scala.collection.immutable.SortedSet

case class Bar(a: Int, b: String)

// Custom ordering: compares Bar instances on their `a` field only
case object BarOrdering extends Ordering[Bar] {
  override def compare(x: Bar, y: Bar): Int = x.a.compare(y.a)
}

case class Foo(bar: SortedSet[Bar])

object Foo {
  implicit val fooInfo: TypeInformation[Foo] = {
    // Derive the type-information of the custom Bar ordering
    implicit val barOrderingInfo: TypeInformation[Ordering[Bar]] =
      OrderingTypeInfo.deriveOrdering[BarOrdering.type, Bar]
    deriveTypeInformation
  }
}
397+
```
398+
355399
### Schema evolution
356400

357401
#### ADT

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnap
2222
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
2323
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
2424
import org.apache.flink.types.NullFieldException
25+
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}
2526
import org.apache.flinkx.api.serializer.CaseClassSerializer.EmptyByteArray
2627
import org.slf4j.{Logger, LoggerFactory}
2728

@@ -103,11 +104,13 @@ class CaseClassSerializer[T <: Product](
103104
createInstance(fields)
104105
}
105106

106-
override val getLength: Int = if (super.getLength == -1) -1 else super.getLength + 4 // +4 bytes for the arity field
107+
override val getLength: Int =
108+
if (super.getLength == VariableLengthDataType) VariableLengthDataType
109+
else super.getLength + 4 // +4 bytes for the arity field
107110

108111
def serialize(value: T, target: DataOutputView): Unit = {
109-
// Write an arity of -1 to indicate null value
110-
val sourceArity = if (value == null) -1 else arity
112+
// Write a negative arity to indicate null value
113+
val sourceArity = if (value == null) NullMarker else arity
111114
target.writeInt(sourceArity)
112115
if (value == null) target.write(nullPadding)
113116

@@ -129,7 +132,7 @@ class CaseClassSerializer[T <: Product](
129132

130133
def deserialize(source: DataInputView): T = {
131134
val sourceArity = source.readInt()
132-
if (sourceArity == -1) {
135+
if (sourceArity < 0) {
133136
source.skipBytesToRead(nullPadding.length)
134137
null.asInstanceOf[T]
135138
} else {
@@ -146,7 +149,7 @@ class CaseClassSerializer[T <: Product](
146149
override def copy(source: DataInputView, target: DataOutputView): Unit = {
147150
val sourceArity = source.readInt()
148151
target.writeInt(sourceArity)
149-
if (sourceArity == -1) {
152+
if (sourceArity < 0) {
150153
source.skipBytesToRead(nullPadding.length)
151154
target.skipBytesToWrite(nullPadding.length)
152155
} else {

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]](
6969
case "long" => out.writeUTF("java.lang.Long")
7070
case "byte" => out.writeUTF("java.lang.Byte")
7171
case "short" => out.writeUTF("java.lang.Short")
72-
case "char" => out.writeUTF("java.lang.Char")
72+
case "char" => out.writeUTF("java.lang.Character")
7373
case "boolean" => out.writeUTF("java.lang.Boolean")
74+
case "void" => out.writeUTF("java.lang.Void")
7475
case other => out.writeUTF(other)
7576
}
7677
TypeSerializerSnapshot.writeVersionedSnapshot(out, nestedSerializer.snapshotConfiguration())
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
4+
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
6+
/** Serializer snapshot for sorted collections.
  *
  * Extends [[CollectionSerializerSnapshot]]: after the data written by the parent, it also writes the versioned
  * snapshot of the serializer of the elements' ordering, and reads it back in the same position.
  *
  * @param aSerializer
  *   the serializer of `A`
  * @param sClass
  *   the class of `S`
  * @param aClass
  *   the class of `A`
  * @param aOrderingSerializer
  *   the serializer of `Ordering[A]`; reassigned by `readSnapshot`
  * @tparam F
  *   the type of the serialized collection
  * @tparam A
  *   the type of the collection's elements
  * @tparam S
  *   the type of the collection serializer
  */
class SortedCollectionSerializerSnapshot[F[_], A, S <: TypeSerializer[F[A]]](
    aSerializer: TypeSerializer[A],
    sClass: Class[S],
    aClass: Class[A],
    private var aOrderingSerializer: TypeSerializer[Ordering[A]]
) extends CollectionSerializerSnapshot[F, A, S](aSerializer, sClass, aClass) {

  /** No-arg constructor, required to instantiate this class during deserialization. */
  def this() = this(null, null, null, null)

  /** Delegates to the parent: the version must be aligned with CollectionSerializerSnapshot. */
  override def getCurrentVersion: Int = super.getCurrentVersion

  /** Writes the parent's data, then the versioned snapshot of the ordering serializer. */
  override def writeSnapshot(out: DataOutputView): Unit = {
    super.writeSnapshot(out)
    val orderingSnapshot = aOrderingSerializer.snapshotConfiguration()
    TypeSerializerSnapshot.writeVersionedSnapshot(out, orderingSnapshot)
  }

  /** Reads the parent's data, then restores the ordering serializer from its versioned snapshot. */
  override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
    super.readSnapshot(readVersion, in, userCodeClassLoader)
    val orderingSnapshot = TypeSerializerSnapshot.readVersionedSnapshot[Ordering[A]](in, userCodeClassLoader)
    aOrderingSerializer = orderingSnapshot.restoreSerializer()
  }

  /** Re-creates the collection serializer reflectively, through the first public constructor of its class.
    *
    * NOTE(review): `clazz`, `nestedSerializer` and `vclazz` are not declared here; presumably they are inherited from
    * the parent snapshot. The constructor is expected to take (element serializer, element class, ordering
    * serializer), in that order.
    */
  override def restoreSerializer(): TypeSerializer[F[A]] = {
    val ctor     = clazz.getConstructors()(0)
    val restored = ctor.newInstance(nestedSerializer, vclazz, aOrderingSerializer)
    restored.asInstanceOf[TypeSerializer[F[A]]]
  }

}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
4+
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}
6+
7+
import java.util.Objects
8+
import scala.collection.immutable.SortedSet
9+
import scala.collection.mutable
10+
11+
/** Serializer for [[SortedSet]]. Handle nullable value.
  *
  * Binary layout: an `Int` holding the number of elements (`NullMarker` is written for a `null` set, and any negative
  * count is read back as `null`), followed by each element written with `aSerializer`, followed by the ordering of
  * the set written with `aOrderingSerializer`.
  *
  * @param aSerializer
  *   the serializer of the elements
  * @param aClass
  *   the class of the elements
  * @param aOrderingSerializer
  *   the serializer of the ordering of the elements
  * @tparam A
  *   the type of the elements
  */
class SortedSetSerializer[A](
    val aSerializer: TypeSerializer[A],
    val aClass: Class[A],
    val aOrderingSerializer: TypeSerializer[Ordering[A]]
) extends MutableSerializer[SortedSet[A]] { // SortedSet is immutable, but its elements can be mutable

  /** The set is immutable only if both its elements and its ordering are immutable. */
  override val isImmutableType: Boolean = aSerializer.isImmutableType && aOrderingSerializer.isImmutableType

  /** Returns `this` when neither nested serializer had to be duplicated, a new serializer otherwise. */
  override def duplicate(): SortedSetSerializer[A] = {
    val duplicatedASerializer         = aSerializer.duplicate()
    val duplicatedAOrderingSerializer = aOrderingSerializer.duplicate()
    if (duplicatedASerializer.eq(aSerializer) && duplicatedAOrderingSerializer.eq(aOrderingSerializer)) {
      this
    } else {
      new SortedSetSerializer[A](duplicatedASerializer, aClass, duplicatedAOrderingSerializer)
    }
  }

  /** Creates an empty set sorted by the instance created by the ordering serializer. */
  override def createInstance(): SortedSet[A] = SortedSet.empty[A](aOrderingSerializer.createInstance())

  /** Copies the set. `null` and fully immutable sets are returned as is; otherwise only the mutable parts (ordering
    * and/or elements) are copied.
    */
  override def copy(from: SortedSet[A]): SortedSet[A] =
    if (from == null || isImmutableType) {
      from
    } else {
      implicit val ordering: Ordering[A] =
        if (aOrderingSerializer.isImmutableType) from.ordering else aOrderingSerializer.copy(from.ordering)
      if (aSerializer.isImmutableType) {
        // Only the ordering is mutable: rebuild the set around the copied ordering
        SortedSet.from(from)
      } else {
        from.map(aSerializer.copy)
      }
    }

  override def getLength: Int = VariableLengthDataType

  /** Writes the element count (or `NullMarker` for `null`), the elements, then the ordering. */
  override def serialize(records: SortedSet[A], target: DataOutputView): Unit =
    if (records == null) {
      target.writeInt(NullMarker)
    } else {
      target.writeInt(records.size)
      records.foreach(element => aSerializer.serialize(element, target))
      aOrderingSerializer.serialize(records.ordering, target)
    }

  /** Reads back a set written by `serialize`.
    *
    * Any negative element count is treated as the null marker, consistently with `copy(source, target)`: nothing
    * follows a negative count in the stream, so trying to read an ordering would consume unrelated bytes.
    */
  override def deserialize(source: DataInputView): SortedSet[A] = {
    var remaining = source.readInt()
    if (remaining < 0) {
      null
    } else {
      // Elements are buffered because the ordering needed to build the set is written after them
      val buffer = mutable.Buffer[A]()
      while (remaining > 0) {
        buffer.append(aSerializer.deserialize(source))
        remaining -= 1
      }
      implicit val ordering: Ordering[A] = aOrderingSerializer.deserialize(source)
      buffer.to(SortedSet)
    }
  }

  /** Copies a serialized set from `source` to `target` without materializing it. */
  override def copy(source: DataInputView, target: DataOutputView): Unit = {
    var remaining = source.readInt()
    target.writeInt(remaining)
    if (remaining >= 0) { // A negative count marks a null set: nothing else to copy
      while (remaining > 0) {
        aSerializer.copy(source, target)
        remaining -= 1
      }
      aOrderingSerializer.copy(source, target)
    }
  }

  override def snapshotConfiguration(): TypeSerializerSnapshot[SortedSet[A]] =
    new SortedCollectionSerializerSnapshot[SortedSet, A, SortedSetSerializer[A]](
      aSerializer,
      classOf[SortedSetSerializer[A]],
      aClass,
      aOrderingSerializer
    )

  override def hashCode(): Int = Objects.hash(aSerializer, aClass, aOrderingSerializer)

  override def equals(obj: Any): Boolean =
    obj match {
      case other: SortedSetSerializer[_] =>
        aSerializer == other.aSerializer && aClass == other.aClass && aOrderingSerializer == other.aOrderingSerializer
      case _ => false
    }

}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
4+
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.{VariableLengthDataType, serializers}
6+
7+
import java.lang.Long.{BYTES => LongBytes}
8+
import scala.concurrent.duration.{Duration, FiniteDuration, TimeUnit}
9+
10+
/** Serializer for [[Duration]]. Handle nullable value.
  *
  * Binary layout: one discriminant byte identifying the kind of duration (null, finite, undefined, positive infinite
  * or negative infinite), followed by the payload written by [[FiniteDurationSerializer]] for finite durations only.
  */
object DurationSerializer extends ImmutableSerializer[Duration] {

  val NullDiscriminant: Int                     = -1
  val FiniteDurationDiscriminant: Int           = 0
  val UndefinedDurationDiscriminant: Int        = 1
  val PositiveInfiniteDurationDiscriminant: Int = 2
  val NegativeInfiniteDurationDiscriminant: Int = 3

  override def createInstance: Duration = Duration.Zero

  override def getLength: Int = VariableLengthDataType

  /** Writes the discriminant byte of the duration, then its length and unit if it is finite. */
  override def serialize(duration: Duration, target: DataOutputView): Unit =
    duration match {
      case null => target.writeByte(NullDiscriminant) // Cannot use NullMarker here as we are writing a byte
      case finiteDuration: FiniteDuration =>
        target.writeByte(FiniteDurationDiscriminant)
        FiniteDurationSerializer.serialize(finiteDuration, target)
      case Duration.Inf      => target.writeByte(PositiveInfiniteDurationDiscriminant)
      case Duration.MinusInf => target.writeByte(NegativeInfiniteDurationDiscriminant)
      case _: Duration.Infinite => // Last to handle Duration.Undefined which doesn't equal itself
        target.writeByte(UndefinedDurationDiscriminant)
    }

  /** Reads back a duration written by `serialize`.
    *
    * @throws java.lang.IllegalArgumentException
    *   if the discriminant byte is not one of the known discriminants
    */
  override def deserialize(source: DataInputView): Duration =
    source.readByte() match {
      case NullDiscriminant                     => null
      case FiniteDurationDiscriminant           => FiniteDurationSerializer.deserialize(source)
      case UndefinedDurationDiscriminant        => Duration.Undefined
      case PositiveInfiniteDurationDiscriminant => Duration.Inf
      case NegativeInfiniteDurationDiscriminant => Duration.MinusInf
      // Report the offending value: it is the only clue available when the stream is corrupted or misaligned
      case unknown => throw new IllegalArgumentException(s"Unknown duration type: $unknown")
    }

  /** Copies a serialized duration: the discriminant byte, then the finite payload if there is one. */
  override def copy(source: DataInputView, target: DataOutputView): Unit = {
    val discriminant = source.readByte()
    target.writeByte(discriminant)
    if (discriminant == FiniteDurationDiscriminant) FiniteDurationSerializer.copy(source, target)
  }

  override def snapshotConfiguration(): TypeSerializerSnapshot[Duration] = new DurationSerializerSnapshot()

}
54+
55+
/** Snapshot of [[DurationSerializer]]: its supplier always hands back the `DurationSerializer` singleton. */
class DurationSerializerSnapshot
    extends SimpleTypeSerializerSnapshot[Duration](() => DurationSerializer)
57+
58+
/** Serializer for [[FiniteDuration]].
  *
  * Don't handle null value. If you need a nullable finite duration, use [[DurationSerializer]].
  *
  * Binary layout: the length of the duration as a `Long`, followed by its [[TimeUnit]] written with the `TimeUnit`
  * serializer.
  */
object FiniteDurationSerializer extends ImmutableSerializer[FiniteDuration] {

  private val timeUnitSerializer: TypeSerializer[TimeUnit] = serializers.infoToSer(serializers.timeUnitInfo)

  override def createInstance: FiniteDuration = Duration.Zero

  /** Length in bytes of a serialized finite duration: 1 Long + TimeUnit.
    *
    * If the `TimeUnit` serializer reports a variable length, so does this serializer: adding the size of a `Long` to
    * the variable-length sentinel would produce a meaningless length.
    */
  override def getLength: Int = {
    val timeUnitLength = timeUnitSerializer.getLength
    if (timeUnitLength == VariableLengthDataType) VariableLengthDataType
    else LongBytes + timeUnitLength
  }

  /** Writes the length of the duration, then its unit. */
  override def serialize(finiteDuration: FiniteDuration, target: DataOutputView): Unit = {
    target.writeLong(finiteDuration.length)
    timeUnitSerializer.serialize(finiteDuration.unit, target)
  }

  /** Reads back a finite duration written by `serialize`. */
  override def deserialize(source: DataInputView): FiniteDuration = {
    val length = source.readLong()
    val unit   = timeUnitSerializer.deserialize(source)
    new FiniteDuration(length, unit)
  }

  /** Copies a serialized finite duration: the `Long` length, then the unit. */
  override def copy(source: DataInputView, target: DataOutputView): Unit = {
    target.writeLong(source.readLong())
    timeUnitSerializer.copy(source, target)
  }

  override def snapshotConfiguration(): TypeSerializerSnapshot[FiniteDuration] = new FiniteDurationSerializerSnapshot()

}
89+
90+
/** Snapshot of [[FiniteDurationSerializer]]: its supplier always hands back the `FiniteDurationSerializer`
  * singleton.
  */
class FiniteDurationSerializerSnapshot
    extends SimpleTypeSerializerSnapshot[FiniteDuration](() => FiniteDurationSerializer)

0 commit comments

Comments
 (0)