Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.di.annotations.ApplicationContext
import io.element.android.libraries.matrix.api.auth.SessionRestorationException
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.exception.ClientException
import io.element.android.libraries.matrix.api.exception.isNetworkError
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.notifications.NotifiableEventResolver
Expand All @@ -39,9 +41,13 @@ import kotlinx.coroutines.withTimeoutOrNull
import timber.log.Timber
import kotlin.time.Duration.Companion.seconds

// Upper bound compared against the worker's runAttemptCount to decide whether another retry is allowed.
private const val MAX_RETRY_ATTEMPTS = 3
// Delay handed to the notification sync requests that the worker re-submits after a failure.
private val rescheduleDelay = 30.seconds
// Timber tag applied to the log statements of the fetch notifications worker.
private const val TAG = "NotificationsWorker"

@AssistedInject
class FetchNotificationsWorker(
@Assisted workerParams: WorkerParameters,
@Assisted private val workerParams: WorkerParameters,
@ApplicationContext private val context: Context,
private val networkMonitor: NetworkMonitor,
private val eventResolver: NotifiableEventResolver,
Expand All @@ -53,25 +59,48 @@ class FetchNotificationsWorker(
private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
) : CoroutineWorker(context, workerParams) {
override suspend fun doWork(): Result = withContext(coroutineDispatchers.io) {
Timber.d("FetchNotificationsWorker started")
Timber.tag(TAG).d("FetchNotificationsWorker started")
val canRetry = workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS

val requests = workerDataConverter.deserialize(inputData) ?: return@withContext Result.failure()
// Wait for network to be available, but not more than 10 seconds
val hasNetwork = withTimeoutOrNull(10.seconds) {
networkMonitor.connectivity.first { it == NetworkStatus.Connected }
} != null
if (!hasNetwork) {
Timber.w("No network, retrying later")
return@withContext Result.retry()
Timber.tag(TAG).w("No network, re-scheduling to retry later")
val sessionId = requests.first().sessionId
workManagerScheduler.submit(
SyncNotificationWorkManagerRequest(
sessionId = sessionId,
notificationEventRequests = requests,
workerDataConverter = workerDataConverter,
buildVersionSdkIntProvider = buildVersionSdkIntProvider,
delay = rescheduleDelay,
)
)
return@withContext Result.failure()
}

val failedSyncForSessions = mutableMapOf<SessionId, Throwable>()

val groupedRequests = requests.groupBy { it.sessionId }.toMutableMap()
val recoverableFailedRequests = mutableSetOf<NotificationEventRequest>()
for ((sessionId, notificationRequests) in groupedRequests) {
Timber.d("Processing notification requests for session $sessionId")
Timber.tag(TAG).d("Processing notification requests for session $sessionId")
eventResolver.resolveEvents(sessionId, notificationRequests)
.fold(
onSuccess = { result ->
// Store failed but recoverable requests
recoverableFailedRequests.addAll(
result
.filter { (_, eventResult) ->
val exception = eventResult.exceptionOrNull()
exception is ClientException.Generic && exception.isNetworkError()
}
.map { it.key }
)

// Update the resolved results in the queue
(queue.results as MutableSharedFlow).emit(requests to result)
},
Expand All @@ -82,33 +111,72 @@ class FetchNotificationsWorker(
)
}

// If there were failures for whole sessions, we retry all their requests
if (failedSyncForSessions.isNotEmpty()) {
// Handle failures, re-schedule and retry/fail as needed
handleFailures(
canRetry = canRetry,
requests = requests,
recoverableFailedRequests = recoverableFailedRequests,
failedSyncForSessions = failedSyncForSessions,
)?.let { result ->
return@withContext result
}

Timber.tag(TAG).d("Notifications processed successfully")

performOpportunisticSyncIfNeeded(groupedRequests)

Result.success()
}

/**
 * Decides how the worker should finish once every session's requests have been processed,
 * re-scheduling the requests that are worth another attempt.
 *
 * @param canRetry whether this run is still below the allowed number of attempts.
 * @param requests every notification request handled by this run.
 * @param recoverableFailedRequests requests that failed in a way considered recoverable.
 * @param failedSyncForSessions sessions for which resolving failed as a whole, with the cause.
 * @return a terminal [Result] when the run must stop here, or `null` when the caller can
 * carry on and report success.
 */
private fun handleFailures(
    canRetry: Boolean,
    requests: List<NotificationEventRequest>,
    recoverableFailedRequests: Set<NotificationEventRequest>,
    failedSyncForSessions: Map<SessionId, Throwable>,
): Result? {
    val allRequestsFailed = recoverableFailedRequests == requests.toSet()
    val hasFailures = failedSyncForSessions.isNotEmpty() || recoverableFailedRequests.isNotEmpty()
    when {
        // Nothing went through: let WorkManager retry the whole work while attempts remain.
        allRequestsFailed -> return if (canRetry) Result.retry() else Result.failure()
        // Nothing failed: an exhausted retry budget must not turn a successful run into a failure.
        !hasFailures -> return null
        // Partial failure with no attempts left: give up instead of re-scheduling again.
        !canRetry -> return Result.failure()
        // NOTE(review): when whole-session failures exist, recoverable per-request failures from
        // other sessions are not re-scheduled by this run - confirm that this is intended.
        failedSyncForSessions.isNotEmpty() -> {
            val requestsBySession = requests.groupBy { it.sessionId }
            @Suppress("LoopWithTooManyJumpStatements")
            for ((failedSessionId, exception) in failedSyncForSessions) {
                if (exception.cause is SessionRestorationException) {
                    Timber.tag(TAG).e(exception, "Session $failedSessionId could not be restored, not retrying notification fetching")
                    continue
                }
                val requestsToRetry = requestsBySession[failedSessionId] ?: continue
                Timber.tag(TAG).d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId")
                rescheduleRequests(failedSessionId, requestsToRetry)
            }
        }
        else -> {
            for ((sessionId, failedRequests) in recoverableFailedRequests.groupBy { it.sessionId }) {
                // Log the size of this session's batch, not the total across all sessions.
                Timber.tag(TAG).d("Re-scheduling ${failedRequests.size} recoverable failed notification requests for $sessionId")
                rescheduleRequests(sessionId, failedRequests)
            }
        }
    }
    return null
}

/** Submits a new notification sync request for [requestsToRetry], delayed by [rescheduleDelay]. */
private fun rescheduleRequests(
    sessionId: SessionId,
    requestsToRetry: List<NotificationEventRequest>,
) {
    workManagerScheduler.submit(
        SyncNotificationWorkManagerRequest(
            sessionId = sessionId,
            notificationEventRequests = requestsToRetry,
            workerDataConverter = workerDataConverter,
            buildVersionSdkIntProvider = buildVersionSdkIntProvider,
            delay = rescheduleDelay,
        )
    )
}

private suspend fun performOpportunisticSyncIfNeeded(
Expand All @@ -118,7 +186,7 @@ class FetchNotificationsWorker(
runCatchingExceptions {
syncOnNotifiableEvent(notificationRequests)
}.onFailure {
Timber.e(it, "Failed to sync on notifiable events for session $sessionId")
Timber.tag(TAG).e(it, "Failed to sync on notifiable events for session $sessionId")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package io.element.android.libraries.push.impl.workmanager

import android.os.Build
import androidx.work.BackoffPolicy
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.OutOfQuotaPolicy
import androidx.work.WorkRequest
Expand All @@ -22,12 +23,16 @@ import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import timber.log.Timber
import java.security.InvalidParameterException
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

class SyncNotificationWorkManagerRequest(
private val sessionId: SessionId,
private val notificationEventRequests: List<NotificationEventRequest>,
private val workerDataConverter: SyncNotificationsWorkerDataConverter,
private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
private val delay: Duration? = null,
) : WorkManagerRequest {
override fun build(): Result<List<WorkRequest>> {
if (notificationEventRequests.isEmpty()) {
Expand All @@ -45,7 +50,12 @@ class SyncNotificationWorkManagerRequest(
if (buildVersionSdkIntProvider.isAtLeast(Build.VERSION_CODES.TIRAMISU)) {
setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
}

if (delay != null) {
setInitialDelay(delay.toJavaDuration())
}
}
.setBackoffCriteria(BackoffPolicy.LINEAR, 30.seconds.toJavaDuration())
.setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC))
// TODO investigate using this instead of the resolver queue
// .setInputMerger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class FetchNotificationWorkerTest {
advanceTimeBy(10.seconds)

// The process failed due to a timeout in getting the network connectivity, a retry is scheduled
assertThat(result).isEqualTo(ListenableWorker.Result.retry())
assertThat(result).isEqualTo(ListenableWorker.Result.failure())
}

@Test
Expand Down Expand Up @@ -166,7 +166,7 @@ class FetchNotificationWorkerTest {
queue: NotificationResolverQueue = FakeNotificationResolverQueue(
processingLambda = { Result.success(ResolvedPushEvent.Event(aNotifiableMessageEvent())) }
),
workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(),
workManagerScheduler: FakeWorkManagerScheduler = FakeWorkManagerScheduler(submitLambda = {}),
syncOnNotifiableEvent: SyncOnNotifiableEvent = SyncOnNotifiableEvent {},
) = FetchNotificationsWorker(
workerParams = createWorkerParams(workDataOf("requests" to input)),
Expand Down
Loading