Commit e0af76a

brkyvz authored and tdas committed
[SPARK-21370][SS] Add test for state reliability when one read-only state store aborts after read-write state store commits
## What changes were proposed in this pull request?

During Streaming Aggregation, we have two StateStores per task, one used as read-only in `StateStoreRestoreExec`, and one read-write used in `StateStoreSaveExec`. `StateStore.abort` will be called for these StateStores if they haven't committed their results. We need to make sure that `abort` in the read-only store after a `commit` in the read-write store doesn't accidentally lead to the deletion of state.

This PR adds a test for this condition.

## How was this patch tested?

This PR adds a test.

Author: Burak Yavuz <[email protected]>

Closes apache#18603 from brkyvz/ss-test.
1 parent e16e8c7 commit e0af76a

1 file changed: +31 -0 lines changed
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 31 additions & 0 deletions
@@ -665,6 +665,37 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     checkInvalidVersion(3)
   }
 
+  test("two concurrent StateStores - one for read-only and one for read-write") {
+    // During Streaming Aggregation, we have two StateStores per task, one used as read-only in
+    // `StateStoreRestoreExec`, and one read-write used in `StateStoreSaveExec`. `StateStore.abort`
+    // will be called for these StateStores if they haven't committed their results. We need to
+    // make sure that `abort` in read-only store after a `commit` in the read-write store doesn't
+    // accidentally lead to the deletion of state.
+    val dir = newDir()
+    val storeId = StateStoreId(dir, 0L, 1)
+    val provider0 = newStoreProvider(storeId)
+    // prime state
+    val store = provider0.getStore(0)
+    val key = "a"
+    put(store, key, 1)
+    store.commit()
+    assert(rowsToSet(store.iterator()) === Set(key -> 1))
+
+    // two state stores
+    val provider1 = newStoreProvider(storeId)
+    val restoreStore = provider1.getStore(1)
+    val saveStore = provider1.getStore(1)
+
+    put(saveStore, key, get(restoreStore, key).get + 1)
+    saveStore.commit()
+    restoreStore.abort()
+
+    // check that state is correct for next batch
+    val provider2 = newStoreProvider(storeId)
+    val finalStore = provider2.getStore(2)
+    assert(rowsToSet(finalStore.iterator()) === Set(key -> 2))
+  }
+
   /** Return a new provider with a random id */
   def newStoreProvider(): ProviderClass
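
The property this test guards can be illustrated outside Spark with a toy in-memory model. The sketch below is not Spark's implementation: `ToyProvider`, `ToyStore`, and `ToyDemo` are hypothetical names invented for illustration, and the model assumes each store handle keeps its own working copy, so that `abort` on one handle can only discard that handle's uncommitted changes.

```scala
import scala.collection.mutable

// Hypothetical toy model; it only mimics the commit/abort ordering from the test above.
final class ToyProvider {
  // Committed snapshots, keyed by version. Version 0 is the empty state.
  private val committed =
    mutable.Map[Long, Map[String, Int]](0L -> Map.empty[String, Int])

  final class ToyStore(baseVersion: Long) {
    // Each handle starts from the committed snapshot and owns a private working copy.
    private var working: Map[String, Int] = committed(baseVersion)
    private var closed = false

    def get(key: String): Option[Int] = working.get(key)

    def put(key: String, value: Int): Unit = {
      require(!closed, "store already committed or aborted")
      working = working.updated(key, value)
    }

    // Publish the working copy as the next version.
    def commit(): Long = {
      require(!closed, "store already committed or aborted")
      committed(baseVersion + 1) = working
      closed = true
      baseVersion + 1
    }

    // Discard only this handle's working copy; committed snapshots are untouched.
    def abort(): Unit = {
      working = Map.empty
      closed = true
    }
  }

  def getStore(version: Long): ToyStore = new ToyStore(version)

  def snapshot(version: Long): Map[String, Int] = committed(version)
}

object ToyDemo {
  // Mirrors the test: prime state, open two handles on version 1,
  // commit the read-write one, then abort the read-only one.
  def run(): Map[String, Int] = {
    val provider = new ToyProvider
    val prime = provider.getStore(0)
    prime.put("a", 1)
    prime.commit()

    val restoreStore = provider.getStore(1)
    val saveStore = provider.getStore(1)
    saveStore.put("a", restoreStore.get("a").get + 1)
    saveStore.commit()
    restoreStore.abort()

    provider.snapshot(2)
  }
}
```

In this model `ToyDemo.run()` returns the version 2 snapshot written by `saveStore`, because `restoreStore.abort()` has nothing shared to delete. The test in the diff checks that real providers behave the same way, where an abort may involve cleaning up files on disk.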
