This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 63693c1

tcondie authored and zsxwing committed
[SPARK-18790][SS] Keep a general offset history of stream batches
## What changes were proposed in this pull request?

Instead of only keeping the minimum number of offsets around, we should keep enough information to allow us to roll back n batches and re-execute the stream starting from a given point. In particular, we should create a config in SQLConf, spark.sql.streaming.retainedBatches, that defaults to 100 and ensure that we keep enough log files in the following places to roll back the specified number of batches:

- the offsets that are present in each batch
- versions of the state store
- the file lists stored for the FileStreamSource
- the metadata log stored by the FileStreamSink

marmbrus zsxwing

## How was this patch tested?

The following tests were added.

### StreamExecution offset metadata

A test added to StreamingQuerySuite ensures that offset metadata is garbage collected according to minBatchesToRetain.

### CompactibleFileStreamLog

Tests added in CompactibleFileStreamLogSuite ensure that logs are purged starting before the first compaction file that precedes the current batch id - minBatchesToRetain.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <[email protected]>

Closes apache#16219 from tcondie/offset_hist.

(cherry picked from commit 83a4289)
Signed-off-by: Shixiong Zhu <[email protected]>
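The configuration the patch actually adds is `spark.sql.streaming.minBatchesToRetain` (internal, default 100), exposed through SQLConf. A minimal sketch of raising it on a session before starting a streaming query; the app name and value here are illustrative, not from the patch:

```scala
import org.apache.spark.sql.SparkSession

object RetentionConfigExample extends App {
  // Illustrative session; any existing SparkSession works the same way.
  val spark = SparkSession.builder()
    .appName("retention-example")
    .master("local[*]")
    .getOrCreate()

  // Keep enough offset/state history to roll back up to 200 batches
  // instead of the default 100 introduced by this commit.
  spark.conf.set("spark.sql.streaming.minBatchesToRetain", "200")

  spark.stop()
}
```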
1 parent d5f1416 commit 63693c1

File tree

9 files changed: +170 -64 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala

Lines changed: 44 additions & 25 deletions

@@ -52,6 +52,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   /** Needed to serialize type T into JSON when using Jackson */
   private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
 
+  protected val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+
   /**
    * If we delete the old files after compaction at once, there is a race condition in S3: other
    * processes may see the old files are deleted but still cannot see the compaction file using
@@ -152,11 +154,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   }
 
   override def add(batchId: Long, logs: Array[T]): Boolean = {
-    if (isCompactionBatch(batchId, compactInterval)) {
-      compact(batchId, logs)
-    } else {
-      super.add(batchId, logs)
+    val batchAdded =
+      if (isCompactionBatch(batchId, compactInterval)) {
+        compact(batchId, logs)
+      } else {
+        super.add(batchId, logs)
+      }
+    if (batchAdded && isDeletingExpiredLog) {
+      deleteExpiredLog(batchId)
     }
+    batchAdded
   }
 
   /**
@@ -167,9 +174,6 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
     val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
     if (super.add(batchId, compactLogs(allLogs).toArray)) {
-      if (isDeletingExpiredLog) {
-        deleteExpiredLog(batchId)
-      }
       true
     } else {
       // Return false as there is another writer.
@@ -210,26 +214,41 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   }
 
   /**
-   * Since all logs before `compactionBatchId` are compacted and written into the
-   * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
-   * S3, the compaction file may not be seen by other processes at once. So we only delete files
-   * created `fileCleanupDelayMs` milliseconds ago.
+   * Delete expired log entries that precede the currentBatchId while retaining a
+   * sufficient minimum number of batches (given by minBatchesToRetain). This
+   * equates to retaining the earliest compaction log that precedes
+   * batch id position currentBatchId + 1 - minBatchesToRetain. All log entries
+   * prior to that earliest compaction log will be removed.
+   * However, due to the eventual consistency of S3, the compaction file may not
+   * be seen by other processes at once. So we only delete files created
+   * `fileCleanupDelayMs` milliseconds ago.
    */
-  private def deleteExpiredLog(compactionBatchId: Long): Unit = {
-    val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
-    fileManager.list(metadataPath, new PathFilter {
-      override def accept(path: Path): Boolean = {
-        try {
-          val batchId = getBatchIdFromFileName(path.getName)
-          batchId < compactionBatchId
-        } catch {
-          case _: NumberFormatException =>
-            false
+  private def deleteExpiredLog(currentBatchId: Long): Unit = {
+    if (compactInterval <= currentBatchId + 1 - minBatchesToRetain) {
+      // Find the first compaction batch id that maintains minBatchesToRetain
+      val minBatchId = currentBatchId + 1 - minBatchesToRetain
+      val minCompactionBatchId = minBatchId - (minBatchId % compactInterval) - 1
+      assert(isCompactionBatch(minCompactionBatchId, compactInterval),
+        s"$minCompactionBatchId is not a compaction batch")
+
+      logInfo(s"Current compact batch id = $currentBatchId " +
+        s"min compaction batch id to delete = $minCompactionBatchId")
+
+      val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
+      fileManager.list(metadataPath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          try {
+            val batchId = getBatchIdFromFileName(path.getName)
+            batchId < minCompactionBatchId
+          } catch {
+            case _: NumberFormatException =>
+              false
+          }
        }
-      }
-    }).foreach { f =>
-      if (f.getModificationTime <= expiredTime) {
-        fileManager.delete(f.getPath)
+      }).foreach { f =>
+        if (f.getModificationTime <= expiredTime) {
+          fileManager.delete(f.getPath)
+        }
      }
    }
  }
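The arithmetic in `deleteExpiredLog` can be read in isolation: find the earliest compaction batch at or before `currentBatchId + 1 - minBatchesToRetain`, keep that compaction file and everything after it, and delete older files (subject to the cleanup delay). A standalone sketch of just that math; the object and method names here are mine, not part of the patch:

```scala
// Standalone sketch of the retention arithmetic used by deleteExpiredLog.
object RetentionMath extends App {
  // Same rule CompactibleFileStreamLog uses: batches 2, 5, 8, ... for interval 3.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Earliest compaction batch whose file must survive so the last
  // minBatchesToRetain batches stay recoverable; smaller ids may be deleted.
  def minCompactionBatchId(currentBatchId: Long, compactInterval: Int, minBatchesToRetain: Int): Long = {
    val minBatchId = currentBatchId + 1 - minBatchesToRetain
    minBatchId - (minBatchId % compactInterval) - 1
  }

  // With compactInterval = 3 and minBatchesToRetain = 1, adding batch 6 keeps
  // only {5.compact, 6}, which matches the updated suites further down.
  val threshold = minCompactionBatchId(currentBatchId = 6, compactInterval = 3, minBatchesToRetain = 1)
  assert(threshold == 5 && isCompactionBatch(threshold, 3))
  println(s"delete batch files with id < $threshold")
}
```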

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 7 additions & 3 deletions

@@ -58,6 +58,9 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
+  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
@@ -400,10 +403,11 @@ class StreamExecution(
          }
        }
 
-        // Now that we have logged the new batch, no further processing will happen for
-        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // It is now safe to discard the metadata beyond the minimum number to retain.
         // Note that purge is exclusive, i.e. it purges everything before the target ID.
-        offsetLog.purge(currentBatchId - 1)
+        if (minBatchesToRetain < currentBatchId) {
+          offsetLog.purge(currentBatchId - minBatchesToRetain)
+        }
      }
    } else {
      awaitBatchLock.lock()
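The purge call above is exclusive of its argument, so after batch `currentBatchId` the offset log still holds the last `minBatchesToRetain` entries leading up to it. A toy stand-in for the log (the real `offsetLog` is an HDFSMetadataLog; this class exists only to show the purge semantics):

```scala
// Toy in-memory log, only to illustrate the exclusive purge semantics above.
class ToyMetadataLog[T] {
  private var entries = scala.collection.immutable.SortedMap.empty[Long, T]

  def add(batchId: Long, value: T): Boolean = {
    entries += (batchId -> value)
    true
  }

  // Exclusive: removes every entry whose batch id is strictly below the threshold.
  def purge(thresholdBatchId: Long): Unit = {
    entries = entries -- entries.keys.filter(_ < thresholdBatchId)
  }

  def batchIds: Seq[Long] = entries.keys.toSeq
}

object PurgeExample extends App {
  val offsetLog = new ToyMetadataLog[String]
  (0L to 9L).foreach(id => offsetLog.add(id, s"offsets-$id"))

  val currentBatchId = 9L
  val minBatchesToRetain = 3
  if (minBatchesToRetain < currentBatchId) {
    offsetLog.purge(currentBatchId - minBatchesToRetain) // purge everything before batch 6
  }
  // Batches 6..9 survive, so the last minBatchesToRetain batches before the
  // current one (plus the current one) remain replayable.
  assert(offsetLog.batchIds == Seq(6L, 7L, 8L, 9L))
}
```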

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 0 additions & 1 deletion

@@ -303,7 +303,6 @@ private[state] class HDFSBackedStateStoreProvider(
      val mapFromFile = readSnapshotFile(version).getOrElse {
        val prevMap = loadMap(version - 1)
        val newMap = new MapType(prevMap)
-        newMap.putAll(prevMap)
        updateFromDeltaFile(version, newMap)
        newMap
      }
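The removed `putAll` was redundant rather than a behavior change: `MapType` in this provider is a `java.util.HashMap` of rows, and `new MapType(prevMap)` already copies every entry via the copy constructor (that reading of the type alias is my assumption, not stated in this diff). A tiny standalone illustration with a plain HashMap:

```scala
import java.util.{HashMap => JHashMap}

object CopyConstructorDemo extends App {
  val prevMap = new JHashMap[String, Int]()
  prevMap.put("a", 1)
  prevMap.put("b", 2)

  // The copy constructor already carries over every entry from prevMap...
  val newMap = new JHashMap[String, Int](prevMap)
  assert(newMap == prevMap)

  // ...so a subsequent putAll adds nothing and only repeats work.
  newMap.putAll(prevMap)
  assert(newMap == prevMap)
}
```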

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 3 additions & 1 deletion

@@ -26,9 +26,11 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 
   val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
 
-  val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
+  val minVersionsToRetain = conf.minBatchesToRetain
 }
 
 private[streaming] object StateStoreConf {
   val empty = new StateStoreConf()
+
+  def apply(conf: SQLConf): StateStoreConf = new StateStoreConf(conf)
 }
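The new `apply` lets callers build a `StateStoreConf` straight from a `SQLConf`, which is exactly how the updated StateStoreSuite below uses it. A sketch of that usage; note both classes are package-private (`private[sql]` / `private[streaming]`), so code like this has to live inside a matching Spark SQL package:

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.streaming.state.StateStoreConf

object StateStoreConfSketch {
  def build(): StateStoreConf = {
    val sqlConf = new SQLConf()
    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)

    val storeConf = StateStoreConf(sqlConf) // the new apply(conf)
    // minVersionsToRetain is now driven by the shared minBatchesToRetain setting.
    assert(storeConf.minVersionsToRetain == 2)
    storeConf
  }
}
```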

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 9 deletions

@@ -472,18 +472,17 @@ object SQLConf {
     .intConf
     .createWithDefault(10)
 
-  val STATE_STORE_MIN_VERSIONS_TO_RETAIN =
-    SQLConfigBuilder("spark.sql.streaming.stateStore.minBatchesToRetain")
-      .internal()
-      .doc("Minimum number of versions of a state store's data to retain after cleaning.")
-      .intConf
-      .createWithDefault(2)
-
   val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
     .doc("The default location for storing checkpoint data for streaming queries.")
     .stringConf
     .createOptional
 
+  val MIN_BATCHES_TO_RETAIN = SQLConfigBuilder("spark.sql.streaming.minBatchesToRetain")
+    .internal()
+    .doc("The minimum number of batches that must be retained and made recoverable.")
+    .intConf
+    .createWithDefault(100)
+
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
     SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
       .internal()
@@ -642,8 +641,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
-  def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
-
   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
   def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
@@ -697,6 +694,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def minNumPostShufflePartitions: Int =
     getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
+  def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
+
   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
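Other modules read the new entry through the typed accessor rather than the raw key string, which is the pattern the StreamExecution and CompactibleFileStreamLog changes above follow. A small check of the key, default, and accessor; again, `SQLConf` is `private[sql]`, so a snippet like this belongs inside a Spark SQL package:

```scala
import org.apache.spark.sql.internal.SQLConf

object MinBatchesToRetainCheck {
  def run(): Unit = {
    val conf = new SQLConf()
    assert(SQLConf.MIN_BATCHES_TO_RETAIN.key == "spark.sql.streaming.minBatchesToRetain")
    assert(conf.minBatchesToRetain == 100) // default from createWithDefault(100)

    conf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 42)
    assert(conf.minBatchesToRetain == 42)
  }
}
```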

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala

Lines changed: 14 additions & 2 deletions

@@ -104,6 +104,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         assert("0" === compactibleLog.batchIdToPath(0).getName)
         assert("1" === compactibleLog.batchIdToPath(1).getName)
@@ -118,6 +119,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = Array("entry_1", "entry_2", "entry_3")
         val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
@@ -138,6 +140,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
           |"entry_1"
@@ -157,6 +160,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         for (batchId <- 0 to 10) {
           compactibleLog.add(batchId, Array("some_path_" + batchId))
@@ -175,6 +179,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = 0,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
 
@@ -194,25 +199,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
         compactibleLog.add(1, Array("some_path_1"))
         assert(Set("0", "1") === listBatchFiles())
         compactibleLog.add(2, Array("some_path_2"))
-        assert(Set("2.compact") === listBatchFiles())
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
         compactibleLog.add(3, Array("some_path_3"))
         assert(Set("2.compact", "3") === listBatchFiles())
         compactibleLog.add(4, Array("some_path_4"))
         assert(Set("2.compact", "3", "4") === listBatchFiles())
         compactibleLog.add(5, Array("some_path_5"))
-        assert(Set("5.compact") === listBatchFiles())
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        compactibleLog.add(6, Array("some_path_6"))
+        assert(Set("5.compact", "6") === listBatchFiles())
       })
   }
 
   private def withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs: Long,
       defaultCompactInterval: Int,
+      defaultMinBatchesToRetain: Int,
       f: FakeCompactibleFileStreamLog => Unit
     ): Unit = {
     withTempDir { file =>
       val compactibleLog = new FakeCompactibleFileStreamLog(
         fileCleanupDelayMs,
         defaultCompactInterval,
+        defaultMinBatchesToRetain,
         spark,
         file.getCanonicalPath)
       f(compactibleLog)
@@ -227,6 +236,7 @@ object FakeCompactibleFileStreamLog {
 class FakeCompactibleFileStreamLog(
     _fileCleanupDelayMs: Long,
     _defaultCompactInterval: Int,
+    _defaultMinBatchesToRetain: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[String](
@@ -241,5 +251,7 @@ class FakeCompactibleFileStreamLog(
 
   override protected def defaultCompactInterval: Int = _defaultCompactInterval
 
+  override protected val minBatchesToRetain: Int = _defaultMinBatchesToRetain
+
   override def compactLogs(logs: Seq[String]): Seq[String] = logs
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala

Lines changed: 44 additions & 4 deletions

@@ -151,10 +151,11 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
 
   testWithUninterruptibleThread("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
-    // deterministically
+    // deterministically, and set MIN_BATCHES_TO_RETAIN to 1
     withSQLConf(
       SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
-      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
+      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
       withFileStreamSinkLog { sinkLog =>
         val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
 
@@ -174,13 +175,52 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
         sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("0", "1") === listBatchFiles())
         sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
-        assert(Set("2.compact") === listBatchFiles())
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
         sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact", "3") === listBatchFiles())
         sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact", "3", "4") === listBatchFiles())
         sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
-        assert(Set("5.compact") === listBatchFiles())
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        sinkLog.add(6, Array(newFakeSinkFileStatus("/a/b/6", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("5.compact", "6") === listBatchFiles())
+      }
+    }
+
+    withSQLConf(
+      SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
+      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+      withFileStreamSinkLog { sinkLog =>
+        val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+        def listBatchFiles(): Set[String] = {
+          fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+            try {
+              getBatchIdFromFileName(fileName)
+              true
+            } catch {
+              case _: NumberFormatException => false
+            }
+          }.toSet
+        }
+
+        sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0") === listBatchFiles())
+        sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1") === listBatchFiles())
+        sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
+        sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1", "2.compact", "3") === listBatchFiles())
+        sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4") === listBatchFiles())
+        sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        sinkLog.add(6, Array(newFakeSinkFileStatus("/a/b/6", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4", "5.compact", "6") === listBatchFiles())
+        sinkLog.add(7, Array(newFakeSinkFileStatus("/a/b/7", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("5.compact", "6", "7") === listBatchFiles())
      }
    }
  }
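The second block's final state, `{5.compact, 6, 7}`, follows directly from the retention arithmetic with compactInterval = 3 and minBatchesToRetain = 2: the deletion threshold stays at compaction batch 2 for batches 4 through 6 and only jumps to 5 when batch 7 is added. A worked check, not test code from the patch:

```scala
// Worked check of the minBatchesToRetain = 2 scenario above.
object SinkLogRetentionTrace extends App {
  val compactInterval = 3
  val minBatchesToRetain = 2

  // None while too few batches exist for any deletion; otherwise the first
  // compaction batch id whose file (and everything after it) must be kept.
  def deletionThreshold(currentBatchId: Long): Option[Long] = {
    val minBatchId = currentBatchId + 1 - minBatchesToRetain
    if (compactInterval <= minBatchId) Some(minBatchId - (minBatchId % compactInterval) - 1)
    else None
  }

  // Batches 4..6 keep the threshold at 2, so 2.compact and later files survive;
  // batch 7 raises it to 5, deleting 2.compact, 3 and 4, as the test asserts.
  assert((0L to 3L).forall(deletionThreshold(_).isEmpty))
  assert((4L to 6L).forall(deletionThreshold(_).contains(2L)))
  assert(deletionThreshold(7L).contains(5L))
}
```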

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 4 additions & 1 deletion

@@ -376,7 +376,9 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val opId = 0
     val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
     val storeId = StateStoreId(dir, opId, 0)
-    val storeConf = StateStoreConf.empty
+    val sqlConf = new SQLConf()
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+    val storeConf = StateStoreConf(sqlConf)
     val hadoopConf = new Configuration()
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
@@ -606,6 +608,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     ): HDFSBackedStateStoreProvider = {
     val sqlConf = new SQLConf()
     sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
     new HDFSBackedStateStoreProvider(
       StateStoreId(dir, opId, partition),
       keySchema,

0 commit comments
