@@ -4,67 +4,58 @@ import org.apache.spark.sql.Row
44import org .apache .spark .sql .catalyst .InternalRow
55import org .apache .spark .sql .catalyst .encoders .{ExpressionEncoder , RowEncoder }
66
7- import scala .collection .concurrent .TrieMap
8-
7+ import scala .collection .mutable
98import scala .reflect .runtime .universe .TypeTag
109
11- object SerializersCache { self =>
10+ object SerializersCache {
1211 /**
13- * The point of {Serizalizer | Deserializer} wrappers to make application atomic.
14- * If that is the chain of encoders, i.e. T <=> InternalRow <=> Row the whole chain applciation should be atomic.
12+ * Spark partitions are executed on a blocking thread pool.
13+ * We can keep the cache of (De)Serializers (every serializer instance creation is pretty expensive),
14+ * but the cache should be local per thread.
15+ *
16+ * When used from multiple threads (De)Serializers tend to corrupt data and / or fail at runtime.
17+ * The alternative can be to use global locks or to use a separate executor per each (De)Serializer.
1518 */
16- case class DeserializerSynchronized [T ](underlying : ExpressionEncoder .Deserializer [T ]) {
17- def apply (i : InternalRow ): T = self.synchronized (underlying.apply(i))
18- }
19-
20- case class SerializerSynchronized [T ](underlying : ExpressionEncoder .Serializer [T ]) {
21- // copy should happen within the same lock, otherwise we're risking to loose the InternalRow
22- def apply (t : T ): InternalRow = self.synchronized (underlying.apply(t).copy())
19+ private class ThreadLocalHashMap [K , V ] extends ThreadLocal [mutable.HashMap [K , V ]] {
20+ override def initialValue (): mutable.HashMap [K , V ] = mutable.HashMap .empty
2321 }
24-
25- case class DeserializerRowSynchronized [T ](underlying : Row => T ) extends AnyVal {
26- def apply (i : Row ): T = self.synchronized (underlying(i))
22+ private object ThreadLocalHashMap {
23+ def empty [K , V ]: ThreadLocalHashMap [K , V ] = new ThreadLocalHashMap
2724 }
2825
29- case class SerializerRowSynchronized [T ](underlying : T => Row ) extends AnyVal {
30- def apply (i : T ): Row = self.synchronized (underlying(i))
26+ /** SerializerSafe ensures that all Serializers from the pool call copy after application. */
27+ case class SerializerSafe [T ](underlying : ExpressionEncoder .Serializer [T ]) {
28+ def apply (t : T ): InternalRow = underlying.apply(t).copy()
3129 }
3230
33- private val cacheSerializer : TrieMap [TypeTag [_], SerializerSynchronized [_]] = TrieMap .empty
34- private val cacheSerializerRow : TrieMap [TypeTag [_], SerializerSynchronized [Row ]] = TrieMap .empty
35- private val cacheDeserializer : TrieMap [TypeTag [_], DeserializerSynchronized [_]] = TrieMap .empty
36- private val cacheDeserializerRow : TrieMap [TypeTag [_], DeserializerSynchronized [Row ]] = TrieMap .empty
37-
38- /** Serializer is threadsafe.*/
39- def serializer [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): SerializerSynchronized [T ] =
40- cacheSerializer
41- .getOrElseUpdate(tag, SerializerSynchronized (encoder.createSerializer()))
42- .asInstanceOf [SerializerSynchronized [T ]]
43-
44- def rowSerializer [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): SerializerSynchronized [Row ] =
45- cacheSerializerRow.getOrElseUpdate(tag, SerializerSynchronized (RowEncoder (encoder.schema).createSerializer()))
31+ // T => InternalRow
32+ private val cacheSerializer : ThreadLocalHashMap [TypeTag [_], SerializerSafe [_]] = ThreadLocalHashMap .empty
33+ // Row with Schema T => InternalRow
34+ private val cacheSerializerRow : ThreadLocalHashMap [TypeTag [_], SerializerSafe [Row ]] = ThreadLocalHashMap .empty
35+ // InternalRow => T
36+ private val cacheDeserializer : ThreadLocalHashMap [TypeTag [_], ExpressionEncoder .Deserializer [_]] = ThreadLocalHashMap .empty
37+ // InternalRow => Row with Schema T
38+ private val cacheDeserializerRow : ThreadLocalHashMap [TypeTag [_], ExpressionEncoder .Deserializer [Row ]] = ThreadLocalHashMap .empty
4639
47- /** Both Serializer and Deserializer are not thread safe, and expensive to derive.
48- * Per partition instance would give us no performance regressions,
49- * however would require a significant DynamicExtractors refactor. */
50- def deserializer [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): DeserializerSynchronized [T ] =
51- cacheDeserializer
52- .getOrElseUpdate(tag, DeserializerSynchronized (encoder.resolveAndBind().createDeserializer()))
53- .asInstanceOf [DeserializerSynchronized [T ]]
40+ def serializer [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): SerializerSafe [T ] =
41+ cacheSerializer.get.getOrElseUpdate(tag, SerializerSafe (encoder.createSerializer())).asInstanceOf [SerializerSafe [T ]]
5442
43+ def rowSerializer [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): SerializerSafe [Row ] =
44+ cacheSerializerRow.get.getOrElseUpdate(tag, SerializerSafe (RowEncoder (encoder.schema).createSerializer()))
5545
56- def rowDeserializer [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): DeserializerSynchronized [Row ] =
57- cacheDeserializerRow
58- .getOrElseUpdate(tag, DeserializerSynchronized (RowEncoder (encoder.schema).resolveAndBind().createDeserializer()))
46+ def deserializer [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): ExpressionEncoder .Deserializer [T ] =
47+ cacheDeserializer.get.getOrElseUpdate(tag, encoder.resolveAndBind().createDeserializer()).asInstanceOf [ExpressionEncoder .Deserializer [T ]]
5948
49+ def rowDeserializer [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): ExpressionEncoder .Deserializer [Row ] =
50+ cacheDeserializerRow.get.getOrElseUpdate(tag, RowEncoder (encoder.schema).resolveAndBind().createDeserializer())
6051
6152 /**
6253 * https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-RowEncoder.html
6354 * https://github.com/apache/spark/blob/93cec49212fe82816fcadf69f429cebaec60e058/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L75-L86
6455 */
65- def rowDeserialize [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): DeserializerRowSynchronized [ T ] =
66- DeserializerRowSynchronized { row => deserializer[T ](tag, encoder)(rowSerializer[T ](tag, encoder)(row)) }
56+ def rowDeserialize [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): Row => T =
57+ { row => deserializer[T ](tag, encoder)(rowSerializer[T ](tag, encoder)(row)) }
6758
68- def rowSerialize [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): SerializerRowSynchronized [ T ] =
69- SerializerRowSynchronized [ T ] ( { t => rowDeserializer[T ](tag, encoder)(serializer[T ](tag, encoder)(t)) })
59+ def rowSerialize [T ](implicit tag : TypeTag [T ], encoder : ExpressionEncoder [T ]): T => Row =
60+ { t => rowDeserializer[T ](tag, encoder)(serializer[T ](tag, encoder)(t)) }
7061}
0 commit comments