Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 71c2b81

Browse files
jose-torres authored and zsxwing committed
[SPARK-22230] Swap per-row order in state store restore.
## What changes were proposed in this pull request?

In state store restore, for each row, put the saved state before the row in the iterator instead of after. This fixes an issue where agg(last('attr)) will forever return the last value of 'attr from the first microbatch.

## How was this patch tested?

New unit test.

Author: Jose Torres <[email protected]>

Closes apache#19461 from joseph-torres/SPARK-22230.
1 parent 155ab63 commit 71c2b81

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -225,7 +225,7 @@ case class StateStoreRestoreExec(
225225
val key = getKey(row)
226226
val savedState = store.get(key)
227227
numOutputRows += 1
228-
row +: Option(savedState).toSeq
228+
Option(savedState).toSeq :+ row
229229
}
230230
}
231231
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,22 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
520520
}
521521
}
522522

523+
test("SPARK-22230: last should change with new batches") {
524+
val input = MemoryStream[Int]
525+
526+
val aggregated = input.toDF().agg(last('value))
527+
testStream(aggregated, OutputMode.Complete())(
528+
AddData(input, 1, 2, 3),
529+
CheckLastBatch(3),
530+
AddData(input, 4, 5, 6),
531+
CheckLastBatch(6),
532+
AddData(input),
533+
CheckLastBatch(6),
534+
AddData(input, 0),
535+
CheckLastBatch(0)
536+
)
537+
}
538+
523539
/** Add blocks of data to the `BlockRDDBackedSource`. */
524540
case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) extends AddData {
525541
override def addData(query: Option[StreamExecution]): (Source, Offset) = {

0 commit comments

Comments
 (0)