Skip to content

Commit 63140d4

Browse files
committed
Add dispatch queue to Subscription Executor
* Synchronize access to the delta queue and request queues * Add test `testSubscriptionExecutorConcurrency` that reproduced crash that is then fixed by the changes here.
1 parent f2869a8 commit 63140d4

File tree

3 files changed

+172
-107
lines changed

3 files changed

+172
-107
lines changed

iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSSubscriptionOperationExecutor.swift

Lines changed: 129 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor {
3737
var updateRequestQueue: [OSRequestUpdateSubscription] = []
3838
var subscriptionModels: [String: OSSubscriptionModel] = [:]
3939

40+
// The Subscription executor dispatch queue, serial. This synchronizes access to the delta and request queues.
41+
private let dispatchQueue = DispatchQueue(label: "OneSignal.OSSubscriptionOperationExecutor", target: .global())
42+
4043
init() {
4144
// Read unfinished deltas from cache, if any...
4245
if var deltaQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, defaultValue: []) as? [OSDelta] {
@@ -152,75 +155,84 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor {
152155
}
153156

154157
func enqueueDelta(_ delta: OSDelta) {
155-
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor enqueueDelta: \(delta)")
156-
deltaQueue.append(delta)
158+
self.dispatchQueue.async {
159+
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor enqueueDelta: \(delta)")
160+
self.deltaQueue.append(delta)
161+
}
157162
}
158163

159164
func cacheDeltaQueue() {
160-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue)
165+
self.dispatchQueue.async {
166+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue)
167+
}
161168
}
162169

163170
func processDeltaQueue(inBackground: Bool) {
164-
if !deltaQueue.isEmpty {
165-
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor processDeltaQueue with queue: \(deltaQueue)")
166-
}
167-
for delta in deltaQueue {
168-
guard let subModel = delta.model as? OSSubscriptionModel
169-
else {
170-
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor.processDeltaQueue dropped \(delta)")
171-
continue
171+
self.dispatchQueue.async {
172+
if !self.deltaQueue.isEmpty {
173+
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor processDeltaQueue with queue: \(self.deltaQueue)")
172174
}
175+
for delta in self.deltaQueue {
176+
guard let subModel = delta.model as? OSSubscriptionModel
177+
else {
178+
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor.processDeltaQueue dropped \(delta)")
179+
continue
180+
}
173181

174-
switch delta.name {
175-
case OS_ADD_SUBSCRIPTION_DELTA:
176-
// Only create the request if the identity model exists
177-
if let identityModel = OneSignalUserManagerImpl.sharedInstance.getIdentityModel(delta.identityModelId) {
178-
let request = OSRequestCreateSubscription(
179-
subscriptionModel: subModel,
180-
identityModel: identityModel
182+
switch delta.name {
183+
case OS_ADD_SUBSCRIPTION_DELTA:
184+
// Only create the request if the identity model exists
185+
if let identityModel = OneSignalUserManagerImpl.sharedInstance.getIdentityModel(delta.identityModelId) {
186+
let request = OSRequestCreateSubscription(
187+
subscriptionModel: subModel,
188+
identityModel: identityModel
189+
)
190+
self.addRequestQueue.append(request)
191+
} else {
192+
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor.processDeltaQueue dropped \(delta)")
193+
}
194+
case OS_REMOVE_SUBSCRIPTION_DELTA:
195+
let request = OSRequestDeleteSubscription(
196+
subscriptionModel: subModel
181197
)
182-
addRequestQueue.append(request)
183-
} else {
184-
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor.processDeltaQueue dropped \(delta)")
198+
self.removeRequestQueue.append(request)
199+
200+
case OS_UPDATE_SUBSCRIPTION_DELTA:
201+
let request = OSRequestUpdateSubscription(
202+
subscriptionObject: [delta.property: delta.value],
203+
subscriptionModel: subModel
204+
)
205+
self.updateRequestQueue.append(request)
206+
207+
default:
208+
// Log error
209+
OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSSubscriptionOperationExecutor met incompatible OSDelta type: \(delta).")
185210
}
186-
case OS_REMOVE_SUBSCRIPTION_DELTA:
187-
let request = OSRequestDeleteSubscription(
188-
subscriptionModel: subModel
189-
)
190-
removeRequestQueue.append(request)
191-
192-
case OS_UPDATE_SUBSCRIPTION_DELTA:
193-
let request = OSRequestUpdateSubscription(
194-
subscriptionObject: [delta.property: delta.value],
195-
subscriptionModel: subModel
196-
)
197-
updateRequestQueue.append(request)
198-
199-
default:
200-
// Log error
201-
OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSSubscriptionOperationExecutor met incompatible OSDelta type: \(delta).")
202211
}
203-
}
204212

205-
self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue
213+
self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue
206214

207-
// persist executor's requests (including new request) to storage
208-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
209-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
210-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
215+
// persist executor's requests (including new request) to storage
216+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
217+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
218+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
211219

212-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead?
220+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead?
213221

214-
processRequestQueue(inBackground: inBackground)
222+
self.processRequestQueue(inBackground: inBackground)
223+
}
215224
}
216225

217226
// Bypasses the operation repo to create a push subscription request
218227
func createPushSubscription(subscriptionModel: OSSubscriptionModel, identityModel: OSIdentityModel) {
219228
let request = OSRequestCreateSubscription(subscriptionModel: subscriptionModel, identityModel: identityModel)
220-
addRequestQueue.append(request)
221-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
229+
self.dispatchQueue.async {
230+
self.addRequestQueue.append(request)
231+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
232+
}
222233
}
223234

235+
/// This method is called by `processDeltaQueue` only and does not need to be added to the dispatchQueue.
224236
func processRequestQueue(inBackground: Bool) {
225237
let requestQueue: [OneSignalRequest] = addRequestQueue + removeRequestQueue + updateRequestQueue
226238

@@ -261,46 +273,50 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor {
261273
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor: executeCreateSubscriptionRequest making request: \(request)")
262274
OneSignalCoreImpl.sharedClient().execute(request) { result in
263275
// On success, remove request from cache (even if not hydrating model), and hydrate model
264-
self.addRequestQueue.removeAll(where: { $0 == request})
265-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
266-
267-
guard let response = result?["subscription"] as? [String: Any] else {
268-
OneSignalLog.onesignalLog(.LL_ERROR, message: "Unabled to parse response to create subscription request")
276+
self.dispatchQueue.async {
277+
self.addRequestQueue.removeAll(where: { $0 == request})
278+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
279+
280+
guard let response = result?["subscription"] as? [String: Any] else {
281+
OneSignalLog.onesignalLog(.LL_ERROR, message: "Unabled to parse response to create subscription request")
282+
if inBackground {
283+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
284+
}
285+
return
286+
}
287+
request.subscriptionModel.hydrate(response)
269288
if inBackground {
270289
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
271290
}
272-
return
273-
}
274-
request.subscriptionModel.hydrate(response)
275-
if inBackground {
276-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
277291
}
278292
} onFailure: { error in
279293
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor create subscription request failed with error: \(error.debugDescription)")
280-
if let nsError = error as? NSError {
281-
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
282-
if responseType == .missing {
283-
self.addRequestQueue.removeAll(where: { $0 == request})
284-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
285-
// Logout if the user in the SDK is the same
286-
guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel)
287-
else {
288-
if inBackground {
289-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
294+
self.dispatchQueue.async {
295+
if let nsError = error as? NSError {
296+
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
297+
if responseType == .missing {
298+
self.addRequestQueue.removeAll(where: { $0 == request})
299+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
300+
// Logout if the user in the SDK is the same
301+
guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel)
302+
else {
303+
if inBackground {
304+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
305+
}
306+
return
290307
}
291-
return
308+
// The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model
309+
OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil
310+
OneSignalUserManagerImpl.sharedInstance._logout()
311+
} else if responseType != .retryable {
312+
// Fail, no retry, remove from cache and queue
313+
self.addRequestQueue.removeAll(where: { $0 == request})
314+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
292315
}
293-
// The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model
294-
OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil
295-
OneSignalUserManagerImpl.sharedInstance._logout()
296-
} else if responseType != .retryable {
297-
// Fail, no retry, remove from cache and queue
298-
self.addRequestQueue.removeAll(where: { $0 == request})
299-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
300316
}
301-
}
302-
if inBackground {
303-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
317+
if inBackground {
318+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
319+
}
304320
}
305321
}
306322
}
@@ -324,24 +340,28 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor {
324340
OneSignalCoreImpl.sharedClient().execute(request) { _ in
325341
// On success, remove request from cache. No model hydration occurs.
326342
// For example, if app restarts and we read in operations between sending this off and getting the response
327-
self.removeRequestQueue.removeAll(where: { $0 == request})
328-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
329-
if inBackground {
330-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
343+
self.dispatchQueue.async {
344+
self.removeRequestQueue.removeAll(where: { $0 == request})
345+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
346+
if inBackground {
347+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
348+
}
331349
}
332350
} onFailure: { error in
333351
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor delete subscription request failed with error: \(error.debugDescription)")
334-
if let nsError = error as? NSError {
335-
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
336-
if responseType != .retryable {
337-
// Fail, no retry, remove from cache and queue
338-
// If this request returns a missing status, that is ok as this is a delete request
339-
self.removeRequestQueue.removeAll(where: { $0 == request})
340-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
352+
self.dispatchQueue.async {
353+
if let nsError = error as? NSError {
354+
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
355+
if responseType != .retryable {
356+
// Fail, no retry, remove from cache and queue
357+
// If this request returns a missing status, that is ok as this is a delete request
358+
self.removeRequestQueue.removeAll(where: { $0 == request})
359+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
360+
}
361+
}
362+
if inBackground {
363+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
341364
}
342-
}
343-
if inBackground {
344-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
345365
}
346366
}
347367
}
@@ -363,23 +383,27 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor {
363383
OneSignalCoreImpl.sharedClient().execute(request) { _ in
364384
// On success, remove request from cache. No model hydration occurs.
365385
// For example, if app restarts and we read in operations between sending this off and getting the response
366-
self.updateRequestQueue.removeAll(where: { $0 == request})
367-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
368-
if inBackground {
369-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
386+
self.dispatchQueue.async {
387+
self.updateRequestQueue.removeAll(where: { $0 == request})
388+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
389+
if inBackground {
390+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
391+
}
370392
}
371393
} onFailure: { error in
372394
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor update subscription request failed with error: \(error.debugDescription)")
373-
if let nsError = error as? NSError {
374-
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
375-
if responseType != .retryable {
376-
// Fail, no retry, remove from cache and queue
377-
self.updateRequestQueue.removeAll(where: { $0 == request})
378-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
395+
self.dispatchQueue.async {
396+
if let nsError = error as? NSError {
397+
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
398+
if responseType != .retryable {
399+
// Fail, no retry, remove from cache and queue
400+
self.updateRequestQueue.removeAll(where: { $0 == request})
401+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
402+
}
403+
}
404+
if inBackground {
405+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
379406
}
380-
}
381-
if inBackground {
382-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
383407
}
384408
}
385409
}

0 commit comments

Comments
 (0)