From 4f43e823177ec7654d4e77eaf204d5617749ecf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jorge=20Mart=C3=ADn?= Date: Wed, 18 Feb 2026 08:53:31 +0100 Subject: [PATCH 1/3] Add max retry checks for `FetchNotificationsWorker` --- .../workmanager/FetchNotificationsWorker.kt | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt index eeac5ff66fa..bc335f3e558 100644 --- a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt +++ b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt @@ -39,9 +39,12 @@ import kotlinx.coroutines.withTimeoutOrNull import timber.log.Timber import kotlin.time.Duration.Companion.seconds +private const val MAX_RETRY_ATTEMPTS = 3 +private const val TAG = "FetchNotificationsWorker" + @AssistedInject class FetchNotificationsWorker( - @Assisted workerParams: WorkerParameters, + @Assisted private val workerParams: WorkerParameters, @ApplicationContext private val context: Context, private val networkMonitor: NetworkMonitor, private val eventResolver: NotifiableEventResolver, @@ -53,22 +56,27 @@ class FetchNotificationsWorker( private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider, ) : CoroutineWorker(context, workerParams) { override suspend fun doWork(): Result = withContext(coroutineDispatchers.io) { - Timber.d("FetchNotificationsWorker started") + Timber.tag(TAG).d("FetchNotificationsWorker started") val requests = workerDataConverter.deserialize(inputData) ?: return@withContext Result.failure() // Wait for network to be available, but not more than 10 seconds val hasNetwork = withTimeoutOrNull(10.seconds) { networkMonitor.connectivity.first { it == 
NetworkStatus.Connected } } != null if (!hasNetwork) { - Timber.w("No network, retrying later") - return@withContext Result.retry() + if (workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS) { + Timber.tag(TAG).w("No network, retrying later") + return@withContext Result.retry() + } else { + Timber.tag(TAG).w("No network available and reached max retry attempts (${workerParams.runAttemptCount}/$MAX_RETRY_ATTEMPTS)") + return@withContext Result.failure() + } } val failedSyncForSessions = mutableMapOf() val groupedRequests = requests.groupBy { it.sessionId }.toMutableMap() for ((sessionId, notificationRequests) in groupedRequests) { - Timber.d("Processing notification requests for session $sessionId") + Timber.tag(TAG).d("Processing notification requests for session $sessionId") eventResolver.resolveEvents(sessionId, notificationRequests) .fold( onSuccess = { result -> @@ -87,24 +95,26 @@ class FetchNotificationsWorker( @Suppress("LoopWithTooManyJumpStatements") for ((failedSessionId, exception) in failedSyncForSessions) { if (exception.cause is SessionRestorationException) { - Timber.e(exception, "Session $failedSessionId could not be restored, not retrying notification fetching") + Timber.tag(TAG).e(exception, "Session $failedSessionId could not be restored, not retrying notification fetching") groupedRequests.remove(failedSessionId) continue } val requestsToRetry = groupedRequests[failedSessionId] ?: continue - Timber.d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId") - workManagerScheduler.submit( - SyncNotificationWorkManagerRequest( - sessionId = failedSessionId, - notificationEventRequests = requestsToRetry, - workerDataConverter = workerDataConverter, - buildVersionSdkIntProvider = buildVersionSdkIntProvider, + if (workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS) { + Timber.tag(TAG).d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId") + 
workManagerScheduler.submit( + SyncNotificationWorkManagerRequest( + sessionId = failedSessionId, + notificationEventRequests = requestsToRetry, + workerDataConverter = workerDataConverter, + buildVersionSdkIntProvider = buildVersionSdkIntProvider, + ) ) - ) + } } } - Timber.d("Notifications processed successfully") + Timber.tag(TAG).d("Notifications processed successfully") performOpportunisticSyncIfNeeded(groupedRequests) @@ -118,7 +128,7 @@ class FetchNotificationsWorker( runCatchingExceptions { syncOnNotifiableEvent(notificationRequests) }.onFailure { - Timber.e(it, "Failed to sync on notifiable events for session $sessionId") + Timber.tag(TAG).e(it, "Failed to sync on notifiable events for session $sessionId") } } } From 9223eb2b44b8ef56370fa322a63a65a26d64b4a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jorge=20Mart=C3=ADn?= Date: Wed, 18 Feb 2026 10:33:26 +0100 Subject: [PATCH 2/3] Add backoff criteria for fetching notifications: linear backoff starting at 30s --- .../impl/workmanager/SyncNotificationWorkManagerRequest.kt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt index 50ef28903cf..783ceebdf06 100644 --- a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt +++ b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt @@ -9,6 +9,7 @@ package io.element.android.libraries.push.impl.workmanager import android.os.Build +import androidx.work.BackoffPolicy import androidx.work.OneTimeWorkRequestBuilder import androidx.work.OutOfQuotaPolicy import androidx.work.WorkRequest @@ -22,6 +23,8 @@ import kotlinx.serialization.SerialName import
kotlinx.serialization.Serializable import timber.log.Timber import java.security.InvalidParameterException +import kotlin.time.Duration.Companion.seconds +import kotlin.time.toJavaDuration class SyncNotificationWorkManagerRequest( private val sessionId: SessionId, @@ -46,6 +49,7 @@ class SyncNotificationWorkManagerRequest( setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST) } } + .setBackoffCriteria(BackoffPolicy.LINEAR, 30.seconds.toJavaDuration()) .setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC)) // TODO investigate using this instead of the resolver queue // .setInputMerger() From b3f43add19d099b97a919951ab08e8afb228d635 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jorge=20Mart=C3=ADn?= Date: Mon, 23 Feb 2026 23:02:48 +0100 Subject: [PATCH 3/3] Allow requests when network is not available to be re-scheduled indefinitely Also, re-schedule those requests that failed because of a network connection hiccup --- .../workmanager/FetchNotificationsWorker.kt | 108 ++++++++++++++---- .../SyncNotificationWorkManagerRequest.kt | 6 + .../FetchNotificationWorkerTest.kt | 4 +- 3 files changed, 91 insertions(+), 27 deletions(-) diff --git a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt index bc335f3e558..971c7e6e02a 100644 --- a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt +++ b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt @@ -24,6 +24,8 @@ import io.element.android.libraries.core.extensions.runCatchingExceptions import io.element.android.libraries.di.annotations.ApplicationContext import io.element.android.libraries.matrix.api.auth.SessionRestorationException import 
io.element.android.libraries.matrix.api.core.SessionId +import io.element.android.libraries.matrix.api.exception.ClientException +import io.element.android.libraries.matrix.api.exception.isNetworkError import io.element.android.libraries.push.api.push.NotificationEventRequest import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent import io.element.android.libraries.push.impl.notifications.NotifiableEventResolver @@ -40,7 +42,8 @@ import timber.log.Timber import kotlin.time.Duration.Companion.seconds private const val MAX_RETRY_ATTEMPTS = 3 -private const val TAG = "FetchNotificationsWorker" +private val rescheduleDelay = 30.seconds +private const val TAG = "NotificationsWorker" @AssistedInject class FetchNotificationsWorker( @@ -57,29 +60,47 @@ class FetchNotificationsWorker( ) : CoroutineWorker(context, workerParams) { override suspend fun doWork(): Result = withContext(coroutineDispatchers.io) { Timber.tag(TAG).d("FetchNotificationsWorker started") + val canRetry = workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS + val requests = workerDataConverter.deserialize(inputData) ?: return@withContext Result.failure() // Wait for network to be available, but not more than 10 seconds val hasNetwork = withTimeoutOrNull(10.seconds) { networkMonitor.connectivity.first { it == NetworkStatus.Connected } } != null if (!hasNetwork) { - if (workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS) { - Timber.tag(TAG).w("No network, retrying later") - return@withContext Result.retry() - } else { - Timber.tag(TAG).w("No network available and reached max retry attempts (${workerParams.runAttemptCount}/$MAX_RETRY_ATTEMPTS)") - return@withContext Result.failure() - } + Timber.tag(TAG).w("No network, re-scheduling to retry later") + val sessionId = requests.first().sessionId + workManagerScheduler.submit( + SyncNotificationWorkManagerRequest( + sessionId = sessionId, + notificationEventRequests = requests, + workerDataConverter = workerDataConverter, + 
buildVersionSdkIntProvider = buildVersionSdkIntProvider, + delay = rescheduleDelay, + ) + ) + return@withContext Result.failure() } val failedSyncForSessions = mutableMapOf() val groupedRequests = requests.groupBy { it.sessionId }.toMutableMap() + val recoverableFailedRequests = mutableSetOf() for ((sessionId, notificationRequests) in groupedRequests) { Timber.tag(TAG).d("Processing notification requests for session $sessionId") eventResolver.resolveEvents(sessionId, notificationRequests) .fold( onSuccess = { result -> + // Store failed but recoverable requests + recoverableFailedRequests.addAll( + result + .filter { (_, eventResult) -> + val exception = eventResult.exceptionOrNull() + exception is ClientException.Generic && exception.isNetworkError() + } + .map { it.key } + ) + // Update the resolved results in the queue (queue.results as MutableSharedFlow).emit(requests to result) }, @@ -90,8 +111,36 @@ class FetchNotificationsWorker( ) } - // If there were failures for whole sessions, we retry all their requests - if (failedSyncForSessions.isNotEmpty()) { + // Handle failures, re-schedule and retry/fail as needed + handleFailures( + canRetry = canRetry, + requests = requests, + recoverableFailedRequests = recoverableFailedRequests, + failedSyncForSessions = failedSyncForSessions, + )?.let { result -> + return@withContext result + } + + Timber.tag(TAG).d("Notifications processed successfully") + + performOpportunisticSyncIfNeeded(groupedRequests) + + Result.success() + } + + private fun handleFailures( + canRetry: Boolean, + requests: List, + recoverableFailedRequests: Set, + failedSyncForSessions: Map, + ): Result? 
{ + val allRequestsFailed = recoverableFailedRequests == requests.toSet() + if (allRequestsFailed) { + return if (canRetry) Result.retry() else Result.failure() + } else if (!canRetry) { + return Result.failure() + } else if (failedSyncForSessions.isNotEmpty()) { + val groupedRequests = requests.groupBy { it.sessionId }.toMutableMap() @Suppress("LoopWithTooManyJumpStatements") for ((failedSessionId, exception) in failedSyncForSessions) { if (exception.cause is SessionRestorationException) { @@ -100,25 +149,34 @@ class FetchNotificationsWorker( continue } val requestsToRetry = groupedRequests[failedSessionId] ?: continue - if (workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS) { - Timber.tag(TAG).d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId") - workManagerScheduler.submit( - SyncNotificationWorkManagerRequest( - sessionId = failedSessionId, - notificationEventRequests = requestsToRetry, - workerDataConverter = workerDataConverter, - buildVersionSdkIntProvider = buildVersionSdkIntProvider, - ) + Timber.tag(TAG).d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId") + workManagerScheduler.submit( + SyncNotificationWorkManagerRequest( + sessionId = failedSessionId, + notificationEventRequests = requestsToRetry, + workerDataConverter = workerDataConverter, + buildVersionSdkIntProvider = buildVersionSdkIntProvider, + delay = rescheduleDelay, ) - } + ) + } + } else if (recoverableFailedRequests.isNotEmpty()) { + val bySessionId = recoverableFailedRequests.groupBy { it.sessionId } + for ((sessionId, failedRequests) in bySessionId) { + Timber.tag(TAG).d("Re-scheduling ${recoverableFailedRequests.size} recoverable failed notification requests for $sessionId") + workManagerScheduler.submit( + SyncNotificationWorkManagerRequest( + sessionId = sessionId, + notificationEventRequests = failedRequests, + workerDataConverter = workerDataConverter, + buildVersionSdkIntProvider = 
buildVersionSdkIntProvider, + delay = rescheduleDelay, + ) + ) } } - Timber.tag(TAG).d("Notifications processed successfully") - - performOpportunisticSyncIfNeeded(groupedRequests) - - Result.success() + return null } private suspend fun performOpportunisticSyncIfNeeded( diff --git a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt index 783ceebdf06..0b2cb543da0 100644 --- a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt +++ b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt @@ -23,6 +23,7 @@ import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import timber.log.Timber import java.security.InvalidParameterException +import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @@ -31,6 +32,7 @@ class SyncNotificationWorkManagerRequest( private val notificationEventRequests: List, private val workerDataConverter: SyncNotificationsWorkerDataConverter, private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider, + private val delay: Duration? 
= null, ) : WorkManagerRequest { override fun build(): Result> { if (notificationEventRequests.isEmpty()) { @@ -48,6 +50,10 @@ class SyncNotificationWorkManagerRequest( if (buildVersionSdkIntProvider.isAtLeast(Build.VERSION_CODES.TIRAMISU)) { setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST) } + + if (delay != null) { + setInitialDelay(delay.toJavaDuration()) + } } .setBackoffCriteria(BackoffPolicy.LINEAR, 30.seconds.toJavaDuration()) .setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC)) diff --git a/libraries/push/impl/src/test/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationWorkerTest.kt b/libraries/push/impl/src/test/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationWorkerTest.kt index d40ef17b53e..150dd4ff604 100644 --- a/libraries/push/impl/src/test/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationWorkerTest.kt +++ b/libraries/push/impl/src/test/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationWorkerTest.kt @@ -125,7 +125,7 @@ class FetchNotificationWorkerTest { advanceTimeBy(10.seconds) // The process failed due to a timeout in getting the network connectivity, a retry is scheduled - assertThat(result).isEqualTo(ListenableWorker.Result.retry()) + assertThat(result).isEqualTo(ListenableWorker.Result.failure()) } @Test @@ -166,7 +166,7 @@ class FetchNotificationWorkerTest { queue: NotificationResolverQueue = FakeNotificationResolverQueue( processingLambda = { Result.success(ResolvedPushEvent.Event(aNotifiableMessageEvent())) } ), - workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(), + workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(submitLambda = {}), syncOnNotifiableEvent: SyncOnNotifiableEvent = SyncOnNotifiableEvent {}, ) = FetchNotificationsWorker( workerParams = createWorkerParams(workDataOf("requests" to input)),