Skip to content

Commit fdd09aa

Browse files
committed
Integrate retryability for outgoing mutation queue
1 parent 162f46a commit fdd09aa

File tree

19 files changed

+2019
-1463
lines changed

19 files changed

+2019
-1463
lines changed

Amplify.xcodeproj/project.pbxproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
2861F5F799C24671B5C4DB8D /* Pods_Amplify_AmplifyTestConfigs_AmplifyTestCommon.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C448F4F6DD01A268675E1C68 /* Pods_Amplify_AmplifyTestConfigs_AmplifyTestCommon.framework */; };
9494
2CFB61C7E80D065C0A885A2F /* Pods_Amplify_AWSPluginsCore_AWSPluginsTestConfigs_AWSPluginsTestCommon.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = D5363CAF9EFAA822FED56808 /* Pods_Amplify_AWSPluginsCore_AWSPluginsTestConfigs_AWSPluginsTestCommon.framework */; };
9595
3263D332138415AF42E64FF7 /* Pods_AmplifyTestApp.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = CDC7F1C368154B364CB74742 /* Pods_AmplifyTestApp.framework */; };
96+
6BB7441023A9954900B0EB6C /* DispatchSource+MakeOneOff.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6BB7440F23A9954900B0EB6C /* DispatchSource+MakeOneOff.swift */; };
9697
7D5ED6C78E25246DDAF2F2EC /* Pods_Amplify.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 84F3A76FB68CEFA45F4BB1BB /* Pods_Amplify.framework */; platformFilter = ios; };
9798
7F27B1DCE59C1E674172CCD6 /* Pods_Amplify_AmplifyTestConfigs_AmplifyTests.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 976D972EC2BBCAAD023694EB /* Pods_Amplify_AmplifyTestConfigs_AmplifyTests.framework */; };
9899
881246F5DCC59436DC932469 /* Pods_Amplify_AWSPluginsCore.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 35D92182B8445C8F9B0FAE94 /* Pods_Amplify_AWSPluginsCore.framework */; };
@@ -642,6 +643,7 @@
642643
687B09E9348F8D29979A2404 /* Pods-Amplify-AmplifyAWSPlugins-AWSPinpointAnalyticsPlugin-AWSPinpointAnalyticsPluginTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Amplify-AmplifyAWSPlugins-AWSPinpointAnalyticsPlugin-AWSPinpointAnalyticsPluginTests.debug.xcconfig"; path = "Target Support Files/Pods-Amplify-AmplifyAWSPlugins-AWSPinpointAnalyticsPlugin-AWSPinpointAnalyticsPluginTests/Pods-Amplify-AmplifyAWSPlugins-AWSPinpointAnalyticsPlugin-AWSPinpointAnalyticsPluginTests.debug.xcconfig"; sourceTree = "<group>"; };
643644
6AF0E4775809F0866F9C44D9 /* Pods-AmplifyAWSPlugins-AWSPluginsCore-AWSS3StoragePlugin-AWSS3StoragePluginTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AmplifyAWSPlugins-AWSPluginsCore-AWSS3StoragePlugin-AWSS3StoragePluginTests.debug.xcconfig"; path = "Target Support Files/Pods-AmplifyAWSPlugins-AWSPluginsCore-AWSS3StoragePlugin-AWSS3StoragePluginTests/Pods-AmplifyAWSPlugins-AWSPluginsCore-AWSS3StoragePlugin-AWSS3StoragePluginTests.debug.xcconfig"; sourceTree = "<group>"; };
644645
6BAC32194A15ACB56F07DC87 /* Pods-AWSS3StoragePlugin.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AWSS3StoragePlugin.debug.xcconfig"; path = "Target Support Files/Pods-AWSS3StoragePlugin/Pods-AWSS3StoragePlugin.debug.xcconfig"; sourceTree = "<group>"; };
646+
6BB7440F23A9954900B0EB6C /* DispatchSource+MakeOneOff.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "DispatchSource+MakeOneOff.swift"; sourceTree = "<group>"; };
645647
6C41D3730B7ED4FD62A43E40 /* Pods-Amplify-AmplifyAWSPlugins-AWSAPICategoryPlugin-AWSAPICategoryPluginTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Amplify-AmplifyAWSPlugins-AWSAPICategoryPlugin-AWSAPICategoryPluginTests.debug.xcconfig"; path = "Target Support Files/Pods-Amplify-AmplifyAWSPlugins-AWSAPICategoryPlugin-AWSAPICategoryPluginTests/Pods-Amplify-AmplifyAWSPlugins-AWSAPICategoryPlugin-AWSAPICategoryPluginTests.debug.xcconfig"; sourceTree = "<group>"; };
646648
6D51240C78418B733FFA6829 /* Pods-Amplify-AWSPluginsCore-AWSPluginsTestConfigs-AWSDataStoreCategoryPluginTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Amplify-AWSPluginsCore-AWSPluginsTestConfigs-AWSDataStoreCategoryPluginTests.debug.xcconfig"; path = "Target Support Files/Pods-Amplify-AWSPluginsCore-AWSPluginsTestConfigs-AWSDataStoreCategoryPluginTests/Pods-Amplify-AWSPluginsCore-AWSPluginsTestConfigs-AWSDataStoreCategoryPluginTests.debug.xcconfig"; sourceTree = "<group>"; };
647649
6D62C9C57736C3BEADEB1E30 /* Pods-AWSPinpointAnalyticsPlugin.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AWSPinpointAnalyticsPlugin.debug.xcconfig"; path = "Target Support Files/Pods-AWSPinpointAnalyticsPlugin/Pods-AWSPinpointAnalyticsPlugin.debug.xcconfig"; sourceTree = "<group>"; };
@@ -1948,6 +1950,7 @@
19481950
FAA64FC42397344D00B9C3C6 /* AtomicValue+RangeReplaceableCollection.swift */,
19491951
FADB3A6723612940006D6FE9 /* BasicClosure.swift */,
19501952
FA56F72222B14B420039754A /* Cancellable.swift */,
1953+
6BB7440F23A9954900B0EB6C /* DispatchSource+MakeOneOff.swift */,
19511954
FA09B9402321BB78000E064D /* JSONValue.swift */,
19521955
FACD264F2386E9410068FBE6 /* JSONValue+KeyPath.swift */,
19531956
FACD264E2386E9410068FBE6 /* JSONValue+Subscript.swift */,
@@ -3439,6 +3442,7 @@
34393442
FAA2E8C223A00D5800E420EA /* APICategory+Resettable.swift in Sources */,
34403443
FAA2E8CC23A02A5400E420EA /* HubCategory+Resettable.swift in Sources */,
34413444
2142099823721F4400FA140C /* RESTOperationRequest.swift in Sources */,
3445+
6BB7441023A9954900B0EB6C /* DispatchSource+MakeOneOff.swift in Sources */,
34423446
21FFF994230C96CB005878EA /* StorageUploadDataOperation.swift in Sources */,
34433447
95DAAB30237E63370028544F /* IdentifyAction.swift in Sources */,
34443448
21D79FE32377F4120057D00D /* SubscriptionConnectionState.swift in Sources */,
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//
2+
// Copyright 2018-2019 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Foundation
9+
10+
extension DispatchSource {
11+
/// Convenience function to encapsulate creation of a one-off DispatchSourceTimer for different versions of Swift
12+
///
13+
/// - Parameters:
14+
/// - interval: The future DispatchInterval at which to fire the timer
15+
/// - queue: The queue on which the timer should perform its block
16+
/// - block: The block to invoke when the timer is fired
17+
/// - Returns: The unstarted timer
18+
public static func makeOneOffDispatchSourceTimer(interval: DispatchTimeInterval,
19+
queue: DispatchQueue,
20+
block: @escaping () -> Void ) -> DispatchSourceTimer {
21+
let deadline = DispatchTime.now() + interval
22+
return makeOneOffDispatchSourceTimer(deadline: deadline, queue: queue, block: block)
23+
}
24+
25+
/// Convenience function to encapsulate creation of a one-off DispatchSourceTimer for different versions of Swift
26+
/// - Parameters:
27+
/// - deadline: The time to fire the timer
28+
/// - queue: The queue on which the timer should perform its block
29+
/// - block: The block to invoke when the timer is fired
30+
public static func makeOneOffDispatchSourceTimer(deadline: DispatchTime,
31+
queue: DispatchQueue,
32+
block: @escaping () -> Void ) -> DispatchSourceTimer {
33+
let timer = DispatchSource.makeTimerSource(flags: DispatchSource.TimerFlags(rawValue: 0), queue: queue)
34+
#if swift(>=4)
35+
timer.schedule(deadline: deadline)
36+
#else
37+
timer.scheduleOneshot(deadline: deadline)
38+
#endif
39+
timer.setEventHandler(handler: block)
40+
return timer
41+
}
42+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
//
2+
// Copyright 2018-2019 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import Foundation
10+
import AWSPluginsCore
11+
import Combine
12+
13+
@available(iOS 13.0, *)
14+
final class MutationRetryNotifier {
15+
private var nextSyncTimer: DispatchSourceTimer?
16+
private var handlerQueue = DispatchQueue.global(qos: .default)
17+
var retryMutationCallback: () -> Void
18+
private var reachabilitySubscription: Subscription?
19+
20+
init(advice: RequestRetryAdvice,
21+
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, DataStoreError>?,
22+
retryMutationCallback: @escaping BasicClosure) {
23+
self.retryMutationCallback = retryMutationCallback
24+
25+
networkReachabilityPublisher?.subscribe(self)
26+
27+
let deadline = DispatchTime.now() + advice.retryInterval
28+
scheduleTimer(at: deadline)
29+
}
30+
31+
deinit {
32+
cancel()
33+
}
34+
35+
private func scheduleTimer(at deadline: DispatchTime) {
36+
nextSyncTimer = DispatchSource.makeOneOffDispatchSourceTimer(deadline: deadline, queue: handlerQueue) {
37+
self.notifyCallback()
38+
}
39+
nextSyncTimer?.resume()
40+
}
41+
42+
func cancel() {
43+
reachabilitySubscription?.cancel()
44+
nextSyncTimer?.cancel()
45+
}
46+
47+
func notifyCallback() {
48+
// Call the cancel routine as the purpose of retry is fulfilled
49+
cancel()
50+
retryMutationCallback()
51+
}
52+
}
53+
54+
@available(iOS 13.0, *)
55+
extension MutationRetryNotifier: Subscriber {
56+
func receive(subscription: Subscription) {
57+
log.verbose(#function)
58+
reachabilitySubscription = subscription
59+
subscription.request(.unlimited)
60+
}
61+
62+
func receive(_ reachabilityUpdate: ReachabilityUpdate) -> Subscribers.Demand {
63+
if reachabilityUpdate.isOnline {
64+
notifyCallback()
65+
return .none
66+
}
67+
return .unlimited
68+
}
69+
70+
func receive(completion: Subscribers.Completion<DataStoreError>) {
71+
log.verbose(#function)
72+
reachabilitySubscription?.cancel()
73+
}
74+
}
75+
76+
@available(iOS 13.0, *)
77+
extension MutationRetryNotifier: DefaultLogger { }

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,9 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
152152
return
153153
}
154154

155+
//TODO: Find a way to resolve networkReachabilityPublisher
155156
let syncMutationToCloudOperation =
156-
SyncMutationToCloudOperation(mutationEvent: mutationEvent, api: api) { result in
157+
SyncMutationToCloudOperation(mutationEvent: mutationEvent, api: api, networkReachabilityPublisher: nil) { result in
157158
self.log.verbose("mutationEvent finished: \(mutationEvent); result: \(result)")
158159
self.stateMachine.notify(action: .processedEvent)
159160
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift

Lines changed: 84 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,30 @@ import AWSPluginsCore
1313
/// Publishes a mutation event to the specified Cloud API. Upon receipt of the API response, validates to ensure it is
1414
/// not a retriable error. If it is, attempts a retry until either success or terminal failure. Upon success or
1515
/// terminal failure, publishes the event response to the appropriate ModelReconciliationQueue subject.
16+
@available(iOS 13.0, *)
1617
class SyncMutationToCloudOperation: Operation {
1718

1819
private weak var api: APICategoryGraphQLBehavior?
1920
private let mutationEvent: MutationEvent
2021
private var mutationOperation: GraphQLOperation<MutationSync<AnyModel>>?
22+
private let networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, DataStoreError>?
2123
private let completion: GraphQLOperation<MutationSync<AnyModel>>.EventListener
22-
23-
init(mutationEvent: MutationEvent, api: APICategoryGraphQLBehavior,
24+
private var mutationRetryNotifier: MutationRetryNotifier?
25+
private var requestRetryablePolicy: RequestRetryablePolicy
26+
private var currentAttemptNumber: Int
27+
28+
init(mutationEvent: MutationEvent,
29+
api: APICategoryGraphQLBehavior,
30+
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, DataStoreError>?,
31+
currentAttemptNumber: Int = 1,
32+
requestRetryablePolicy: RequestRetryablePolicy? = RequestRetryablePolicy(),
2433
completion: @escaping GraphQLOperation<MutationSync<AnyModel>>.EventListener) {
2534
self.mutationEvent = mutationEvent
2635
self.api = api
36+
self.networkReachabilityPublisher = networkReachabilityPublisher
2737
self.completion = completion
28-
38+
self.currentAttemptNumber = currentAttemptNumber
39+
self.requestRetryablePolicy = requestRetryablePolicy ?? RequestRetryablePolicy()
2940
super.init()
3041
}
3142

@@ -42,6 +53,13 @@ class SyncMutationToCloudOperation: Operation {
4253
sendMutationToCloud()
4354
}
4455

56+
override func cancel() {
57+
mutationOperation?.cancel()
58+
mutationRetryNotifier?.cancel()
59+
let apiError = APIError.unknown("Operation cancelled", "")
60+
finish(result: .failed(apiError))
61+
}
62+
4563
private func sendMutationToCloud() {
4664
guard !isCancelled else {
4765
mutationOperation?.cancel()
@@ -51,14 +69,6 @@ class SyncMutationToCloudOperation: Operation {
5169
}
5270

5371
log.debug(#function)
54-
guard let api = api else {
55-
// TODO: This should be part of our error handling routines
56-
log.error("\(#function): API unexpectedly nil")
57-
let apiError = APIError.unknown("API unexpectedly nil", "")
58-
finish(result: .failed(apiError))
59-
return
60-
}
61-
6272
guard let mutationType = GraphQLMutationType(rawValue: mutationEvent.mutationType) else {
6373
let dataStoreError = DataStoreError.decodingError(
6474
"Invalid mutation type",
@@ -67,11 +77,19 @@ class SyncMutationToCloudOperation: Operation {
6777
match any known GraphQL mutation type. Ensure you only send valid mutation types:
6878
\(GraphQLMutationType.allCases)
6979
"""
70-
)
80+
)
7181
log.error(error: dataStoreError)
82+
let apiError = APIError.unknown("Invalid mutation type", "", dataStoreError)
83+
finish(result: .failed(apiError))
7284
return
7385
}
7486

87+
if let apiRequest = createAPIRequest(mutationType: mutationType) {
88+
makeAPIRequest(apiRequest)
89+
}
90+
}
91+
92+
func createAPIRequest(mutationType: GraphQLMutationType) -> GraphQLRequest<MutationSync<AnyModel>>? {
7593
let request: GraphQLRequest<MutationSync<AnyModel>>
7694
do {
7795
if mutationType == .delete {
@@ -82,11 +100,21 @@ class SyncMutationToCloudOperation: Operation {
82100
} catch {
83101
let apiError = APIError.unknown("Couldn't decode model", "", error)
84102
finish(result: .failed(apiError))
85-
return
103+
return nil
86104
}
105+
return request
106+
}
87107

88-
log.verbose("\(#function) sending mutation with sync data: \(request)")
89-
mutationOperation = api.mutate(request: request) { asyncEvent in
108+
func makeAPIRequest(_ apiRequest: GraphQLRequest<MutationSync<AnyModel>>) {
109+
guard let api = api else {
110+
// TODO: This should be part of our error handling routines
111+
log.error("\(#function): API unexpectedly nil")
112+
let apiError = APIError.unknown("API unexpectedly nil", "")
113+
finish(result: .failed(apiError))
114+
return
115+
}
116+
log.verbose("\(#function) sending mutation with sync data: \(apiRequest)")
117+
mutationOperation = api.mutate(request: apiRequest) { asyncEvent in
90118
self.log.verbose("sendMutationToCloud received asyncEvent: \(asyncEvent)")
91119
self.validateResponseFromCloud(asyncEvent: asyncEvent)
92120
}
@@ -126,7 +154,17 @@ class SyncMutationToCloudOperation: Operation {
126154
return
127155
}
128156

129-
// TODO: Wire in actual event validation and retriability
157+
if case .failed(let error) = asyncEvent {
158+
let advice = getRetryAdviceIfRetryable(error: error)
159+
if advice.shouldRetry {
160+
self.scheduleRetry(advice: advice)
161+
} else {
162+
self.finish(result: .failed(error))
163+
}
164+
return
165+
}
166+
167+
// TODO: Wire in actual event validation
130168

131169
// This doesn't belong here--need to add a `delete` API to the MutationEventSource and pass a
132170
// reference into the mutation queue.
@@ -139,7 +177,36 @@ class SyncMutationToCloudOperation: Operation {
139177
self.finish(result: asyncEvent)
140178
}
141179
}
180+
}
181+
182+
private func getRetryAdviceIfRetryable(error: APIError) -> RequestRetryAdvice {
183+
var advice = RequestRetryAdvice(shouldRetry: false, retryInterval: DispatchTimeInterval.never)
184+
185+
switch error {
186+
case .networkError(_, _, let error):
187+
//currently expecting APIOperationResponse to be an URLError
188+
let urlError = error as? URLError
189+
advice = requestRetryablePolicy.retryRequestAdvice(urlError: urlError,
190+
httpURLResponse: nil,
191+
attemptNumber: currentAttemptNumber)
192+
case .httpStatusError(_, let httpURLResponse):
193+
advice = requestRetryablePolicy.retryRequestAdvice(urlError: nil,
194+
httpURLResponse: httpURLResponse,
195+
attemptNumber: currentAttemptNumber)
196+
default:
197+
break
198+
}
199+
return advice
200+
}
142201

202+
private func scheduleRetry(advice: RequestRetryAdvice) {
203+
log.verbose("\(#function) scheduling retry for mutation")
204+
mutationRetryNotifier = MutationRetryNotifier(advice: advice,
205+
networkReachabilityPublisher: networkReachabilityPublisher) {
206+
self.sendMutationToCloud()
207+
self.mutationRetryNotifier = nil
208+
}
209+
currentAttemptNumber += 1
143210
}
144211

145212
private func finish(result: AsyncEvent<Void, GraphQLResponse<MutationSync<AnyModel>>, APIError>) {
@@ -152,4 +219,5 @@ class SyncMutationToCloudOperation: Operation {
152219
}
153220
}
154221

222+
@available(iOS 13.0, *)
155223
extension SyncMutationToCloudOperation: DefaultLogger { }

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/NetworkReachabilityNotifier.swift

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@ class NetworkReachabilityNotifier {
1919
private var reachability: NetworkReachabilityProviding?
2020
private var allowsCellularAccess = true
2121

22-
let reachabilityPublisher = PassthroughSubject<ReachabilityUpdate, Never>()
22+
let reachabilityPublisher = PassthroughSubject<ReachabilityUpdate, DataStoreError>()
23+
var publisher: AnyPublisher<ReachabilityUpdate, DataStoreError> {
24+
return reachabilityPublisher.eraseToAnyPublisher()
25+
}
2326

2427
public init(host: String,
2528
allowsCellularAccess: Bool,
26-
reachabilityFactory: NetworkReachabilityProvidingFactory.Type = Reachability.self) {
29+
reachabilityFactory: NetworkReachabilityProvidingFactory.Type) {
2730
self.reachability = reachabilityFactory.make(for: host)
2831
self.allowsCellularAccess = allowsCellularAccess
2932

@@ -43,12 +46,7 @@ class NetworkReachabilityNotifier {
4346
deinit {
4447
reachability?.stopNotifier()
4548
NotificationCenter.default.removeObserver(self)
46-
reachabilityPublisher.send(completion: Subscribers.Completion<Never>.finished)
47-
}
48-
49-
func publisher() -> AnyPublisher<ReachabilityUpdate, Never> {
50-
return reachabilityPublisher
51-
.eraseToAnyPublisher()
49+
reachabilityPublisher.send(completion: Subscribers.Completion<DataStoreError>.finished)
5250
}
5351

5452
// MARK: - Notifications
@@ -67,7 +65,7 @@ class NetworkReachabilityNotifier {
6765
isReachable = false
6866
}
6967

70-
let reachabilityMessageUpdate = ReachabilityUpdate(isOnline: isReachable)
68+
let reachabilityMessageUpdate = ReachabilityUpdate(isOnline: isReachable)
7169
reachabilityPublisher.send(reachabilityMessageUpdate)
7270
}
7371

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RequestRetryable.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ import Foundation
99

1010
struct RequestRetryAdvice {
1111
let shouldRetry: Bool
12-
let retryInterval: DispatchTimeInterval?
12+
let retryInterval: DispatchTimeInterval
13+
init(shouldRetry: Bool,
14+
retryInterval: DispatchTimeInterval = .seconds(60)) {
15+
self.shouldRetry = shouldRetry
16+
self.retryInterval = retryInterval
17+
}
1318

1419
}
1520

0 commit comments

Comments
 (0)