Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ import kotlinx.coroutines.withTimeoutOrNull
import timber.log.Timber
import kotlin.time.Duration.Companion.seconds

private const val MAX_RETRY_ATTEMPTS = 3
private const val TAG = "FetchNotificationsWorker"

@AssistedInject
class FetchNotificationsWorker(
@Assisted workerParams: WorkerParameters,
@Assisted private val workerParams: WorkerParameters,
@ApplicationContext private val context: Context,
private val networkMonitor: NetworkMonitor,
private val eventResolver: NotifiableEventResolver,
Expand All @@ -53,22 +56,27 @@ class FetchNotificationsWorker(
private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
) : CoroutineWorker(context, workerParams) {
override suspend fun doWork(): Result = withContext(coroutineDispatchers.io) {
Timber.d("FetchNotificationsWorker started")
Timber.tag(TAG).d("FetchNotificationsWorker started")
val requests = workerDataConverter.deserialize(inputData) ?: return@withContext Result.failure()
// Wait for network to be available, but not more than 10 seconds
val hasNetwork = withTimeoutOrNull(10.seconds) {
networkMonitor.connectivity.first { it == NetworkStatus.Connected }
} != null
if (!hasNetwork) {
Timber.w("No network, retrying later")
return@withContext Result.retry()
if (workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS) {
Timber.tag(TAG).w("No network, retrying later")
return@withContext Result.retry()
} else {
Timber.tag(TAG).w("No network available and reached max retry attempts (${workerParams.runAttemptCount}/$MAX_RETRY_ATTEMPTS)")
return@withContext Result.failure()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe if there is no network we should not count as a retry?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can make an exception just for this case, to be honest 🫤 .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could let the worker wait for "network" instead of retrying?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the workers can be alive in background for long. We could try, but I'm sure that would have some kind of penalty with the future scheduling.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can make the retry backoff exponential instead of linear: my thoughts on this was to have it linear so it's retried after a short while, but I was thinking about a temporary issue in the HS, not the connection failing.

}

val failedSyncForSessions = mutableMapOf<SessionId, Throwable>()

val groupedRequests = requests.groupBy { it.sessionId }.toMutableMap()
for ((sessionId, notificationRequests) in groupedRequests) {
Timber.d("Processing notification requests for session $sessionId")
Timber.tag(TAG).d("Processing notification requests for session $sessionId")
eventResolver.resolveEvents(sessionId, notificationRequests)
.fold(
onSuccess = { result ->
Expand All @@ -87,24 +95,26 @@ class FetchNotificationsWorker(
@Suppress("LoopWithTooManyJumpStatements")
for ((failedSessionId, exception) in failedSyncForSessions) {
if (exception.cause is SessionRestorationException) {
Timber.e(exception, "Session $failedSessionId could not be restored, not retrying notification fetching")
Timber.tag(TAG).e(exception, "Session $failedSessionId could not be restored, not retrying notification fetching")
groupedRequests.remove(failedSessionId)
continue
}
val requestsToRetry = groupedRequests[failedSessionId] ?: continue
Timber.d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId")
workManagerScheduler.submit(
SyncNotificationWorkManagerRequest(
sessionId = failedSessionId,
notificationEventRequests = requestsToRetry,
workerDataConverter = workerDataConverter,
buildVersionSdkIntProvider = buildVersionSdkIntProvider,
if (workerParams.runAttemptCount < MAX_RETRY_ATTEMPTS) {
Timber.tag(TAG).d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId")
workManagerScheduler.submit(
SyncNotificationWorkManagerRequest(
sessionId = failedSessionId,
notificationEventRequests = requestsToRetry,
workerDataConverter = workerDataConverter,
buildVersionSdkIntProvider = buildVersionSdkIntProvider,
)
)
)
}
}
}

Timber.d("Notifications processed successfully")
Timber.tag(TAG).d("Notifications processed successfully")

performOpportunisticSyncIfNeeded(groupedRequests)

Expand All @@ -118,7 +128,7 @@ class FetchNotificationsWorker(
runCatchingExceptions {
syncOnNotifiableEvent(notificationRequests)
}.onFailure {
Timber.e(it, "Failed to sync on notifiable events for session $sessionId")
Timber.tag(TAG).e(it, "Failed to sync on notifiable events for session $sessionId")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package io.element.android.libraries.push.impl.workmanager

import android.os.Build
import androidx.work.BackoffPolicy
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.OutOfQuotaPolicy
import androidx.work.WorkRequest
Expand All @@ -22,6 +23,8 @@ import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import timber.log.Timber
import java.security.InvalidParameterException
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

class SyncNotificationWorkManagerRequest(
private val sessionId: SessionId,
Expand All @@ -46,6 +49,7 @@ class SyncNotificationWorkManagerRequest(
setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
}
}
.setBackoffCriteria(BackoffPolicy.LINEAR, 30.seconds.toJavaDuration())
.setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC))
// TODO investigate using this instead of the resolver queue
// .setInputMerger()
Expand Down
Loading