diff --git a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt
index eeac5ff66fa..971c7e6e02a 100644
--- a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt
+++ b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationsWorker.kt
@@ -24,6 +24,8 @@ import io.element.android.libraries.core.extensions.runCatchingExceptions
 import io.element.android.libraries.di.annotations.ApplicationContext
 import io.element.android.libraries.matrix.api.auth.SessionRestorationException
 import io.element.android.libraries.matrix.api.core.SessionId
+import io.element.android.libraries.matrix.api.exception.ClientException
+import io.element.android.libraries.matrix.api.exception.isNetworkError
 import io.element.android.libraries.push.api.push.NotificationEventRequest
 import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
 import io.element.android.libraries.push.impl.notifications.NotifiableEventResolver
@@ -39,9 +41,17 @@ import kotlinx.coroutines.withTimeoutOrNull
 import timber.log.Timber
 import kotlin.time.Duration.Companion.seconds
 
+/** A failed run may only return `Result.retry()` while its `runAttemptCount` is below this value. */
+private const val MAX_RETRY_ATTEMPTS = 3
+
+/** Initial delay applied to the requests this worker re-schedules by itself. */
+private val rescheduleDelay = 30.seconds
+
+private const val TAG = "NotificationsWorker"
+
 @AssistedInject
 class FetchNotificationsWorker(
-    @Assisted workerParams: WorkerParameters,
+    @Assisted private val workerParams: WorkerParameters,
     @ApplicationContext private val context: Context,
     private val networkMonitor: NetworkMonitor,
     private val eventResolver: NotifiableEventResolver,
@@ -53,25 +63,50 @@ class FetchNotificationsWorker(
     private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
 ) : CoroutineWorker(context, workerParams) {
     override suspend fun doWork(): Result = withContext(coroutineDispatchers.io) {
-        Timber.d("FetchNotificationsWorker started")
+        Timber.tag(TAG).d("FetchNotificationsWorker started")
+        val canRetry = workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS
+
         val requests = workerDataConverter.deserialize(inputData) ?: return@withContext Result.failure()
         // Wait for network to be available, but not more than 10 seconds
         val hasNetwork = withTimeoutOrNull(10.seconds) {
             networkMonitor.connectivity.first { it == NetworkStatus.Connected }
         } != null
         if (!hasNetwork) {
-            Timber.w("No network, retrying later")
-            return@withContext Result.retry()
+            Timber.tag(TAG).w("No network, re-scheduling to retry later")
+            // Re-schedule per session: each request stays under its own session and an empty list submits nothing
+            for ((sessionId, sessionRequests) in requests.groupBy { it.sessionId }) {
+                workManagerScheduler.submit(
+                    SyncNotificationWorkManagerRequest(
+                        sessionId = sessionId,
+                        notificationEventRequests = sessionRequests,
+                        workerDataConverter = workerDataConverter,
+                        buildVersionSdkIntProvider = buildVersionSdkIntProvider,
+                        delay = rescheduleDelay,
+                    )
+                )
+            }
+            return@withContext Result.failure()
         }
 
         val failedSyncForSessions = mutableMapOf<SessionId, Throwable>()
 
         val groupedRequests = requests.groupBy { it.sessionId }.toMutableMap()
+        val recoverableFailedRequests = mutableSetOf<NotificationEventRequest>()
         for ((sessionId, notificationRequests) in groupedRequests) {
-            Timber.d("Processing notification requests for session $sessionId")
+            Timber.tag(TAG).d("Processing notification requests for session $sessionId")
             eventResolver.resolveEvents(sessionId, notificationRequests)
                 .fold(
                     onSuccess = { result ->
+                        // Store failed but recoverable requests
+                        recoverableFailedRequests.addAll(
+                            result
+                                .filter { (_, eventResult) ->
+                                    val exception = eventResult.exceptionOrNull()
+                                    exception is ClientException.Generic && exception.isNetworkError()
+                                }
+                                .map { it.key }
+                        )
+
                         // Update the resolved results in the queue
                         (queue.results as MutableSharedFlow).emit(requests to result)
                     },
@@ -82,33 +117,90 @@ class FetchNotificationsWorker(
                 )
         }
 
-        // If there were failures for whole sessions, we retry all their requests
-        if (failedSyncForSessions.isNotEmpty()) {
+        // Handle failures, re-schedule and retry/fail as needed
+        handleFailures(
+            canRetry = canRetry,
+            requests = requests,
+            recoverableFailedRequests = recoverableFailedRequests,
+            failedSyncForSessions = failedSyncForSessions,
+        )?.let { result ->
+            return@withContext result
+        }
+
+        // Sessions that could not be restored cannot be synced either, so keep them out of the opportunistic sync
+        groupedRequests.keys.removeAll { failedSyncForSessions[it]?.cause is SessionRestorationException }
+
+        Timber.tag(TAG).d("Notifications processed successfully")
+
+        performOpportunisticSyncIfNeeded(groupedRequests)
+
+        Result.success()
+    }
+
+    /**
+     * Decides how the work ends once every session has been processed, re-scheduling whatever can be recovered.
+     *
+     * @param canRetry whether this run is still allowed to ask for a retry.
+     * @param requests every request handled by this run.
+     * @param recoverableFailedRequests requests that failed to resolve because of a network error.
+     * @param failedSyncForSessions sessions whose whole resolution failed, with the failure for each.
+     * @return the [Result] to finish with right away, or `null` to carry on with the success path.
+     */
+    private fun handleFailures(
+        canRetry: Boolean,
+        requests: List<NotificationEventRequest>,
+        recoverableFailedRequests: Set<NotificationEventRequest>,
+        failedSyncForSessions: Map<SessionId, Throwable>,
+    ): Result? {
+        // Nothing failed: a clean run must never be reported as a failure, whatever the attempt count
+        if (recoverableFailedRequests.isEmpty() && failedSyncForSessions.isEmpty()) return null
+
+        val allRequestsFailed = recoverableFailedRequests == requests.toSet()
+        if (allRequestsFailed) {
+            return if (canRetry) Result.retry() else Result.failure()
+        }
+        if (!canRetry) return Result.failure()
+
+        if (failedSyncForSessions.isNotEmpty()) {
+            val groupedRequests = requests.groupBy { it.sessionId }
             @Suppress("LoopWithTooManyJumpStatements")
             for ((failedSessionId, exception) in failedSyncForSessions) {
                 if (exception.cause is SessionRestorationException) {
-                    Timber.e(exception, "Session $failedSessionId could not be restored, not retrying notification fetching")
-                    groupedRequests.remove(failedSessionId)
+                    Timber.tag(TAG).e(exception, "Session $failedSessionId could not be restored, not retrying notification fetching")
                     continue
                 }
                 val requestsToRetry = groupedRequests[failedSessionId] ?: continue
-                Timber.d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId")
+                Timber.tag(TAG).d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId")
                 workManagerScheduler.submit(
                     SyncNotificationWorkManagerRequest(
                         sessionId = failedSessionId,
                         notificationEventRequests = requestsToRetry,
                         workerDataConverter = workerDataConverter,
                         buildVersionSdkIntProvider = buildVersionSdkIntProvider,
+                        delay = rescheduleDelay,
+                    )
+                )
+            }
+        }
+
+        // Sessions that did resolve can still hold individual recoverable failures: re-schedule those too
+        if (recoverableFailedRequests.isNotEmpty()) {
+            val bySessionId = recoverableFailedRequests.groupBy { it.sessionId }
+            for ((sessionId, failedRequests) in bySessionId) {
+                Timber.tag(TAG).d("Re-scheduling ${failedRequests.size} recoverable failed notification requests for $sessionId")
+                workManagerScheduler.submit(
+                    SyncNotificationWorkManagerRequest(
+                        sessionId = sessionId,
+                        notificationEventRequests = failedRequests,
+                        workerDataConverter = workerDataConverter,
+                        buildVersionSdkIntProvider = buildVersionSdkIntProvider,
+                        delay = rescheduleDelay,
                     )
                 )
             }
         }
 
-        Timber.d("Notifications processed successfully")
-
-        performOpportunisticSyncIfNeeded(groupedRequests)
-
-        Result.success()
+        return null
     }
 
     private suspend fun performOpportunisticSyncIfNeeded(
@@ -118,7 +210,7 @@ class FetchNotificationsWorker(
             runCatchingExceptions {
                 syncOnNotifiableEvent(notificationRequests)
             }.onFailure {
-                Timber.e(it, "Failed to sync on notifiable events for session $sessionId")
+                Timber.tag(TAG).e(it, "Failed to sync on notifiable events for session $sessionId")
             }
         }
     }
diff --git a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt
index 50ef28903cf..0b2cb543da0 100644
--- a/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt
+++ b/libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/workmanager/SyncNotificationWorkManagerRequest.kt
@@ -9,6 +9,7 @@
 package io.element.android.libraries.push.impl.workmanager
 
 import android.os.Build
+import androidx.work.BackoffPolicy
 import androidx.work.OneTimeWorkRequestBuilder
 import androidx.work.OutOfQuotaPolicy
 import androidx.work.WorkRequest
@@ -22,12 +23,17 @@ import kotlinx.serialization.SerialName
 import kotlinx.serialization.Serializable
 import timber.log.Timber
 import java.security.InvalidParameterException
+import kotlin.time.Duration
+import kotlin.time.Duration.Companion.seconds
+import kotlin.time.toJavaDuration
 
 class SyncNotificationWorkManagerRequest(
     private val sessionId: SessionId,
     private val notificationEventRequests: List<NotificationEventRequest>,
     private val workerDataConverter: SyncNotificationsWorkerDataConverter,
     private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
+    /** Optional delay before the work first runs. A request with a positive delay is never expedited. */
+    private val delay: Duration? = null,
 ) : WorkManagerRequest {
     override fun build(): Result<List<WorkRequest>> {
         if (notificationEventRequests.isEmpty()) {
@@ -45,7 +51,12 @@ class SyncNotificationWorkManagerRequest(
-                    if (buildVersionSdkIntProvider.isAtLeast(Build.VERSION_CODES.TIRAMISU)) {
+                    if (delay != null && delay.isPositive()) {
+                        // WorkManager refuses to build an expedited request that also has an initial delay,
+                        // so a delayed request is always submitted as regular work.
+                        setInitialDelay(delay.toJavaDuration())
+                    } else if (buildVersionSdkIntProvider.isAtLeast(Build.VERSION_CODES.TIRAMISU)) {
                         setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
                     }
                 }
+                .setBackoffCriteria(BackoffPolicy.LINEAR, 30.seconds.toJavaDuration())
                 .setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC))
                 // TODO investigate using this instead of the resolver queue
                 // .setInputMerger()
diff --git a/libraries/push/impl/src/test/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationWorkerTest.kt b/libraries/push/impl/src/test/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationWorkerTest.kt
index d40ef17b53e..150dd4ff604 100644
--- a/libraries/push/impl/src/test/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationWorkerTest.kt
+++ b/libraries/push/impl/src/test/kotlin/io/element/android/libraries/push/impl/workmanager/FetchNotificationWorkerTest.kt
@@ -125,7 +125,7 @@ class FetchNotificationWorkerTest {
         advanceTimeBy(10.seconds)
 
-        // The process failed due to a timeout in getting the network connectivity, a retry is scheduled
-        assertThat(result).isEqualTo(ListenableWorker.Result.retry())
+        // The process failed due to a timeout in getting the network connectivity: the requests are re-scheduled and this run fails
+        assertThat(result).isEqualTo(ListenableWorker.Result.failure())
     }
 
     @Test
@@ -166,7 +166,7 @@ class FetchNotificationWorkerTest {
         queue: NotificationResolverQueue = FakeNotificationResolverQueue(
             processingLambda = { Result.success(ResolvedPushEvent.Event(aNotifiableMessageEvent())) }
         ),
-        workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(),
+        workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(submitLambda = {}),
         syncOnNotifiableEvent: SyncOnNotifiableEvent = SyncOnNotifiableEvent {},
     ) = FetchNotificationsWorker(
         workerParams = createWorkerParams(workDataOf("requests" to input)),