Skip to content

Commit ec7d7e9

Browse files
committed
Add dispatch queue to User Executor
* Synchronize access to the request queues * Add test `testUserExecutorConcurrency` that reproduced crash that is then fixed by the changes here. * Note that concurrent access in this executor should be mitigated already by the executor only sending one request at a time.
1 parent 6535737 commit ec7d7e9

File tree

2 files changed

+163
-112
lines changed

2 files changed

+163
-112
lines changed

iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift

Lines changed: 124 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -37,84 +37,90 @@ class OSUserExecutor {
3737
static var userRequestQueue: [OSUserRequest] = []
3838
static var transferSubscriptionRequestQueue: [OSRequestTransferSubscription] = []
3939

40+
// The User executor dispatch queue, serial. This synchronizes access to the request queues.
41+
private static let dispatchQueue = DispatchQueue(label: "OneSignal.OSUserExecutor", target: .global())
42+
4043
// Read in requests from the cache, do not read in FetchUser requests as this is not needed.
4144
static func start() {
42-
var userRequestQueue: [OSUserRequest] = []
43-
44-
// Read unfinished Create User + Identify User + Get Identity By Subscription requests from cache, if any...
45-
if let cachedRequestQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, defaultValue: []) as? [OSUserRequest] {
46-
// Hook each uncached Request to the right model reference
47-
for request in cachedRequestQueue {
48-
if request.isKind(of: OSRequestFetchIdentityBySubscription.self), let req = request as? OSRequestFetchIdentityBySubscription {
49-
if let identityModel = getIdentityModel(req.identityModel.modelId) {
50-
// 1. The model exist in the repo, set it to be the Request's model
51-
// It is the current user or the model has already been processed
52-
req.identityModel = identityModel
53-
} else {
54-
// 2. The model do not exist, use the model on the request, and add to repo.
55-
addIdentityModel(req.identityModel)
56-
}
57-
userRequestQueue.append(req)
58-
59-
} else if request.isKind(of: OSRequestCreateUser.self), let req = request as? OSRequestCreateUser {
60-
if let identityModel = getIdentityModel(req.identityModel.modelId) {
61-
// 1. The model exist in the repo, set it to be the Request's model
62-
req.identityModel = identityModel
63-
} else {
64-
// 2. The models do not exist, use the model on the request, and add to repo.
65-
addIdentityModel(req.identityModel)
66-
}
67-
userRequestQueue.append(req)
68-
69-
} else if request.isKind(of: OSRequestIdentifyUser.self), let req = request as? OSRequestIdentifyUser {
70-
71-
if let identityModelToIdentify = getIdentityModel(req.identityModelToIdentify.modelId),
72-
let identityModelToUpdate = getIdentityModel(req.identityModelToUpdate.modelId) {
73-
// 1. Both models exist in the repo, set it to be the Request's models
74-
req.identityModelToIdentify = identityModelToIdentify
75-
req.identityModelToUpdate = identityModelToUpdate
76-
} else if let identityModelToIdentify = getIdentityModel(req.identityModelToIdentify.modelId),
77-
getIdentityModel(req.identityModelToUpdate.modelId) == nil {
78-
// 2. A model is in the repo, the other model does not exist
79-
req.identityModelToIdentify = identityModelToIdentify
80-
addIdentityModel(req.identityModelToUpdate)
81-
} else {
82-
// 3. Both models don't exist yet
83-
// Drop the request if the identityModelToIdentify does not already exist AND the request is missing OSID
84-
// Otherwise, this request will forever fail `prepareForExecution` and block pending requests such as recovery calls to `logout` or `login`
85-
guard request.prepareForExecution() else {
86-
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor.start() dropped: \(request)")
87-
continue
45+
self.dispatchQueue.async {
46+
var userRequestQueue: [OSUserRequest] = []
47+
48+
// Read unfinished Create User + Identify User + Get Identity By Subscription requests from cache, if any...
49+
if let cachedRequestQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, defaultValue: []) as? [OSUserRequest] {
50+
// Hook each uncached Request to the right model reference
51+
for request in cachedRequestQueue {
52+
if request.isKind(of: OSRequestFetchIdentityBySubscription.self), let req = request as? OSRequestFetchIdentityBySubscription {
53+
if let identityModel = getIdentityModel(req.identityModel.modelId) {
54+
// 1. The model exist in the repo, set it to be the Request's model
55+
// It is the current user or the model has already been processed
56+
req.identityModel = identityModel
57+
} else {
58+
// 2. The model do not exist, use the model on the request, and add to repo.
59+
addIdentityModel(req.identityModel)
60+
}
61+
userRequestQueue.append(req)
62+
63+
} else if request.isKind(of: OSRequestCreateUser.self), let req = request as? OSRequestCreateUser {
64+
if let identityModel = getIdentityModel(req.identityModel.modelId) {
65+
// 1. The model exist in the repo, set it to be the Request's model
66+
req.identityModel = identityModel
67+
} else {
68+
// 2. The models do not exist, use the model on the request, and add to repo.
69+
addIdentityModel(req.identityModel)
70+
}
71+
userRequestQueue.append(req)
72+
73+
} else if request.isKind(of: OSRequestIdentifyUser.self), let req = request as? OSRequestIdentifyUser {
74+
75+
if let identityModelToIdentify = getIdentityModel(req.identityModelToIdentify.modelId),
76+
let identityModelToUpdate = getIdentityModel(req.identityModelToUpdate.modelId) {
77+
// 1. Both models exist in the repo, set it to be the Request's models
78+
req.identityModelToIdentify = identityModelToIdentify
79+
req.identityModelToUpdate = identityModelToUpdate
80+
} else if let identityModelToIdentify = getIdentityModel(req.identityModelToIdentify.modelId),
81+
getIdentityModel(req.identityModelToUpdate.modelId) == nil {
82+
// 2. A model is in the repo, the other model does not exist
83+
req.identityModelToIdentify = identityModelToIdentify
84+
addIdentityModel(req.identityModelToUpdate)
85+
} else {
86+
// 3. Both models don't exist yet
87+
// Drop the request if the identityModelToIdentify does not already exist AND the request is missing OSID
88+
// Otherwise, this request will forever fail `prepareForExecution` and block pending requests such as recovery calls to `logout` or `login`
89+
guard request.prepareForExecution() else {
90+
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor.start() dropped: \(request)")
91+
continue
92+
}
93+
addIdentityModel(req.identityModelToIdentify)
94+
addIdentityModel(req.identityModelToUpdate)
8895
}
89-
addIdentityModel(req.identityModelToIdentify)
90-
addIdentityModel(req.identityModelToUpdate)
96+
userRequestQueue.append(req)
9197
}
92-
userRequestQueue.append(req)
9398
}
9499
}
95-
}
96-
self.userRequestQueue = userRequestQueue
97-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue)
98-
// Read unfinished Transfer Subscription requests from cache, if any...
99-
if let transferSubscriptionRequestQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, defaultValue: []) as? [OSRequestTransferSubscription] {
100-
// We only care about the last transfer subscription request
101-
if let request = transferSubscriptionRequestQueue.last {
102-
// Hook the uncached Request to the model in the store
103-
if request.subscriptionModel.modelId == OneSignalUserManagerImpl.sharedInstance.user.pushSubscriptionModel.modelId {
104-
// The model exist, set it to be the Request's model
105-
request.subscriptionModel = OneSignalUserManagerImpl.sharedInstance.user.pushSubscriptionModel
106-
self.transferSubscriptionRequestQueue = [request]
107-
} else if !request.prepareForExecution() {
108-
// The model do not exist AND this request cannot be sent, drop this Request
109-
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor.start() dropped: \(request)")
110-
self.transferSubscriptionRequestQueue = []
100+
self.userRequestQueue = userRequestQueue
101+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue)
102+
// Read unfinished Transfer Subscription requests from cache, if any...
103+
if let transferSubscriptionRequestQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, defaultValue: []) as? [OSRequestTransferSubscription] {
104+
// We only care about the last transfer subscription request
105+
if let request = transferSubscriptionRequestQueue.last {
106+
// Hook the uncached Request to the model in the store
107+
if request.subscriptionModel.modelId == OneSignalUserManagerImpl.sharedInstance.user.pushSubscriptionModel.modelId {
108+
// The model exist, set it to be the Request's model
109+
request.subscriptionModel = OneSignalUserManagerImpl.sharedInstance.user.pushSubscriptionModel
110+
self.transferSubscriptionRequestQueue = [request]
111+
} else if !request.prepareForExecution() {
112+
// The model do not exist AND this request cannot be sent, drop this Request
113+
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor.start() dropped: \(request)")
114+
self.transferSubscriptionRequestQueue = []
115+
}
111116
}
117+
} else {
118+
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor error encountered reading from cache for \(OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY)")
112119
}
113-
} else {
114-
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor error encountered reading from cache for \(OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY)")
120+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue)
121+
122+
executePendingRequests()
115123
}
116-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue)
117-
executePendingRequests()
118124
}
119125

120126
static private func getIdentityModel(_ modelId: String) -> OSIdentityModel? {
@@ -126,61 +132,67 @@ class OSUserExecutor {
126132
}
127133

128134
static func appendToQueue(_ request: OSUserRequest) {
129-
if request.isKind(of: OSRequestTransferSubscription.self), let req = request as? OSRequestTransferSubscription {
130-
self.transferSubscriptionRequestQueue.append(req)
131-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue)
132-
} else {
133-
self.userRequestQueue.append(request)
134-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue)
135+
self.dispatchQueue.async {
136+
if request.isKind(of: OSRequestTransferSubscription.self), let req = request as? OSRequestTransferSubscription {
137+
self.transferSubscriptionRequestQueue.append(req)
138+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue)
139+
} else {
140+
self.userRequestQueue.append(request)
141+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue)
142+
}
135143
}
136144
}
137145

138146
static func removeFromQueue(_ request: OSUserRequest) {
139-
if request.isKind(of: OSRequestTransferSubscription.self), let req = request as? OSRequestTransferSubscription {
140-
transferSubscriptionRequestQueue.removeAll(where: { $0 == req})
141-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue)
142-
} else {
143-
userRequestQueue.removeAll(where: { $0 == request})
144-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue)
147+
self.dispatchQueue.async {
148+
if request.isKind(of: OSRequestTransferSubscription.self), let req = request as? OSRequestTransferSubscription {
149+
transferSubscriptionRequestQueue.removeAll(where: { $0 == req})
150+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue)
151+
} else {
152+
userRequestQueue.removeAll(where: { $0 == request})
153+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue)
154+
}
145155
}
146156
}
147157

148158
static func executePendingRequests() {
149-
let requestQueue: [OSUserRequest] = userRequestQueue + transferSubscriptionRequestQueue
150-
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSUserExecutor.executePendingRequests called with queue \(requestQueue)")
151-
152-
if requestQueue.isEmpty {
153-
return
154-
}
159+
self.dispatchQueue.async {
160+
let requestQueue: [OSUserRequest] = userRequestQueue + transferSubscriptionRequestQueue
161+
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSUserExecutor.executePendingRequests called with queue \(requestQueue)")
155162

156-
// Sort the requestQueue by timestamp
157-
for request in requestQueue.sorted(by: { first, second in
158-
return first.timestamp < second.timestamp
159-
}) {
160-
// Return as soon as we reach an un-executable request
161-
if !request.prepareForExecution() {
162-
OneSignalLog.onesignalLog(.LL_WARN, message: "OSUserExecutor.executePendingRequests() is blocked by unexecutable request \(request)")
163+
if requestQueue.isEmpty {
163164
return
164165
}
165166

166-
if request.isKind(of: OSRequestFetchIdentityBySubscription.self), let fetchIdentityRequest = request as? OSRequestFetchIdentityBySubscription {
167-
executeFetchIdentityBySubscriptionRequest(fetchIdentityRequest)
168-
return
169-
} else if request.isKind(of: OSRequestCreateUser.self), let createUserRequest = request as? OSRequestCreateUser {
170-
executeCreateUserRequest(createUserRequest)
171-
return
172-
} else if request.isKind(of: OSRequestIdentifyUser.self), let identifyUserRequest = request as? OSRequestIdentifyUser {
173-
executeIdentifyUserRequest(identifyUserRequest)
174-
return
175-
} else if request.isKind(of: OSRequestTransferSubscription.self), let transferSubscriptionRequest = request as? OSRequestTransferSubscription {
176-
executeTransferPushSubscriptionRequest(transferSubscriptionRequest)
177-
return
178-
} else if request.isKind(of: OSRequestFetchUser.self), let fetchUserRequest = request as? OSRequestFetchUser {
179-
executeFetchUserRequest(fetchUserRequest)
180-
return
181-
} else {
182-
// Log Error
183-
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor met incompatible Request type that cannot be executed.")
167+
// Sort the requestQueue by timestamp
168+
for request in requestQueue.sorted(by: { first, second in
169+
return first.timestamp < second.timestamp
170+
}) {
171+
// Return as soon as we reach an un-executable request
172+
if !request.prepareForExecution() {
173+
OneSignalLog.onesignalLog(.LL_WARN, message: "OSUserExecutor.executePendingRequests() is blocked by unexecutable request \(request)")
174+
return
175+
}
176+
177+
if request.isKind(of: OSRequestFetchIdentityBySubscription.self), let fetchIdentityRequest = request as? OSRequestFetchIdentityBySubscription {
178+
executeFetchIdentityBySubscriptionRequest(fetchIdentityRequest)
179+
return
180+
} else if request.isKind(of: OSRequestCreateUser.self), let createUserRequest = request as? OSRequestCreateUser {
181+
executeCreateUserRequest(createUserRequest)
182+
return
183+
} else if request.isKind(of: OSRequestIdentifyUser.self), let identifyUserRequest = request as? OSRequestIdentifyUser {
184+
executeIdentifyUserRequest(identifyUserRequest)
185+
return
186+
} else if request.isKind(of: OSRequestTransferSubscription.self), let transferSubscriptionRequest = request as? OSRequestTransferSubscription {
187+
executeTransferPushSubscriptionRequest(transferSubscriptionRequest)
188+
return
189+
} else if request.isKind(of: OSRequestFetchUser.self), let fetchUserRequest = request as? OSRequestFetchUser {
190+
executeFetchUserRequest(fetchUserRequest)
191+
return
192+
} else {
193+
// Log Error
194+
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor met incompatible Request type that cannot be executed.")
195+
}
184196
}
185197
}
186198
}

0 commit comments

Comments
 (0)