Skip to content

Commit c52fa2f

Browse files
feat: add serializers for mutable collections: ArrayDeque, ArrayBuffer, HashMap, Queue, HashSet (#315)
1 parent c69a832 commit c52fa2f

File tree

7 files changed

+554
-0
lines changed

7 files changed

+554
-0
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
4+
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}
6+
7+
import scala.collection.mutable
8+
9+
/** Serializer for [[mutable.ArrayDeque]]. Handle nullable value. */
10+
class MutableArrayDequeSerializer[A](child: TypeSerializer[A], clazz: Class[A])
11+
extends MutableSerializer[mutable.ArrayDeque[A]] {
12+
13+
override def copy(from: mutable.ArrayDeque[A]): mutable.ArrayDeque[A] =
14+
if (from == null) {
15+
from
16+
} else {
17+
val length = from.length
18+
val result = from.clone()
19+
if (!child.isImmutableType) {
20+
var i = 0
21+
while (i < length) {
22+
val element = result(i)
23+
if (element != null) result(i) = child.copy(element)
24+
i += 1
25+
}
26+
}
27+
result
28+
}
29+
30+
override def duplicate(): MutableArrayDequeSerializer[A] = {
31+
val duplicatedChild = child.duplicate()
32+
if (duplicatedChild.eq(child)) {
33+
this
34+
} else {
35+
new MutableArrayDequeSerializer[A](duplicatedChild, clazz)
36+
}
37+
}
38+
39+
override def createInstance(): mutable.ArrayDeque[A] = mutable.ArrayDeque.empty[A]
40+
41+
override def getLength: Int = VariableLengthDataType
42+
43+
override def serialize(records: mutable.ArrayDeque[A], target: DataOutputView): Unit =
44+
if (records == null) {
45+
target.writeInt(NullMarker)
46+
} else {
47+
target.writeInt(records.length)
48+
var i = 0
49+
while (i < records.length) { // while loop is significantly faster than foreach when working on arrays
50+
child.serialize(records(i), target)
51+
i += 1
52+
}
53+
}
54+
55+
override def deserialize(source: DataInputView): mutable.ArrayDeque[A] = {
56+
var remaining = source.readInt()
57+
if (remaining == NullMarker) {
58+
null
59+
} else {
60+
val arrayDeque = createInstance()
61+
while (remaining > 0) {
62+
arrayDeque.append(child.deserialize(source))
63+
remaining -= 1
64+
}
65+
arrayDeque
66+
}
67+
}
68+
69+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
70+
var remaining = source.readInt()
71+
target.writeInt(remaining)
72+
while (remaining > 0) {
73+
child.copy(source, target)
74+
remaining -= 1
75+
}
76+
}
77+
78+
override def snapshotConfiguration(): TypeSerializerSnapshot[mutable.ArrayDeque[A]] =
79+
new CollectionSerializerSnapshot[mutable.ArrayDeque, A, MutableArrayDequeSerializer[A]](
80+
child,
81+
classOf[MutableArrayDequeSerializer[A]],
82+
clazz
83+
)
84+
85+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
4+
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}
6+
7+
import scala.collection.mutable
8+
9+
/** Serializer for [[mutable.Buffer]]. Handle nullable value. */
10+
class MutableBufferSerializer[A](child: TypeSerializer[A], clazz: Class[A])
11+
extends MutableSerializer[mutable.Buffer[A]] {
12+
13+
override def copy(from: mutable.Buffer[A]): mutable.Buffer[A] =
14+
if (from == null) {
15+
from
16+
} else {
17+
val length = from.length
18+
val result = from.clone()
19+
if (!child.isImmutableType) {
20+
var i = 0
21+
while (i < length) {
22+
val element = result(i)
23+
if (element != null) result(i) = child.copy(element)
24+
i += 1
25+
}
26+
}
27+
result
28+
}
29+
30+
override def duplicate(): MutableBufferSerializer[A] = {
31+
val duplicatedChild = child.duplicate()
32+
if (duplicatedChild.eq(child)) {
33+
this
34+
} else {
35+
new MutableBufferSerializer[A](duplicatedChild, clazz)
36+
}
37+
}
38+
39+
override def createInstance(): mutable.Buffer[A] = mutable.Buffer.empty[A]
40+
41+
override def getLength: Int = VariableLengthDataType
42+
43+
override def serialize(records: mutable.Buffer[A], target: DataOutputView): Unit =
44+
if (records == null) {
45+
target.writeInt(NullMarker)
46+
} else {
47+
target.writeInt(records.length)
48+
records match {
49+
case _: mutable.ArrayBuffer[_] | _: mutable.ArrayDeque[_] =>
50+
var i = 0
51+
while (i < records.length) { // while loop is significantly faster than foreach when working on arrays
52+
child.serialize(records(i), target)
53+
i += 1
54+
}
55+
case _ => records.foreach(element => child.serialize(element, target))
56+
}
57+
}
58+
59+
override def deserialize(source: DataInputView): mutable.Buffer[A] = {
60+
var remaining = source.readInt()
61+
if (remaining == NullMarker) {
62+
null
63+
} else {
64+
val buffer = createInstance()
65+
while (remaining > 0) {
66+
buffer.append(child.deserialize(source))
67+
remaining -= 1
68+
}
69+
buffer
70+
}
71+
}
72+
73+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
74+
var remaining = source.readInt()
75+
target.writeInt(remaining)
76+
while (remaining > 0) {
77+
child.copy(source, target)
78+
remaining -= 1
79+
}
80+
}
81+
82+
override def snapshotConfiguration(): TypeSerializerSnapshot[mutable.Buffer[A]] =
83+
new CollectionSerializerSnapshot[mutable.Buffer, A, MutableBufferSerializer[A]](
84+
child,
85+
classOf[MutableBufferSerializer[A]],
86+
clazz
87+
)
88+
89+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
4+
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}
6+
7+
import scala.collection.mutable
8+
9+
/** Serializer for [[mutable.Map]]. Handle nullable value. */
10+
class MutableMapSerializer[K, V](
11+
keySerializer: TypeSerializer[K],
12+
valueSerializer: TypeSerializer[V]
13+
) extends MutableSerializer[mutable.Map[K, V]] {
14+
15+
override def copy(from: mutable.Map[K, V]): mutable.Map[K, V] =
16+
if (from == null) {
17+
from
18+
} else {
19+
from.map(element => (keySerializer.copy(element._1), valueSerializer.copy(element._2)))
20+
}
21+
22+
override def duplicate(): MutableMapSerializer[K, V] = {
23+
val duplicatedKs = keySerializer.duplicate()
24+
val duplicatedVs = valueSerializer.duplicate()
25+
if (duplicatedKs.eq(keySerializer) && duplicatedVs.eq(valueSerializer)) {
26+
this
27+
} else {
28+
new MutableMapSerializer(duplicatedKs, duplicatedVs)
29+
}
30+
}
31+
32+
override def createInstance(): mutable.Map[K, V] = mutable.Map.empty[K, V]
33+
34+
override def getLength: Int = VariableLengthDataType
35+
36+
override def serialize(records: mutable.Map[K, V], target: DataOutputView): Unit =
37+
if (records == null) {
38+
target.writeInt(NullMarker)
39+
} else {
40+
target.writeInt(records.size)
41+
records.foreach(element => {
42+
keySerializer.serialize(element._1, target)
43+
valueSerializer.serialize(element._2, target)
44+
})
45+
}
46+
47+
override def deserialize(source: DataInputView): mutable.Map[K, V] = {
48+
var remaining = source.readInt() // The valid range of actual data is >= 0. Only markers are negative
49+
if (remaining == NullMarker) {
50+
null
51+
} else {
52+
val map = createInstance()
53+
while (remaining > 0) {
54+
val key = keySerializer.deserialize(source)
55+
val value = valueSerializer.deserialize(source)
56+
map.put(key, value)
57+
remaining -= 1
58+
}
59+
map
60+
}
61+
}
62+
63+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
64+
var remaining = source.readInt()
65+
target.writeInt(remaining)
66+
while (remaining > 0) {
67+
keySerializer.copy(source, target)
68+
valueSerializer.copy(source, target)
69+
remaining -= 1
70+
}
71+
}
72+
73+
override def snapshotConfiguration(): TypeSerializerSnapshot[mutable.Map[K, V]] =
74+
new MutableMapSerializerSnapshot(keySerializer, valueSerializer)
75+
76+
}
77+
78+
class MutableMapSerializerSnapshot[K, V](
79+
private var keySerializer: TypeSerializer[K],
80+
private var valueSerializer: TypeSerializer[V]
81+
) extends TypeSerializerSnapshot[mutable.Map[K, V]] {
82+
83+
def this() = this(null, null)
84+
85+
override def getCurrentVersion: Int = 1
86+
87+
override def writeSnapshot(out: DataOutputView): Unit = {
88+
TypeSerializerSnapshot.writeVersionedSnapshot(out, keySerializer.snapshotConfiguration())
89+
TypeSerializerSnapshot.writeVersionedSnapshot(out, valueSerializer.snapshotConfiguration())
90+
}
91+
92+
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
93+
keySerializer = TypeSerializerSnapshot.readVersionedSnapshot[K](in, userCodeClassLoader).restoreSerializer()
94+
valueSerializer = TypeSerializerSnapshot.readVersionedSnapshot[V](in, userCodeClassLoader).restoreSerializer()
95+
}
96+
97+
override def resolveSchemaCompatibility(
98+
oldSerializerSnapshot: TypeSerializerSnapshot[mutable.Map[K, V]]
99+
): TypeSerializerSchemaCompatibility[mutable.Map[K, V]] = {
100+
TypeSerializerSchemaCompatibility.compatibleAsIs()
101+
}
102+
103+
override def restoreSerializer(): TypeSerializer[mutable.Map[K, V]] =
104+
new MutableMapSerializer(keySerializer, valueSerializer)
105+
106+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
4+
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}
6+
7+
import scala.collection.mutable
8+
9+
/** Serializer for [[mutable.Queue]]. Handle nullable value. */
10+
class MutableQueueSerializer[A](child: TypeSerializer[A], clazz: Class[A]) extends MutableSerializer[mutable.Queue[A]] {
11+
12+
override def copy(from: mutable.Queue[A]): mutable.Queue[A] =
13+
if (from == null) {
14+
from
15+
} else {
16+
val length = from.length
17+
val result = from.clone()
18+
if (!child.isImmutableType) {
19+
var i = 0
20+
while (i < length) {
21+
val element = result(i)
22+
if (element != null) result(i) = child.copy(element)
23+
i += 1
24+
}
25+
}
26+
result
27+
}
28+
29+
override def duplicate(): MutableQueueSerializer[A] = {
30+
val duplicatedChild = child.duplicate()
31+
if (duplicatedChild.eq(child)) {
32+
this
33+
} else {
34+
new MutableQueueSerializer[A](duplicatedChild, clazz)
35+
}
36+
}
37+
38+
override def createInstance(): mutable.Queue[A] = mutable.Queue.empty[A]
39+
40+
override def getLength: Int = VariableLengthDataType
41+
42+
override def serialize(records: mutable.Queue[A], target: DataOutputView): Unit =
43+
if (records == null) {
44+
target.writeInt(NullMarker)
45+
} else {
46+
target.writeInt(records.length)
47+
var i = 0
48+
while (i < records.length) { // while loop is significantly faster than foreach when working on arrays
49+
child.serialize(records(i), target)
50+
i += 1
51+
}
52+
}
53+
54+
override def deserialize(source: DataInputView): mutable.Queue[A] = {
55+
var remaining = source.readInt()
56+
if (remaining == NullMarker) {
57+
null
58+
} else {
59+
val queue = createInstance()
60+
while (remaining > 0) {
61+
val a = child.deserialize(source)
62+
queue.append(a)
63+
remaining -= 1
64+
}
65+
queue
66+
}
67+
}
68+
69+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
70+
var remaining = source.readInt()
71+
target.writeInt(remaining)
72+
while (remaining > 0) {
73+
child.copy(source, target)
74+
remaining -= 1
75+
}
76+
}
77+
78+
override def snapshotConfiguration(): TypeSerializerSnapshot[mutable.Queue[A]] =
79+
new CollectionSerializerSnapshot[mutable.Queue, A, MutableQueueSerializer[A]](
80+
child,
81+
classOf[MutableQueueSerializer[A]],
82+
clazz
83+
)
84+
85+
}

0 commit comments

Comments
 (0)