
Commit 2c54aae

liutang123 authored and gatorsmile committed
[SPARK-24809][SQL] Serializing LongToUnsafeRowMap in executor may result in data error
When the join key is a long or an int in a broadcast join, Spark uses `LongToUnsafeRowMap` to store the key-value pairs of the table that will be broadcast. When the `LongToUnsafeRowMap` is broadcast to executors and is too big to hold in memory, it is spilled to disk. At that point, because `write` uses the `cursor` variable to determine how many bytes of the map's `page` to write out, and `cursor` is not restored during deserialization, the executor writes nothing from `page` to disk.

## What changes were proposed in this pull request?

Restore the cursor value when deserializing.

Author: liulijia <[email protected]>

Closes apache#21772 from liutang123/SPARK-24809.
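To make the failure mode concrete, the following is a minimal, self-contained sketch of the pattern, not the real `LongToUnsafeRowMap` implementation: `SimplifiedRowMap`, its fields, and the stream-based serialization are illustrative stand-ins. It shows why a `write` path that derives its length from `cursor` emits an empty page when `cursor` is not restored after a read, and why the data loss only appears on the second serialize/deserialize round trip (driver broadcast, then executor spill).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object CursorBugSketch {
  // Stand-in for Platform.LONG_ARRAY_OFFSET: payload bytes in `page` start after this offset.
  val LongArrayOffset: Long = 16L

  class SimplifiedRowMap {
    var page: Array[Long] = Array.empty
    // Bytes used in `page`, counted from LongArrayOffset, like the real map's `cursor`.
    var cursor: Long = LongArrayOffset

    def append(values: Array[Long]): Unit = {
      page = values
      cursor = LongArrayOffset + values.length * 8
    }

    // `write` derives the number of longs to emit from `cursor`,
    // so a stale cursor means an empty page gets written.
    def write(out: DataOutputStream): Unit = {
      val numLongs = ((cursor - LongArrayOffset) / 8).toInt
      out.writeInt(numLongs)
      page.take(numLongs).foreach(out.writeLong)
    }

    def read(in: DataInputStream, restoreCursor: Boolean): Unit = {
      val numLongs = in.readInt()
      page = Array.fill(numLongs)(in.readLong())
      // The fix: without this line, cursor stays at LongArrayOffset and a later
      // write (e.g. spilling the broadcast map to disk on an executor) emits nothing.
      if (restoreCursor) cursor = LongArrayOffset + numLongs * 8
    }
  }

  def roundTrip(map: SimplifiedRowMap, restoreCursor: Boolean): SimplifiedRowMap = {
    val bytes = new ByteArrayOutputStream()
    map.write(new DataOutputStream(bytes))
    val copy = new SimplifiedRowMap
    copy.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)), restoreCursor)
    copy
  }

  def main(args: Array[String]): Unit = {
    val original = new SimplifiedRowMap
    original.append(Array(4852306286022334418L, 8813607448788216010L))

    // Two round trips model driver -> executor broadcast followed by an executor spill.
    val broken = roundTrip(roundTrip(original, restoreCursor = false), restoreCursor = false)
    val fixed  = roundTrip(roundTrip(original, restoreCursor = true), restoreCursor = true)

    println(s"without restoring cursor: ${broken.page.length} values") // 0 values
    println(s"with restored cursor:     ${fixed.page.length} values")  // 2 values
  }
}
```

In the actual fix shown in the diff below, the restored value is `pageLength * 8 + Platform.LONG_ARRAY_OFFSET`, i.e. the used portion of the long-array `page` in bytes plus the array's base offset, which is what `write` expects `cursor` to hold.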
1 parent 8fe5d2c commit 2c54aae

File tree

2 files changed: 31 additions & 0 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 2 additions & 0 deletions
@@ -772,6 +772,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     array = readLongArray(readBuffer, length)
     val pageLength = readLong().toInt
     page = readLongArray(readBuffer, pageLength)
+    // Restore cursor variable to make this map able to be serialized again on executors.
+    cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
   }

   override def readExternal(in: ObjectInput): Unit = {

sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala

Lines changed: 29 additions & 0 deletions
@@ -278,6 +278,35 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }

+  test("SPARK-24809: Serializing LongToUnsafeRowMap in executor may result in data error") {
+    val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
+    val originalMap = new LongToUnsafeRowMap(mm, 1)
+
+    val key1 = 1L
+    val value1 = 4852306286022334418L
+
+    val key2 = 2L
+    val value2 = 8813607448788216010L
+
+    originalMap.append(key1, unsafeProj(InternalRow(value1)))
+    originalMap.append(key2, unsafeProj(InternalRow(value2)))
+    originalMap.optimize()
+
+    val ser = sparkContext.env.serializer.newInstance()
+    // Simulate serialize/deserialize twice on driver and executor
+    val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
+    val secondTimeSerialized =
+      ser.deserialize[LongToUnsafeRowMap](ser.serialize(firstTimeSerialized))
+
+    val resultRow = new UnsafeRow(1)
+    assert(secondTimeSerialized.getValue(key1, resultRow).getLong(0) === value1)
+    assert(secondTimeSerialized.getValue(key2, resultRow).getLong(0) === value2)
+
+    originalMap.free()
+    firstTimeSerialized.free()
+    secondTimeSerialized.free()
+  }
+
   test("Spark-14521") {
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
