Commit 61f0ca4

[SPARK-24699][SS] Make watermarks work with Trigger.Once by saving updated watermark to commit log
## What changes were proposed in this pull request?

Streaming queries with watermarks do not work with Trigger.Once, for the following reasons:

- The watermark is updated in driver memory after a batch completes, but it is persisted to the checkpoint (in the offset log) only when the next batch is planned.
- With Trigger.Once, the query terminates as soon as one batch has completed, so the updated watermark is never persisted anywhere.

The simple solution is to persist the updated watermark value in the commit log when a batch is marked as completed. The next batch, in the next Trigger.Once run, can then pick it up from the commit log.

## How was this patch tested?

New unit tests.

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Co-authored-by: c-horn <chorn4033@gmail.com>

Author: Tathagata Das <[email protected]>

Closes apache#21746 from tdas/SPARK-24699.
1 parent 2edf17e commit 61f0ca4
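For context, a minimal sketch of the kind of query this patch fixes. The paths, schema, and intervals below are illustrative, not taken from the patch: a watermarked aggregation run repeatedly with Trigger.Once, where each run must resume from the watermark the previous run reached.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("watermark-trigger-once").getOrCreate()
import spark.implicits._

// A Trigger.Once run processes whatever input is available, then stops.
// Before this patch, the watermark advanced during the run was lost when
// the query terminated, so old state was never cleaned up and late data
// was never dropped across successive runs.
val counts = spark.readStream
  .format("json")
  .schema("eventTime TIMESTAMP, value INT")   // hypothetical schema
  .load("/tmp/events")                        // hypothetical input path
  .withWatermark("eventTime", "10 seconds")
  .groupBy(window($"eventTime", "5 seconds"))
  .count()

counts.writeStream
  .format("parquet")
  .option("path", "/tmp/out")                 // hypothetical output path
  .option("checkpointLocation", "/tmp/chk")   // the watermark is now saved here
  .trigger(Trigger.Once())
  .start()
  .awaitTermination()
```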

20 files changed, +177 -42 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala

Lines changed: 19 additions & 14 deletions
```diff
@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets._
 
 import scala.io.{Source => IOSource}
 
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -43,36 +46,28 @@ import org.apache.spark.sql.SparkSession
  * line 2: metadata (optional json string)
  */
 class CommitLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[String](sparkSession, path) {
+  extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {
 
   import CommitLog._
 
-  def add(batchId: Long): Unit = {
-    super.add(batchId, EMPTY_JSON)
-  }
-
-  override def add(batchId: Long, metadata: String): Boolean = {
-    throw new UnsupportedOperationException(
-      "CommitLog does not take any metadata, use 'add(batchId)' instead")
-  }
-
-  override protected def deserialize(in: InputStream): String = {
+  override protected def deserialize(in: InputStream): CommitMetadata = {
     // called inside a try-finally where the underlying stream is closed in the caller
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file in the offset commit log")
     }
     parseVersion(lines.next.trim, VERSION)
-    EMPTY_JSON
+    val metadataJson = if (lines.hasNext) lines.next else EMPTY_JSON
+    CommitMetadata(metadataJson)
   }
 
-  override protected def serialize(metadata: String, out: OutputStream): Unit = {
+  override protected def serialize(metadata: CommitMetadata, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
     out.write(s"v${VERSION}".getBytes(UTF_8))
     out.write('\n')
 
     // write metadata
-    out.write(EMPTY_JSON.getBytes(UTF_8))
+    out.write(metadata.json.getBytes(UTF_8))
   }
 }
 
@@ -81,3 +76,13 @@ object CommitLog {
   private val EMPTY_JSON = "{}"
 }
 
+
+case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
+  def json: String = Serialization.write(this)(CommitMetadata.format)
+}
+
+object CommitMetadata {
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
+}
```
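A quick sketch of the round-trip the new CommitMetadata class enables. The class body is copied from the diff above; the assertions are illustrative:

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
  def json: String = Serialization.write(this)(CommitMetadata.format)
}

object CommitMetadata {
  implicit val format = Serialization.formats(NoTypeHints)
  def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
}

val md = CommitMetadata(nextBatchWatermarkMs = 5000)
assert(md.json == """{"nextBatchWatermarkMs":5000}""")  // what serialize() writes
assert(CommitMetadata(md.json) == md)                   // deserialize() restores it
assert(CommitMetadata("{}") == CommitMetadata(0))       // old-format logs fall back to 0
```

Because nextBatchWatermarkMs defaults to 0 and deserialize() falls back to EMPTY_JSON when the metadata line is absent, commit logs written before this change remain readable, keeping the format backward compatible.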

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 5 additions & 4 deletions
```diff
@@ -268,7 +268,7 @@ class MicroBatchExecution(
          * latest batch id in the offset log, then we can safely move to the next batch
          * i.e., committedBatchId + 1 */
         commitLog.getLatest() match {
-          case Some((latestCommittedBatchId, _)) =>
+          case Some((latestCommittedBatchId, commitMetadata)) =>
             if (latestBatchId == latestCommittedBatchId) {
               /* The last batch was successfully committed, so we can safely process a
                * new next batch but first:
@@ -286,7 +286,8 @@ class MicroBatchExecution(
               currentBatchId = latestCommittedBatchId + 1
               isCurrentBatchConstructed = false
               committedOffsets ++= availableOffsets
-              // Construct a new batch be recomputing availableOffsets
+              watermarkTracker.setWatermark(
+                math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
             } else if (latestCommittedBatchId < latestBatchId - 1) {
               logWarning(s"Batch completion log latest batch id is " +
                 s"${latestCommittedBatchId}, which is not trailing " +
@@ -536,11 +537,11 @@ class MicroBatchExecution(
     }
 
     withProgressLocked {
-      commitLog.add(currentBatchId)
+      watermarkTracker.updateWatermark(lastExecution.executedPlan)
+      commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
       committedOffsets ++= availableOffsets
       awaitProgressLockCondition.signalAll()
     }
-    watermarkTracker.updateWatermark(lastExecution.executedPlan)
     logDebug(s"Completed batch ${currentBatchId}")
   }
```
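On restart, the recovered value is merged with math.max so the watermark can only move forward. A minimal standalone sketch of that invariant, with WatermarkTracker reduced to a single var for illustration:

```scala
// Stand-in for WatermarkTracker, for illustration only.
var currentWatermark: Long = 0L

def setWatermarkFromCommitLog(nextBatchWatermarkMs: Long): Unit = {
  // A watermark must never regress: a default (0) entry, such as one
  // written by an old-format log, cannot pull it backwards.
  currentWatermark = math.max(currentWatermark, nextBatchWatermarkMs)
}

setWatermarkFromCommitLog(5000)  // fresh restart: watermark advances to 5000
setWatermarkFromCommitLog(0)     // legacy "{}" entry: watermark stays at 5000
```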

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -314,7 +314,7 @@ class ContinuousExecution(
     // Record offsets before updating `committedOffsets`
     recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
     if (queryExecutionThread.isAlive) {
-      commitLog.add(epoch)
+      commitLog.add(epoch, CommitMetadata())
       val offset =
         continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
       committedOffsets ++= Seq(continuousSources(0) -> offset)
```
Five new test resource files follow. Their paths were not preserved in this capture; judging by their contents, they are two commit log entries, a query metadata file, and two offset log entries of a pre-existing checkpoint used by the new tests.

```diff
@@ -0,0 +1,2 @@
+v1
+{}
```

```diff
@@ -0,0 +1,2 @@
+v1
+{}
```

```diff
@@ -0,0 +1 @@
+{"id":"73f7f943-0a08-4ffb-a504-9fa88ff7612a"}
```

```diff
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1531991874513,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+0
```

```diff
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":5000,"batchTimestampMs":1531991878604,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+1
```
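Note that the captured commit log entries above still carry the empty `{}` metadata of the old format. After this change, a commit written once the watermark has advanced carries it in the metadata line; assuming an illustrative watermark of 5000 ms, serialize() shown earlier would produce a commit log file containing:

```
v1
{"nextBatchWatermarkMs":5000}
```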
