Skip to content

Commit 067fd77

Browse files
bsneedBrandon Sneed
andauthored
Storage and Multi-threading fixes (#91)
* Multithreading & storage fixes * Temporarily disable stress tests until network is mocked out. Co-authored-by: Brandon Sneed <[email protected]>
1 parent 15214bf commit 067fd77

File tree

6 files changed

+282
-178
lines changed

6 files changed

+282
-178
lines changed

Segment.xcodeproj/project.pbxproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
46A018D425E6C9C200F9CCD8 /* LinuxUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018D325E6C9C200F9CCD8 /* LinuxUtils.swift */; };
3939
46A018DA25E97FDF00F9CCD8 /* AppleUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018D925E97FDF00F9CCD8 /* AppleUtils.swift */; };
4040
46A018EE25E9A74F00F9CCD8 /* VendorSystem.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018ED25E9A74F00F9CCD8 /* VendorSystem.swift */; };
41+
46B1AC6927346D3D00846DE8 /* StressTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46B1AC6827346D3D00846DE8 /* StressTests.swift */; };
4142
46E382E72654429A00BA2502 /* Utils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46E382E62654429A00BA2502 /* Utils.swift */; };
4243
46F7485D26C718710042798E /* ObjCAnalytics.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46F7485B26C718710042798E /* ObjCAnalytics.swift */; };
4344
46F7485E26C718710042798E /* ObjCConfiguration.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46F7485C26C718710042798E /* ObjCConfiguration.swift */; };
@@ -125,6 +126,7 @@
125126
46A018D325E6C9C200F9CCD8 /* LinuxUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LinuxUtils.swift; sourceTree = "<group>"; };
126127
46A018D925E97FDF00F9CCD8 /* AppleUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppleUtils.swift; sourceTree = "<group>"; };
127128
46A018ED25E9A74F00F9CCD8 /* VendorSystem.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = VendorSystem.swift; sourceTree = "<group>"; };
129+
46B1AC6827346D3D00846DE8 /* StressTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StressTests.swift; sourceTree = "<group>"; };
128130
46D98E3D26D6FEF300E7A86A /* FlurryDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FlurryDestination.swift; sourceTree = "<group>"; };
129131
46D98E3E26D6FEF300E7A86A /* AdjustDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AdjustDestination.swift; sourceTree = "<group>"; };
130132
46D98E3F26D6FEF300E7A86A /* MixpanelDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MixpanelDestination.swift; sourceTree = "<group>"; };
@@ -377,6 +379,7 @@
377379
46F7485F26C720F60042798E /* ObjC_Tests.swift */,
378380
967C40D9258D472C008EB0B6 /* SegmentLog_Tests.swift */,
379381
46FE4D1C25A7A850003A7362 /* Storage_Tests.swift */,
382+
46B1AC6827346D3D00846DE8 /* StressTests.swift */,
380383
4621082D2609206D00EBC4A8 /* Support */,
381384
96DBF37A26F39B5500724B0B /* Timeline_Tests.swift */,
382385
OBJ_13 /* XCTestManifests.swift */,
@@ -581,6 +584,7 @@
581584
OBJ_30 /* Analytics_Tests.swift in Sources */,
582585
46F7486026C720F60042798E /* ObjC_Tests.swift in Sources */,
583586
OBJ_31 /* XCTestManifests.swift in Sources */,
587+
46B1AC6927346D3D00846DE8 /* StressTests.swift in Sources */,
584588
4658175425BA4C20006B2809 /* HTTPClient_Tests.swift in Sources */,
585589
46210811260538BE00EBC4A8 /* KeyPath_Tests.swift in Sources */,
586590
967C40E3258D4DAF008EB0B6 /* Metrics_Tests.swift in Sources */,

Sources/Segment/Plugins/SegmentDestination.swift

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class SegmentDestination: DestinationPlugin {
4242

4343
private var httpClient: HTTPClient?
4444
private var uploads = [UploadTaskInfo]()
45+
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.segment.com")
4546
private var storage: Storage?
4647

4748
private var apiKey: String? = nil
@@ -162,25 +163,33 @@ extension SegmentDestination {
162163
// lets go through and get rid of any tasks that aren't running.
163164
// either they were suspended because a background task took too
164165
// long, or the os orphaned it due to device constraints (like a watch).
165-
let before = uploads.count
166-
var newPending = uploads
167-
newPending.removeAll { uploadInfo in
168-
let shouldRemove = uploadInfo.task.state != .running
169-
if shouldRemove, let cleanup = uploadInfo.cleanup {
170-
cleanup()
166+
uploadsQueue.sync {
167+
let before = uploads.count
168+
var newPending = uploads
169+
newPending.removeAll { uploadInfo in
170+
let shouldRemove = uploadInfo.task.state != .running
171+
if shouldRemove, let cleanup = uploadInfo.cleanup {
172+
cleanup()
173+
}
174+
return shouldRemove
171175
}
172-
return shouldRemove
176+
uploads = newPending
177+
let after = uploads.count
178+
analytics?.log(message: "Cleaned up \(before - after) non-running uploads.")
173179
}
174-
uploads = newPending
175-
let after = uploads.count
176-
analytics?.log(message: "Cleaned up \(before - after) non-running uploads.")
177180
}
178181

179182
internal var pendingUploads: Int {
180-
return uploads.count
183+
var uploadsCount = 0
184+
uploadsQueue.sync {
185+
uploadsCount = uploads.count
186+
}
187+
return uploadsCount
181188
}
182189

183190
internal func add(uploadTask: UploadTaskInfo) {
184-
uploads.append(uploadTask)
191+
uploadsQueue.sync {
192+
uploads.append(uploadTask)
193+
}
185194
}
186195
}

Sources/Segment/Plugins/StartupQueue.swift

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,21 @@ internal class StartupQueue: Plugin, Subscriber {
2121
}
2222
}
2323

24+
let syncQueue = DispatchQueue(label: "startupQueue.segment.com")
2425
var queuedEvents = [RawEvent]()
2526

2627
required init() { }
2728

2829
func execute<T: RawEvent>(event: T?) -> T? {
2930
if running == false, let e = event {
3031
// timeline hasn't started, so queue it up.
31-
if queuedEvents.count >= Self.maxSize {
32-
// if we've exceeded the max queue size start dropping events
33-
queuedEvents.removeFirst()
32+
syncQueue.sync {
33+
if queuedEvents.count >= Self.maxSize {
34+
// if we've exceeded the max queue size start dropping events
35+
queuedEvents.removeFirst()
36+
}
37+
queuedEvents.append(e)
3438
}
35-
queuedEvents.append(e)
3639
return nil
3740
}
3841
// the timeline has started, so let the event pass.
@@ -50,9 +53,11 @@ extension StartupQueue {
5053

5154
internal func replayEvents() {
5255
// replay the queued events to the instance of Analytics we're working with.
53-
for event in queuedEvents {
54-
analytics?.process(event: event)
56+
syncQueue.sync {
57+
for event in queuedEvents {
58+
analytics?.process(event: event)
59+
}
60+
queuedEvents.removeAll()
5561
}
56-
queuedEvents.removeAll()
5762
}
5863
}

0 commit comments

Comments
 (0)