@@ -38,7 +38,6 @@ import com.harrytmthy.safebox.storage.SafeBoxRecoveryBlobStore
3838import com.harrytmthy.safebox.strategy.ValueFallbackStrategy
3939import kotlinx.coroutines.CompletableDeferred
4040import kotlinx.coroutines.CoroutineDispatcher
41- import kotlinx.coroutines.Job
4241import kotlinx.coroutines.delay
4342import kotlinx.coroutines.launch
4443import kotlinx.coroutines.runBlocking
@@ -78,11 +77,9 @@ internal class SafeBoxEngine private constructor(
7877
7978 private val scanScheduled = AtomicBoolean (false )
8079
81- private var callback: Callback ? = null
82-
83- private var recoveryJob: Job ? = null
80+ private val recoveryScheduled = AtomicBoolean (false )
8481
85- private var recoveryBackoffMs = 1_000L
82+ private var callback : Callback ? = null
8683
8784 init {
8885 launchWithStartingState {
@@ -93,10 +90,6 @@ internal class SafeBoxEngine private constructor(
9390 entries + = it
9491 recoveryEntries + = it
9592 }
96- }.invokeOnCompletion {
97- if (recoveryEntries.isNotEmpty()) {
98- scheduleRecoveryEntriesWrite()
99- }
10093 }
10194 }
10295
@@ -215,7 +208,6 @@ internal class SafeBoxEngine private constructor(
215208 encryptedValue = encryptedValue,
216209 )
217210 recoveryEntries[encryptedKey] = encryptedValue
218- scheduleRecoveryEntriesWrite()
219211 }
220212 }
221213 is Remove -> {
@@ -228,46 +220,32 @@ internal class SafeBoxEngine private constructor(
228220 }
229221 }
230222
231- private fun scheduleRecoveryEntriesWrite () {
232- if (recoveryJob?.isActive == true ) {
233- return
234- }
235- recoveryJob = launchWriteAsync {
236- val fileId = blobStore.getFileName().toBytes()
237- val snapshot = ArrayList (recoveryEntries.entries)
238- var failed = false
239- for ((encryptedKey, encryptedValue) in snapshot) {
240- try {
241- blobStore.write(encryptedKey, encryptedValue)
242- recoveryEntries.remove(encryptedKey)
243- } catch (_: Exception ) {
244- failed = true
245- break
246- }
247- }
248- if (! failed && recoveryEntries.isEmpty()) {
249- recoveryBlobStore.delete(fileId)
250- recoveryBackoffMs = 1_000L
251- }
252- }.apply {
253- invokeOnCompletion { cause ->
254- if (recoveryEntries.isEmpty()) {
255- return @invokeOnCompletion
256- }
257- if (cause != null ) {
258- recoveryBackoffMs = (recoveryBackoffMs * 2 ).coerceAtMost(30_000L )
259- }
260- safeBoxScope.launch(ioDispatcher) {
261- delay(recoveryBackoffMs)
262- scheduleRecoveryEntriesWrite()
223+ private fun scheduleRecoveryEntriesWrite (recoveryBackoffMs : Long ) {
224+ safeBoxScope.launch(ioDispatcher) {
225+ delay(recoveryBackoffMs)
226+ }.invokeOnCompletion {
227+ val nextBackoffMs = (recoveryBackoffMs * 2 ).coerceAtMost(MAX_BACKOFF_MS )
228+ launchWriteAsync(nextBackoffMs) {
229+ val fileName = blobStore.getFileName().toBytes()
230+ val snapshot = ArrayList (recoveryEntries.entries)
231+ val removedKeys = ArrayList <Bytes >()
232+ for ((encryptedKey, encryptedValue) in snapshot) {
233+ try {
234+ blobStore.write(encryptedKey, encryptedValue)
235+ recoveryEntries.remove(encryptedKey)
236+ removedKeys + = encryptedKey
237+ } catch (_: Exception ) {
238+ continue
239+ }
263240 }
241+ recoveryBlobStore.delete(fileName, * removedKeys.toTypedArray())
264242 }
265243 }
266244 }
267245
268- private inline fun launchWithStartingState (crossinline block : suspend () -> Unit ): Job {
246+ private inline fun launchWithStartingState (crossinline block : suspend () -> Unit ) {
269247 updateState(STARTING )
270- return safeBoxScope.launch(ioDispatcher) {
248+ safeBoxScope.launch(ioDispatcher) {
271249 try {
272250 block()
273251 } finally {
@@ -276,12 +254,15 @@ internal class SafeBoxEngine private constructor(
276254 updateState(IDLE )
277255 }
278256 }
257+ }.invokeOnCompletion {
258+ if (recoveryEntries.isNotEmpty()) {
259+ recoveryScheduled.set(true )
260+ scheduleRecoveryEntriesWrite(DEFAULT_BACKOFF_MS )
261+ }
279262 }
280263 }
281264
282- private inline fun launchWriteBlocking (
283- crossinline block : suspend () -> Unit ,
284- ): Boolean {
265+ private inline fun launchWriteBlocking (crossinline block : suspend () -> Unit ): Boolean {
285266 val currentWriteBarrier = CompletableDeferred <Unit >()
286267 val previousWriteBarrier = writeBarrier.getAndSet(currentWriteBarrier)
287268 return runBlocking {
@@ -293,21 +274,37 @@ internal class SafeBoxEngine private constructor(
293274 false
294275 } finally {
295276 currentWriteBarrier.complete(Unit )
277+ if (recoveryEntries.isNotEmpty() && recoveryScheduled.compareAndSet(false , true )) {
278+ scheduleRecoveryEntriesWrite(DEFAULT_BACKOFF_MS )
279+ }
296280 }
297281 }
298282 }
299283
300- private inline fun launchWriteAsync (crossinline block : suspend () -> Unit ): Job {
284+ private inline fun launchWriteAsync (
285+ recoveryBackoffMs : Long = DEFAULT_BACKOFF_MS ,
286+ crossinline block : suspend () -> Unit ,
287+ ) {
301288 val currentWriteBarrier = CompletableDeferred <Unit >()
302289 val previousWriteBarrier = writeBarrier.getAndSet(currentWriteBarrier)
303- return safeBoxScope.launch(ioDispatcher) {
290+ safeBoxScope.launch(ioDispatcher) {
304291 try {
305292 withStateTransition(previousWriteBarrier, block)
306293 } catch (e: Exception ) {
307294 Log .e(" SafeBox" , " Failed to commit changes." , e)
308295 } finally {
309296 currentWriteBarrier.complete(Unit )
310297 }
298+ }.invokeOnCompletion {
299+ if (recoveryBackoffMs == DEFAULT_BACKOFF_MS && recoveryScheduled.get()) {
300+ return @invokeOnCompletion
301+ }
302+ if (recoveryEntries.isNotEmpty()) {
303+ recoveryScheduled.set(true )
304+ scheduleRecoveryEntriesWrite(recoveryBackoffMs)
305+ } else {
306+ recoveryScheduled.set(false )
307+ }
311308 }
312309 }
313310
@@ -390,6 +387,10 @@ internal class SafeBoxEngine private constructor(
390387
391388 internal companion object {
392389
390+ private const val DEFAULT_BACKOFF_MS = 1000L
391+
392+ private const val MAX_BACKOFF_MS = 30000L
393+
393394 fun create (
394395 context : Context ,
395396 fileName : String ,
0 commit comments