This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 0bad10d

jose-torres authored and tdas committed
[SPARK-22017] Take minimum of all watermark execs in StreamExecution.
## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.

## How was this patch tested?

New unit test.

Author: Jose Torres <[email protected]>

Closes apache#19239 from joseph-torres/SPARK-22017.
1 parent c7307ac commit 0bad10d
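
As a rough standalone sketch of the rule the commit message describes (an editorial illustration, not the commit's code; the actual implementation is in the StreamExecution.scala diff below): each watermark operator keeps its own monotonically increasing watermark, and the query-level watermark only ever advances to the minimum across operators. The names operatorWatermarks, updateOperatorWatermark, registerOperatorWithoutData, and nextGlobalWatermark are invented for this sketch.

import scala.collection.mutable

// Per-operator watermarks, keyed by the operator's position in the physical plan.
val operatorWatermarks = mutable.Map[Int, Long]()

// Record a new observation for one watermark operator: its candidate watermark is
// the max observed event time minus the configured delay, and it never moves backwards.
def updateOperatorWatermark(index: Int, maxEventTimeMs: Long, delayMs: Long): Unit = {
  val candidate = maxEventTimeMs - delayMs
  if (candidate > operatorWatermarks.getOrElse(index, 0L)) {
    operatorWatermarks(index) = candidate
  }
}

// Operators that have produced no data yet are held at 0, which in turn
// holds the global watermark back until every operator has seen data.
def registerOperatorWithoutData(index: Int): Unit = {
  if (!operatorWatermarks.contains(index)) {
    operatorWatermarks(index) = 0L
  }
}

// The query-level watermark is the minimum across all operators and never decreases.
def nextGlobalWatermark(currentGlobalMs: Long): Long = {
  if (operatorWatermarks.isEmpty) currentGlobalMs
  else math.max(currentGlobalMs, operatorWatermarks.values.min)
}

Holding the global watermark at the minimum keeps the fault-tolerant watermark conservative: it never runs ahead of the slowest individual watermark operator.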

3 files changed: +113 −6 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ class IncrementalExecution(
     val checkpointLocation: String,
     val runId: UUID,
     val currentBatchId: Long,
-    offsetSeqMetadata: OffsetSeqMetadata)
+    val offsetSeqMetadata: OffsetSeqMetadata)
   extends QueryExecution(sparkSession, logicalPlan) with Logging {
 
   // Modified planner with stateful operations.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 34 additions & 5 deletions
@@ -130,6 +130,16 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
 
+  /**
+   * A map of current watermarks, keyed by the position of the watermark operator in the
+   * physical plan.
+   *
+   * This state is 'soft state', which does not affect the correctness and semantics of watermarks
+   * and is not persisted across query restarts.
+   * The fault-tolerant watermark state is in offsetSeqMetadata.
+   */
+  protected val watermarkMsMap: MutableMap[Int, Long] = MutableMap()
+
   override val id: UUID = UUID.fromString(streamMetadata.id)
 
   override val runId: UUID = UUID.randomUUID

@@ -560,13 +570,32 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
-            logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delayMs
-        }.headOption.foreach { newWatermarkMs =>
+          case e: EventTimeWatermarkExec => e
+        }.zipWithIndex.foreach {
+          case (e, index) if e.eventTimeStats.value.count > 0 =>
+            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
+            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+            val prevWatermarkMs = watermarkMsMap.get(index)
+            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
+              watermarkMsMap.put(index, newWatermarkMs)
+            }
+
+          // Populate 0 if we haven't seen any data yet for this watermark node.
+          case (_, index) =>
+            if (!watermarkMsMap.isDefinedAt(index)) {
+              watermarkMsMap.put(index, 0)
+            }
+        }
+
+        // Update the global watermark to the minimum of all watermark nodes.
+        // This is the safest option, because only the global watermark is fault-tolerant. Making
+        // it the minimum of all individual watermarks guarantees it will never advance past where
+        // any individual watermark operator would be if it were in a plan by itself.
+        if(!watermarkMsMap.isEmpty) {
+          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
           if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
             batchWatermarkMs = newWatermarkMs

sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala

Lines changed: 78 additions & 0 deletions
@@ -300,6 +300,84 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     )
   }
 
+  test("watermark with 2 streams") {
+    import org.apache.spark.sql.functions.sum
+    val first = MemoryStream[Int]
+
+    val firstDf = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondDf = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "5 seconds")
+      .select('value)
+
+    withTempDir { checkpointDir =>
+      val unionWriter = firstDf.union(secondDf).agg(sum('value))
+        .writeStream
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .format("memory")
+        .outputMode("complete")
+        .queryName("test")
+
+      val union = unionWriter.start()
+
+      def getWatermarkAfterData(
+          firstData: Seq[Int] = Seq.empty,
+          secondData: Seq[Int] = Seq.empty,
+          query: StreamingQuery = union): Long = {
+        if (firstData.nonEmpty) first.addData(firstData)
+        if (secondData.nonEmpty) second.addData(secondData)
+        query.processAllAvailable()
+        // add a dummy batch so lastExecution has the new watermark
+        first.addData(0)
+        query.processAllAvailable()
+        // get last watermark
+        val lastExecution = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+        lastExecution.offsetSeqMetadata.batchWatermarkMs
+      }
+
+      // Global watermark starts at 0 until we get data from both sides
+      assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
+      assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
+      // Global watermark stays at left watermark 1 when right watermark moves to 2
+      assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
+      // Global watermark switches to right side value 2 when left watermark goes higher
+      assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
+      // Global watermark goes back to left
+      assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
+      // Global watermark stays on left as long as it's below right
+      assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
+      assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
+      // Global watermark switches back to right again
+      assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)
+
+      // Global watermark is updated correctly with simultaneous data from both sides
+      assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000)
+      assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000)
+      assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000)
+
+      // Global watermark doesn't decrement with simultaneous data
+      assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000)
+      assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000)
+      assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000)
+
+      // Global watermark recovers after restart, but left side watermark ahead of it does not.
+      assert(getWatermarkAfterData(firstData = Seq(200), secondData = Seq(190)) == 185000)
+      union.stop()
+      val union2 = unionWriter.start()
+      assert(getWatermarkAfterData(query = union2) == 185000)
+      // Even though the left side was ahead of 185000 in the last execution, the watermark won't
+      // increment until it gets past it in this execution.
+      assert(getWatermarkAfterData(secondData = Seq(200), query = union2) == 185000)
+      assert(getWatermarkAfterData(firstData = Seq(200), query = union2) == 190000)
+    }
+  }
+
   test("complete mode") {
     val inputData = MemoryStream[Int]
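
As an editorial sanity check on the first two expected values in the test above (not part of the commit): each side's watermark is its maximum observed event time minus its configured delay, a side with no data yet is held at 0, and the global watermark is the minimum of the two and never decreases. The variable names below are just for this worked example.

// After firstData = Seq(11): the left side (10 second delay) is at 11 s - 10 s = 1000 ms,
// but the right side has seen no data and is held at 0, so the global watermark
// stays at min(1000, 0) = 0, matching the first assertion.
val leftAfterFirstBatch = 11 * 1000L - 10 * 1000L              // 1000 ms
val globalAfterFirstBatch = math.min(leftAfterFirstBatch, 0L)  // 0 ms

// After secondData = Seq(6): the right side (5 second delay) reaches 6 s - 5 s = 1000 ms,
// so the global watermark advances to min(1000, 1000) = 1000 ms, matching the second assertion.
val rightAfterSecondBatch = 6 * 1000L - 5 * 1000L              // 1000 ms
val globalAfterSecondBatch = math.min(leftAfterFirstBatch, rightAfterSecondBatch)  // 1000 ms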
