Skip to content

Commit c4bb71d

Browse files
authored
Fix remaining issues in async/sync mode switching. (#270)
* Fix asyncAndWait call differences * confused compiler. * Give it another try * Fix linux behavior diff * updated tests * Updated flushQueue entry scheduling. * Slight revision to group operation * Remove usage of notify(), too unpredictable. * Final version i hope. :S * Test
1 parent a06dca7 commit c4bb71d

File tree

5 files changed

+122
-35
lines changed

5 files changed

+122
-35
lines changed

Sources/Segment/Analytics.swift

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -212,40 +212,49 @@ extension Analytics {
212212
flushGroup.enter()
213213

214214
apply { plugin in
215+
// we want to enter as soon as possible. waiting to do it from
216+
// another queue just takes too long.
215217
operatingMode.run(queue: configuration.values.flushQueue) {
216218
if let p = plugin as? FlushCompletion {
217-
// this is async
218-
// flush(group:completion:) handles the enter/leave.
219+
// flush handles the groups enter/leave calls
219220
p.flush(group: flushGroup) { plugin in
220221
// we don't really care about the plugin value .. yet.
221222
}
222223
} else if let p = plugin as? EventPlugin {
223-
// we have no idea if this will be async or not, assume it's sync.
224224
flushGroup.enter()
225+
// we have no idea if this will be async or not, assume it's sync.
225226
p.flush()
226227
flushGroup.leave()
227228
}
228229
}
229230
}
230231

231-
// if we're not in server mode, we need to be notified when it's done.
232-
if let completion, operatingMode != .synchronous {
233-
// set up our callback to know when the group has completed, if we're not
234-
// in .server operating mode.
235-
flushGroup.notify(queue: configuration.values.flushQueue) {
236-
DispatchQueue.main.async { completion() }
237-
}
238-
}
239-
240232
flushGroup.leave() // matches our initial enter().
241233

242-
// if we ARE in server mode, we need to wait on the group.
234+
// if we ARE in sync mode, we need to wait on the group.
243235
// This effectively ends up being a `sync` operation.
244236
if operatingMode == .synchronous {
245237
flushGroup.wait()
246238
// we need to call completion on our own since
247-
// we skipped setting up notify.
248-
if let completion { DispatchQueue.main.async { completion() }}
239+
// we skipped setting up notify. we don't need to do it on
240+
// .main since we are in synchronous mode.
241+
if let completion { completion() }
242+
} else if operatingMode == .asynchronous {
243+
// if we're not, flip over to our serial queue, tell it to wait on the flush
244+
// group to complete if we have a completion to hit. Otherwise, no need to
245+
// wait on completion.
246+
if let completion {
247+
// NOTE: DispatchGroup's `notify` method on linux ended up getting called
248+
// before the tasks have actually completed, so we went with this instead.
249+
OperatingMode.defaultQueue.async { [weak self] in
250+
let timedOut = flushGroup.wait(timeout: .now() + 15 /*seconds*/)
251+
if timedOut == .timedOut {
252+
self?.log(message: "flush(completion:) timed out waiting for completion.")
253+
}
254+
completion()
255+
//DispatchQueue.main.async { completion() }
256+
}
257+
}
249258
}
250259
}
251260

@@ -437,16 +446,11 @@ extension OperatingMode {
437446
task()
438447
}
439448
case .synchronous:
440-
// if for some reason, we're told to do all this stuff on
441-
// main, ignore it, and use the default queue. this prevents
442-
// a possible deadlock.
443-
if queue === DispatchQueue.main {
444-
OperatingMode.defaultQueue.asyncAndWait {
445-
task()
446-
}
447-
} else {
448-
queue.asyncAndWait { task() }
449-
}
449+
// in synchronous mode, always use our own queue to
450+
// prevent deadlocks.
451+
let workItem = DispatchWorkItem(block: task)
452+
OperatingMode.defaultQueue.asyncAndWait(execute: workItem)
450453
}
451454
}
452455
}
456+

Sources/Segment/Plugins/SegmentDestination.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,11 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
124124
// don't flush if analytics is disabled.
125125
guard analytics.enabled == true else { return }
126126

127+
// enter for the high level flush, allow us time to run through any existing files..
128+
group.enter()
129+
127130
// Read events from file system
128-
guard let data = storage.read(Storage.Constants.events) else { return }
131+
guard let data = storage.read(Storage.Constants.events) else { group.leave(); return }
129132

130133
eventCount = 0
131134
cleanupUploads()
@@ -134,9 +137,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
134137

135138
if pendingUploads == 0 {
136139
for url in data {
137-
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")
138-
// enter the dispatch group
140+
// enter for this url we're going to kick off
139141
group.enter()
142+
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")
140143
// set up the task
141144
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
142145
switch result {
@@ -154,7 +157,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
154157
self.cleanupUploads()
155158
// call the completion
156159
completion(self)
157-
// leave the dispatch group
160+
// leave for the url we kicked off.
158161
group.leave()
159162
}
160163
// we have a legit upload in progress now, so add it to our list.
@@ -165,6 +168,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
165168
} else {
166169
analytics.log(message: "Skipping processing; Uploads in progress.")
167170
}
171+
172+
// leave for the high level flush
173+
group.leave()
168174
}
169175
}
170176

Sources/Segment/Utilities/Utils.swift

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@
77

88
import Foundation
99

10+
#if os(Linux)
11+
extension DispatchQueue {
12+
func asyncAndWait(execute workItem: DispatchWorkItem) {
13+
async {
14+
workItem.perform()
15+
}
16+
workItem.wait()
17+
}
18+
}
19+
#endif
20+
1021
/// Inquire as to whether we are within a Unit Testing environment.
1122
#if DEBUG
1223
internal var isUnitTesting: Bool = {
@@ -58,3 +69,36 @@ extension Optional: Flattenable {
5869
}
5970
}
6071

72+
class TrackingDispatchGroup: CustomStringConvertible {
73+
internal let group = DispatchGroup()
74+
75+
var description: String {
76+
return "DispatchGroup Enters: \(enters), Leaves: \(leaves)"
77+
}
78+
79+
var enters: Int = 0
80+
var leaves: Int = 0
81+
var current: Int = 0
82+
83+
func enter() {
84+
enters += 1
85+
current += 1
86+
group.enter()
87+
}
88+
89+
func leave() {
90+
leaves += 1
91+
current -= 1
92+
group.leave()
93+
}
94+
95+
init() { }
96+
97+
func wait() {
98+
group.wait()
99+
}
100+
101+
public func notify(qos: DispatchQoS = .unspecified, flags: DispatchWorkItemFlags = [], queue: DispatchQueue, execute work: @escaping @convention(block) () -> Void) {
102+
group.notify(qos: qos, flags: flags, queue: queue, execute: work)
103+
}
104+
}

Tests/Segment-Tests/Analytics_Tests.swift

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -692,9 +692,38 @@ final class Analytics_Tests: XCTestCase {
692692

693693
}
694694

695-
func testServerOperatingMode() {
695+
func testAsyncOperatingMode() {
696696
// Use a specific writekey to this test so we do not collide with other cached items.
697-
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_serverMode")
697+
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_asyncMode")
698+
.flushInterval(9999)
699+
.flushAt(9999)
700+
.operatingMode(.asynchronous))
701+
702+
waitUntilStarted(analytics: analytics)
703+
704+
analytics.storage.hardReset(doYouKnowHowToUseThis: true)
705+
706+
@Atomic var completionCalled = false
707+
708+
// put an event in the pipe ...
709+
analytics.track(name: "completion test1")
710+
// flush it, that'll get us an upload going
711+
analytics.flush {
712+
// verify completion is called.
713+
completionCalled = true
714+
}
715+
716+
while !completionCalled {
717+
RunLoop.main.run(until: Date.distantPast)
718+
}
719+
720+
XCTAssertTrue(completionCalled)
721+
XCTAssertEqual(analytics.pendingUploads!.count, 0)
722+
}
723+
724+
func testSyncOperatingMode() {
725+
// Use a specific writekey to this test so we do not collide with other cached items.
726+
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_syncMode")
698727
.flushInterval(9999)
699728
.flushAt(9999)
700729
.operatingMode(.synchronous))

Tests/Segment-Tests/StressTests.swift

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ class StressTests: XCTestCase {
6969
let event = "write queue 1: \(eventsWritten)"
7070
analytics.track(name: event)
7171
eventsWritten += 1
72-
usleep(0001)
72+
//usleep(0001)
73+
RunLoop.main.run(until: Date.distantPast)
7374
}
7475
print("queue 1 wrote \(eventsWritten) events.")
7576
queue1Done = true
@@ -82,7 +83,8 @@ class StressTests: XCTestCase {
8283
let event = "write queue 2: \(eventsWritten)"
8384
analytics.track(name: event)
8485
eventsWritten += 1
85-
usleep(0001)
86+
//usleep(0001)
87+
RunLoop.main.run(until: Date.distantPast)
8688
}
8789
print("queue 2 wrote \(eventsWritten) events.")
8890
queue2Done = true
@@ -91,10 +93,12 @@ class StressTests: XCTestCase {
9193
flushQueue.async {
9294
while (ready == false) { usleep(1) }
9395
var counter = 0
94-
sleep(1)
96+
//sleep(1)
97+
RunLoop.main.run(until: Date(timeIntervalSinceNow: 1))
9598
while (queue1Done == false || queue2Done == false) {
9699
let sleepTime = UInt32.random(in: 1..<3000)
97-
usleep(sleepTime)
100+
//usleep(sleepTime)
101+
RunLoop.main.run(until: Date(timeIntervalSinceNow: Double(sleepTime / 1000) ))
98102
analytics.flush()
99103
counter += 1
100104
}

0 commit comments

Comments
 (0)