Skip to content

Commit 1e7b9f4

Browse files
fix: improve platform sync (#1451)
* fix: remove txmetadata from report * fix: improve PlatformSyncService * fix: don't notify all TX's after each block * fix: RestoreIdentityWorker fix for init sync * fix: update tx history updates * fix: add cleanup monitor * fix: minor fixes
1 parent 348f6ab commit 1e7b9f4

File tree

12 files changed

+179
-77
lines changed

12 files changed

+179
-77
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ buildscript {
33
kotlin_version = '2.1.0'
44
coroutinesVersion = '1.6.4'
55
ok_http_version = '4.12.0'
6-
dashjVersion = '21.1.13'
6+
dashjVersion = '21.1.14-SNAPSHOT'
77
dppVersion = "2.0.2"
88
hiltVersion = '2.53'
99
hiltCompilerVersion = '1.2.0'

wallet/src/de/schildbach/wallet/service/BlockchainServiceImpl.kt

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import de.schildbach.wallet.ui.main.MainActivity
6666
import de.schildbach.wallet.ui.staking.StakingActivity
6767
import de.schildbach.wallet.util.AllowLockTimeRiskAnalysis
6868
import de.schildbach.wallet.util.AllowLockTimeRiskAnalysis.OfflineAnalyzer
69+
import de.schildbach.wallet.util.AnrException
6970
import de.schildbach.wallet.util.BlockchainStateUtils
7071
import de.schildbach.wallet.util.CrashReporter
7172
import de.schildbach.wallet.util.ThrottlingWalletChangeListener
@@ -76,6 +77,7 @@ import kotlinx.coroutines.Dispatchers
7677
import kotlinx.coroutines.Job
7778
import kotlinx.coroutines.SupervisorJob
7879
import kotlinx.coroutines.delay
80+
import kotlinx.coroutines.isActive
7981
import kotlinx.coroutines.launch
8082
import kotlinx.coroutines.sync.Mutex
8183
import kotlinx.coroutines.sync.withLock
@@ -1275,8 +1277,9 @@ class BlockchainServiceImpl : LifecycleService(), BlockchainService {
12751277
blockStore?.chainHead // detect corruptions as early as possible
12761278
headerStore = SPVBlockStore(Constants.NETWORK_PARAMETERS, headerChainFile)
12771279
headerStore?.chainHead // detect corruptions as early as possible
1280+
wallet.isNotifyTxOnNextBlock = false
12781281
val blockchainStoreMemoryError = serviceConfig.get(BLOCKCHAIN_STORE_MEMORY_FAILURE) ?: false
1279-
if (blockchainStoreMemoryError /*&& serviceConfig.getBlockStoreLastFix() == BlockStoreLastFix.COPY_FILES*/) {
1282+
if (blockchainStoreMemoryError) {
12801283
try {
12811284
// the stores are open, copy 100 blocks to a new store.
12821285
log.info("attempting to fix blockchain memory stalls by copying from store: {}", serviceConfig.getBlockStoreLastFix())
@@ -1694,6 +1697,27 @@ class BlockchainServiceImpl : LifecycleService(), BlockchainService {
16941697
ProcessLifecycleOwner.get().lifecycle.removeObserver(appLifecycleObserver)
16951698

16961699
log.info("receivers unregistered, Now starting coroutine to finish the rest of the cleanup")
1700+
1701+
// Monitor the entire cleanup process with timeout reporting
1702+
val cleanupThread = Thread.currentThread()
1703+
val cleanupMonitorJob = serviceScope.launch {
1704+
delay(5_000) // Wait 5 seconds before first check
1705+
var checkCount = 0
1706+
while (isActive) {
1707+
if (checkCount > 4) return@launch
1708+
checkCount++
1709+
log.warn("onDestroy() cleanup is taking longer than {} seconds", 5 * checkCount)
1710+
try {
1711+
val anrException = AnrException(cleanupThread)
1712+
anrException.logProcessMap()
1713+
} catch (e: Exception) {
1714+
log.error("Failed to dump thread traces during cleanup timeout", e)
1715+
}
1716+
// Wait another 5 seconds before next check
1717+
delay(5_000)
1718+
}
1719+
}
1720+
16971721
serviceScope.launch {
16981722
try {
16991723
log.info("The onCreateCompleted is active: {}", onCreateCompleted.isActive)
@@ -1749,7 +1773,7 @@ class BlockchainServiceImpl : LifecycleService(), BlockchainService {
17491773
log.info("shutting down peerGroup and system services")
17501774
propagateContext()
17511775
log.info("CLEANUP STEP 1: About to close dashSystemService.system")
1752-
dashSystemService.system.close()
1776+
dashSystemService.system.close()
17531777
log.info("CLEANUP STEP 1: Dash system services are shutdown")
17541778
peerGroup!!.removeDisconnectedEventListener(peerConnectivityListener)
17551779
peerGroup!!.removeConnectedEventListener(peerConnectivityListener)
@@ -1814,6 +1838,8 @@ class BlockchainServiceImpl : LifecycleService(), BlockchainService {
18141838
}
18151839
isCleaningUp.set(false)
18161840
cleanupDeferred?.complete(Unit)
1841+
// Cancel the cleanup monitor since cleanup is done
1842+
cleanupMonitorJob.cancel()
18171843
}
18181844
}
18191845
log.info("service was up for " + (System.currentTimeMillis() - serviceCreatedAt) / 1000 / 60 + " minutes")

wallet/src/de/schildbach/wallet/service/platform/PlatformSyncService.kt

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,11 @@ class PlatformSynchronizationService @Inject constructor(
153153

154154
val UPDATE_TIMER_DELAY = 15.seconds
155155
val PUSH_PERIOD = if (BuildConfig.DEBUG || Constants.IS_TESTNET_BUILD) 3.minutes else 3.hours
156-
val WEEKLY_PUSH_PERIOD = 7.days
156+
val WEEKLY_PUSH_PERIOD = 7.days.inWholeMilliseconds
157157
val CUTOFF_MIN = if (BuildConfig.DEBUG || Constants.IS_TESTNET_BUILD) 3.minutes else 3.hours
158158
val CUTOFF_MAX = if (BuildConfig.DEBUG || Constants.IS_TESTNET_BUILD) 6.minutes else 6.hours
159159
private val PUBLISH = MarkerFactory.getMarker("PUBLISH")
160+
val NON_CONTACTS_UPDATE_PERIOD = 1.minutes.inWholeMilliseconds
160161
}
161162

162163
private var platformSyncJob: Job? = null
@@ -171,6 +172,8 @@ class PlatformSynchronizationService @Inject constructor(
171172
// TODO: cancel these on shutdown?
172173
private val syncJob = SupervisorJob()
173174
private val syncScope = CoroutineScope(Dispatchers.IO + syncJob)
175+
private var lastTopupUpdateTime = 0L
176+
private var lastMetadataUpdateTime = 0L
174177

175178
override fun init() {
176179
syncScope.launch { platformRepo.init() }
@@ -234,7 +237,7 @@ class PlatformSynchronizationService @Inject constructor(
234237
val meetsSaveFrequency = when (saveSettings.saveFrequency) {
235238
TxMetadataSaveFrequency.afterTenTransactions -> newDataItems >= 10
236239
TxMetadataSaveFrequency.afterEveryTransaction -> newDataItems >= 1
237-
TxMetadataSaveFrequency.oncePerWeek -> lastPush < now - WEEKLY_PUSH_PERIOD.inWholeMilliseconds && newDataItems >= 1
240+
TxMetadataSaveFrequency.oncePerWeek -> lastPush < now - WEEKLY_PUSH_PERIOD && newDataItems >= 1
238241
}
239242
// publish no more frequently than every 3 hours
240243
val shouldPushToNetwork = (lastPush < now - PUSH_PERIOD.inWholeMilliseconds)
@@ -281,7 +284,8 @@ class PlatformSynchronizationService @Inject constructor(
281284
}
282285
log.info("updateContactRequests($initialSync) checking if can run")
283286
// only allow this method to execute once at a time
284-
if (updatingContacts.get()) {
287+
// allow it to continue if the last state was recovery complete
288+
if (updatingContacts.get() && lastPreBlockStage != PreBlockStage.RecoveryComplete) {
285289
log.info("updateContactRequests is already running: {}", lastPreBlockStage)
286290
return
287291
}
@@ -361,9 +365,10 @@ class PlatformSynchronizationService @Inject constructor(
361365
updatingContacts.set(true)
362366
updateSyncStatus(PreBlockStage.Starting)
363367
updateSyncStatus(PreBlockStage.Initialization)
364-
checkDatabaseIntegrity(userId)
365-
366-
updateSyncStatus(PreBlockStage.FixMissingProfiles)
368+
if (!initialSync) {
369+
checkDatabaseIntegrity(userId)
370+
updateSyncStatus(PreBlockStage.FixMissingProfiles)
371+
}
367372

368373
// Get all out our contact requests
369374
val toContactDocuments = platform.contactRequests.get(
@@ -435,29 +440,47 @@ class PlatformSynchronizationService @Inject constructor(
435440
updateSyncStatus(PreBlockStage.GetNewProfiles)
436441

437442
coroutineScope {
443+
try {
444+
val myEncryptionKey = platformRepo.getWalletEncryptionKey()
445+
438446
awaitAll(
439447
// fetch updated invitations
440448
async {
441-
updateInvitations()
442-
updateSyncStatus(PreBlockStage.GetInvites)
449+
if (Constants.SUPPORTS_INVITES) {
450+
updateInvitations()
451+
updateSyncStatus(PreBlockStage.GetInvites)
452+
}
443453
},
444454
// fetch updated transaction metadata
445455
async {
446-
updateTransactionMetadata()
447-
updateSyncStatus(PreBlockStage.TransactionMetadata)
448-
}, // TODO: this is skipped in VOTING state, but shouldn't be
456+
val shouldUpdate = System.currentTimeMillis() - lastMetadataUpdateTime >= NON_CONTACTS_UPDATE_PERIOD
457+
if (shouldUpdate) {
458+
updateTransactionMetadata(myEncryptionKey)
459+
updateSyncStatus(PreBlockStage.TransactionMetadata)
460+
lastMetadataUpdateTime = System.currentTimeMillis()
461+
}
462+
}, // TODO: this is skipped in VOTING state, but shouldn't be
449463
// fetch updated profiles from the network
450464
async {
451465
updateContactProfiles(userId, min(lastContactRequestTimeToMe, lastContactRequestTimeFromMe))
452466
updateSyncStatus(PreBlockStage.GetUpdatedProfiles)
453467
},
454468
// check for unused topups
455469
async {
456-
checkTopUps()
457-
updateSyncStatus(PreBlockStage.Topups)
470+
val shouldUpdate = System.currentTimeMillis() - lastTopupUpdateTime >= NON_CONTACTS_UPDATE_PERIOD
471+
if (shouldUpdate) {
472+
checkTopUps(myEncryptionKey)
473+
updateSyncStatus(PreBlockStage.Topups)
474+
lastTopupUpdateTime = System.currentTimeMillis()
475+
}
458476
}
459477
)
478+
} catch (e: Exception) {
479+
log.error("error obtaining encryption key", e)
480+
return@coroutineScope
481+
}
460482
}
483+
461484
} else {
462485
if (config.get(DashPayConfig.FREQUENT_CONTACTS) == null) {
463486
platformRepo.updateFrequentContacts()
@@ -788,12 +811,8 @@ class PlatformSynchronizationService @Inject constructor(
788811
}
789812
}
790813

791-
private suspend fun updateTransactionMetadata() {
792-
if (!Constants.SUPPORTS_TXMETADATA) {
793-
return
794-
}
814+
private suspend fun updateTransactionMetadata(myEncryptionKey: KeyParameter?) {
795815
val watch = Stopwatch.createStarted()
796-
val myEncryptionKey = platformRepo.getWalletEncryptionKey()
797816

798817
val lastTxMetadataRequestTime = if (transactionMetadataDocumentDao.countAllRequests() > 0) {
799818
val lastTimeStamp = transactionMetadataDocumentDao.getLastTimestamp()
@@ -1058,6 +1077,7 @@ class PlatformSynchronizationService @Inject constructor(
10581077

10591078
private suspend fun publishTransactionMetadata(
10601079
txMetadataItems: List<TransactionMetadataCacheItem>,
1080+
myEncryptionKey: KeyParameter?,
10611081
progressListener: (suspend (Int) -> Unit)? = null
10621082
): Int {
10631083
if (!platformRepo.hasBlockchainIdentity) {
@@ -1085,11 +1105,11 @@ class PlatformSynchronizationService @Inject constructor(
10851105
)
10861106
}
10871107
progressListener?.invoke(10)
1088-
val walletEncryptionKey = platformRepo.getWalletEncryptionKey()
1108+
//val walletEncryptionKey = platformRepo.getWalletEncryptionKey()
10891109
val keyIndex = 1 + transactionMetadataDocumentDao.countAllRequests()
10901110
platformRepo.blockchainIdentity.publishTxMetaData(
10911111
metadataList,
1092-
walletEncryptionKey,
1112+
myEncryptionKey,
10931113
keyIndex,
10941114
TxMetadataDocument.VERSION_PROTOBUF
10951115
) { progress ->
@@ -1291,7 +1311,8 @@ class PlatformSynchronizationService @Inject constructor(
12911311
log.info("publishing ${itemsToPublish.values.size} tx metadata items to platform")
12921312

12931313
// publish non-empty items
1294-
publishTransactionMetadata(itemsToPublish.values.filter { it.isNotEmpty() }) {
1314+
val myEncryptionKey = platformRepo.getWalletEncryptionKey()
1315+
publishTransactionMetadata(itemsToPublish.values.filter { it.isNotEmpty() }, myEncryptionKey) {
12951316
progressListener?.invoke(10 + it * 90 / 100)
12961317
}
12971318
log.info("published ${itemsToPublish.values.size} tx metadata items to platform")
@@ -1302,7 +1323,7 @@ class PlatformSynchronizationService @Inject constructor(
13021323
config.set(DashPayConfig.LAST_METADATA_PUSH, System.currentTimeMillis())
13031324
itemsSaved = changedItems.size
13041325

1305-
updateTransactionMetadata()
1326+
updateTransactionMetadata(myEncryptionKey)
13061327
} catch (_: CancellationException) {
13071328
log.info("publishing updates canceled")
13081329
} catch (e: Exception) {
@@ -1646,12 +1667,10 @@ class PlatformSynchronizationService @Inject constructor(
16461667
}
16471668

16481669
private var hasCheckedTopups = false // only run once
1649-
private suspend fun checkTopUps() {
1670+
private suspend fun checkTopUps(myEncryptionKey: KeyParameter?) {
16501671
if (!hasCheckedTopups) {
1651-
platformRepo.getWalletEncryptionKey()?.let {
1652-
topUpRepository.checkTopUps(it)
1653-
hasCheckedTopups = true
1654-
}
1672+
topUpRepository.checkTopUps(myEncryptionKey)
1673+
hasCheckedTopups = true
16551674
}
16561675
}
16571676
}

wallet/src/de/schildbach/wallet/service/platform/TopUpRepository.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ interface TopUpRepository {
8282
/** top up identity and save topup state to the db */
8383
suspend fun topUpIdentity(
8484
topupAssetLockTransaction: AssetLockTransaction,
85-
aesKeyParameter: KeyParameter
85+
aesKeyParameter: KeyParameter?
8686
)
8787

88-
suspend fun checkTopUps(aesKeyParameter: KeyParameter)
88+
suspend fun checkTopUps(aesKeyParameter: KeyParameter?)
8989
}
9090

9191
class TopUpRepositoryImpl @Inject constructor(
@@ -251,7 +251,7 @@ class TopUpRepositoryImpl @Inject constructor(
251251

252252
override suspend fun topUpIdentity(
253253
topUpTx: AssetLockTransaction,
254-
aesKeyParameter: KeyParameter
254+
aesKeyParameter: KeyParameter?
255255
) {
256256
val topUp = topUpsDao.getByTxId(
257257
topUpTx.txId
@@ -341,7 +341,7 @@ class TopUpRepositoryImpl @Inject constructor(
341341

342342
private var checkedPreviousTopUps = false
343343

344-
override suspend fun checkTopUps(aesKeyParameter: KeyParameter) {
344+
override suspend fun checkTopUps(aesKeyParameter: KeyParameter?) {
345345
val topUps = topUpsDao.getUnused()
346346
topUps.forEach { topUp ->
347347
try {

wallet/src/de/schildbach/wallet/service/platform/work/RestoreIdentityWorker.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ class RestoreIdentityWorker @AssistedInject constructor(
351351

352352
platformSyncService.updateSyncStatus(PreBlockStage.RecoveryComplete)
353353
platformRepo.init()
354-
platformSyncService.initSync()
354+
platformSyncService.initSync(true)
355355
} catch (e: Exception) {
356356
val blockchainIdentityData = identityConfig.load()
357357
blockchainIdentityData?.let {

0 commit comments

Comments
 (0)