Skip to content

Commit 6535737

Browse files
committed
Add dispatch queue to Identity Executor
* Synchronize access to the delta queue and request queues * Add test `testIdentityExecutorConcurrency` that reproduced crash that is then fixed by the changes here.
1 parent 63140d4 commit 6535737

File tree

2 files changed

+127
-71
lines changed

2 files changed

+127
-71
lines changed

iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSIdentityOperationExecutor.swift

Lines changed: 88 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ class OSIdentityOperationExecutor: OSOperationExecutor {
3535
var addRequestQueue: [OSRequestAddAliases] = []
3636
var removeRequestQueue: [OSRequestRemoveAlias] = []
3737

38+
// The Identity executor dispatch queue, serial. This synchronizes access to the delta and request queues.
39+
private let dispatchQueue = DispatchQueue(label: "OneSignal.OSIdentityOperationExecutor", target: .global())
40+
3841
init() {
3942
// Read unfinished deltas from cache, if any...
4043
if var deltaQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, defaultValue: []) as? [OSDelta] {
@@ -101,53 +104,60 @@ class OSIdentityOperationExecutor: OSOperationExecutor {
101104
}
102105

103106
func enqueueDelta(_ delta: OSDelta) {
104-
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSIdentityOperationExecutor enqueueDelta: \(delta)")
105-
deltaQueue.append(delta)
107+
self.dispatchQueue.async {
108+
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSIdentityOperationExecutor enqueueDelta: \(delta)")
109+
self.deltaQueue.append(delta)
110+
}
106111
}
107112

108113
func cacheDeltaQueue() {
109-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue)
114+
self.dispatchQueue.async {
115+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue)
116+
}
110117
}
111118

112119
func processDeltaQueue(inBackground: Bool) {
113-
if !deltaQueue.isEmpty {
114-
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSIdentityOperationExecutor processDeltaQueue with queue: \(deltaQueue)")
115-
}
116-
for delta in deltaQueue {
117-
guard let model = delta.model as? OSIdentityModel,
118-
let aliases = delta.value as? [String: String]
119-
else {
120-
// Log error
121-
continue
120+
self.dispatchQueue.async {
121+
if !self.deltaQueue.isEmpty {
122+
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSIdentityOperationExecutor processDeltaQueue with queue: \(self.deltaQueue)")
122123
}
124+
for delta in self.deltaQueue {
125+
guard let model = delta.model as? OSIdentityModel,
126+
let aliases = delta.value as? [String: String]
127+
else {
128+
// Log error
129+
continue
130+
}
123131

124-
switch delta.name {
125-
case OS_ADD_ALIAS_DELTA:
126-
let request = OSRequestAddAliases(aliases: aliases, identityModel: model)
127-
addRequestQueue.append(request)
132+
switch delta.name {
133+
case OS_ADD_ALIAS_DELTA:
134+
let request = OSRequestAddAliases(aliases: aliases, identityModel: model)
135+
self.addRequestQueue.append(request)
128136

129-
case OS_REMOVE_ALIAS_DELTA:
130-
for (label, _) in aliases {
131-
let request = OSRequestRemoveAlias(labelToRemove: label, identityModel: model)
132-
removeRequestQueue.append(request)
133-
}
137+
case OS_REMOVE_ALIAS_DELTA:
138+
for (label, _) in aliases {
139+
let request = OSRequestRemoveAlias(labelToRemove: label, identityModel: model)
140+
self.removeRequestQueue.append(request)
141+
}
134142

135-
default:
136-
OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSIdentityOperationExecutor met incompatible OSDelta type: \(delta)")
143+
default:
144+
OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSIdentityOperationExecutor met incompatible OSDelta type: \(delta)")
145+
}
137146
}
138-
}
139147

140-
self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue
148+
self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue
141149

142-
// persist executor's requests (including new request) to storage
143-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
144-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
150+
// persist executor's requests (including new request) to storage
151+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
152+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
145153

146-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead?
154+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead?
147155

148-
processRequestQueue(inBackground: inBackground)
156+
self.processRequestQueue(inBackground: inBackground)
157+
}
149158
}
150159

160+
/// This method is called by `processDeltaQueue` only and does not need to be added to the dispatchQueue.
151161
func processRequestQueue(inBackground: Bool) {
152162
let requestQueue: [OneSignalRequest] = addRequestQueue + removeRequestQueue
153163

@@ -188,38 +198,42 @@ class OSIdentityOperationExecutor: OSOperationExecutor {
188198
OneSignalCoreImpl.sharedClient().execute(request) { _ in
189199
// No hydration from response
190200
// On success, remove request from cache
191-
self.addRequestQueue.removeAll(where: { $0 == request})
192-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
193-
if inBackground {
194-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
201+
self.dispatchQueue.async {
202+
self.addRequestQueue.removeAll(where: { $0 == request})
203+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
204+
if inBackground {
205+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
206+
}
195207
}
196208
} onFailure: { error in
197209
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSIdentityOperationExecutor add aliases request failed with error: \(error.debugDescription)")
198-
if let nsError = error as? NSError {
199-
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
200-
if responseType == .missing {
201-
// Remove from cache and queue
202-
self.addRequestQueue.removeAll(where: { $0 == request})
203-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
204-
// Logout if the user in the SDK is the same
205-
guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel)
206-
else {
207-
if inBackground {
208-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
210+
self.dispatchQueue.async {
211+
if let nsError = error as? NSError {
212+
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
213+
if responseType == .missing {
214+
// Remove from cache and queue
215+
self.addRequestQueue.removeAll(where: { $0 == request})
216+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
217+
// Logout if the user in the SDK is the same
218+
guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel)
219+
else {
220+
if inBackground {
221+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
222+
}
223+
return
209224
}
210-
return
225+
// The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model
226+
OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil
227+
OneSignalUserManagerImpl.sharedInstance._logout()
228+
} else if responseType != .retryable {
229+
// Fail, no retry, remove from cache and queue
230+
self.addRequestQueue.removeAll(where: { $0 == request})
231+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
211232
}
212-
// The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model
213-
OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil
214-
OneSignalUserManagerImpl.sharedInstance._logout()
215-
} else if responseType != .retryable {
216-
// Fail, no retry, remove from cache and queue
217-
self.addRequestQueue.removeAll(where: { $0 == request})
218-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue)
219233
}
220-
}
221-
if inBackground {
222-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
234+
if inBackground {
235+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
236+
}
223237
}
224238
}
225239
}
@@ -243,25 +257,28 @@ class OSIdentityOperationExecutor: OSOperationExecutor {
243257
OneSignalCoreImpl.sharedClient().execute(request) { _ in
244258
// There is nothing to hydrate
245259
// On success, remove request from cache
246-
self.removeRequestQueue.removeAll(where: { $0 == request})
247-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
248-
if inBackground {
249-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
260+
self.dispatchQueue.async {
261+
self.removeRequestQueue.removeAll(where: { $0 == request})
262+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
263+
if inBackground {
264+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
265+
}
250266
}
251267
} onFailure: { error in
252268
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSIdentityOperationExecutor remove alias request failed with error: \(error.debugDescription)")
253-
254-
if let nsError = error as? NSError {
255-
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
256-
if responseType != .retryable {
257-
// Fail, no retry, remove from cache and queue
258-
// A response of .missing could mean the alias doesn't exist on this user OR this user has been deleted
259-
self.removeRequestQueue.removeAll(where: { $0 == request})
260-
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
269+
self.dispatchQueue.async {
270+
if let nsError = error as? NSError {
271+
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
272+
if responseType != .retryable {
273+
// Fail, no retry, remove from cache and queue
274+
// A response of .missing could mean the alias doesn't exist on this user OR this user has been deleted
275+
self.removeRequestQueue.removeAll(where: { $0 == request})
276+
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue)
277+
}
278+
}
279+
if inBackground {
280+
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
261281
}
262-
}
263-
if inBackground {
264-
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
265282
}
266283
}
267284
}

iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,45 @@ final class OneSignalUserTests: XCTestCase {
136136
XCTAssertTrue(client.allRequestsHandled)
137137
}
138138

139+
/**
140+
This test reproduces a crash in the Identity Executor.
141+
It is possible for two threads to modify and cache queues concurrently.
142+
*/
143+
func testIdentityExecutorConcurrency() throws {
144+
/* Setup */
145+
let client = MockOneSignalClient()
146+
let aliases = [UUID().uuidString: "id"]
147+
148+
OneSignalCoreImpl.setSharedClient(client)
149+
MockUserRequests.setAddAliasesResponse(with: client, aliases: aliases)
150+
151+
let executor = OSIdentityOperationExecutor()
152+
OSOperationRepo.sharedInstance.addExecutor(executor)
153+
154+
/* When */
155+
156+
DispatchQueue.concurrentPerform(iterations: 50) { _ in
157+
// 1. Enqueue Add Alias Deltas to the Operation Repo
158+
OSOperationRepo.sharedInstance.enqueueDelta(OSDelta(name: OS_ADD_ALIAS_DELTA, identityModelId: UUID().uuidString, model: OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer()), property: "aliases", value: aliases))
159+
OSOperationRepo.sharedInstance.enqueueDelta(OSDelta(name: OS_ADD_ALIAS_DELTA, identityModelId: UUID().uuidString, model: OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer()), property: "aliases", value: aliases))
160+
161+
// 2. Flush Operation Repo
162+
OSOperationRepo.sharedInstance.addFlushDeltaQueueToDispatchQueue()
163+
164+
// 3. Simulate updating the executor's request queue from a network response
165+
executor.executeAddAliasesRequest(OSRequestAddAliases(aliases: aliases, identityModel: OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer())), inBackground: false)
166+
executor.executeAddAliasesRequest(OSRequestAddAliases(aliases: aliases, identityModel: OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer())), inBackground: false)
167+
}
168+
169+
// 4. Run background threads
170+
OneSignalCoreMocks.waitForBackgroundThreads(seconds: 0.5)
171+
172+
/* Then */
173+
// Previously caused crash: signal SIGABRT - malloc: double free for ptr
174+
// Assert that every request SDK makes has a response set, and is handled
175+
XCTAssertTrue(client.allRequestsHandled)
176+
}
177+
139178
/**
140179
This test reproduced a crash when the property model is being encoded.
141180
*/

0 commit comments

Comments
 (0)