Skip to content

Commit 7741aa5

Browse files
authored
Added operating mode config option & behavior. (#269)
* Added operatingMode configuration option and logic. * Changed unit-test settings setup to respect operating mode. * Made some startup plugins not auto-load when operating in server mode. * Fixed logging output to be correct. * Updated testPurgeStorage to reflect new flush behavior. * Added test for server operation mode. * Improved flush policy interval test. * Modified equality check to specify pointer value. * Fixed issue on linux w/ recursive sync * Logic update to simplify queueing and protection * updated commentary
1 parent 3fbc48f commit 7741aa5

File tree

9 files changed

+175
-17
lines changed

9 files changed

+175
-17
lines changed

Sources/Segment/Analytics.swift

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ extension Analytics {
146146
return nil
147147
}
148148

149+
/// Returns the current operating mode this instance was given.
150+
public var operatingMode: OperatingMode {
151+
return configuration.values.operatingMode
152+
}
153+
149154
/// Adjusts the flush interval post configuration.
150155
public var flushInterval: TimeInterval {
151156
get {
@@ -196,16 +201,52 @@ extension Analytics {
196201
}
197202

198203
/// Tells this instance of Analytics to flush any queued events up to Segment.com. This command will also
199-
/// be sent to each plugin present in the system.
200-
public func flush() {
204+
/// be sent to each plugin present in the system. A completion handler can be optionally given and will be
205+
/// called when flush has completed.
206+
public func flush(completion: (() -> Void)? = nil) {
201207
// only flush if we're enabled.
202208
guard enabled == true else { return }
203209

210+
let flushGroup = DispatchGroup()
211+
// gotta call enter at least once before we ask to be notified.
212+
flushGroup.enter()
213+
204214
apply { plugin in
205-
if let p = plugin as? EventPlugin {
206-
p.flush()
215+
operatingMode.run(queue: configuration.values.flushQueue) {
216+
if let p = plugin as? FlushCompletion {
217+
// this is async
218+
// flush(group:completion:) handles the enter/leave.
219+
p.flush(group: flushGroup) { plugin in
220+
// we don't really care about the plugin value .. yet.
221+
}
222+
} else if let p = plugin as? EventPlugin {
223+
// we have no idea if this will be async or not, assume it's sync.
224+
flushGroup.enter()
225+
p.flush()
226+
flushGroup.leave()
227+
}
207228
}
208229
}
230+
231+
// if we're not in server mode, we need to be notified when it's done.
232+
if let completion, operatingMode != .synchronous {
233+
// set up our callback to know when the group has completed, if we're not
234+
// in .server operating mode.
235+
flushGroup.notify(queue: configuration.values.flushQueue) {
236+
DispatchQueue.main.async { completion() }
237+
}
238+
}
239+
240+
flushGroup.leave() // matches our initial enter().
241+
242+
// if we ARE in server mode, we need to wait on the group.
243+
// This effectively ends up being a `sync` operation.
244+
if operatingMode == .synchronous {
245+
flushGroup.wait()
246+
// we need to call completion on our own since
247+
// we skipped setting up notify.
248+
if let completion { DispatchQueue.main.async { completion() }}
249+
}
209250
}
210251

211252
/// Resets this instance of Analytics to a clean slate. Traits, UserID's, anonymousId, etc are all cleared or reset. This
@@ -384,3 +425,28 @@ extension Analytics {
384425
return configuration.values.writeKey == Self.deadInstance
385426
}
386427
}
428+
429+
// MARK: Operating mode based scheduling
430+
431+
extension OperatingMode {
432+
func run(queue: DispatchQueue, task: @escaping () -> Void) {
433+
//
434+
switch self {
435+
case .asynchronous:
436+
queue.async {
437+
task()
438+
}
439+
case .synchronous:
440+
// if for some reason, we're told to do all this stuff on
441+
// main, ignore it, and use the default queue. this prevents
442+
// a possible deadlock.
443+
if queue === DispatchQueue.main {
444+
OperatingMode.defaultQueue.asyncAndWait {
445+
task()
446+
}
447+
} else {
448+
queue.asyncAndWait { task() }
449+
}
450+
}
451+
}
452+
}

Sources/Segment/Configuration.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@ import Foundation
1010
import FoundationNetworking
1111
#endif
1212

13+
// MARK: - Operating Mode
14+
/// Specifies the operating mode/context
15+
public enum OperatingMode {
16+
/// The operation of the Analytics client are synchronous.
17+
case synchronous
18+
/// The operation of the Analytics client are asynchronous.
19+
case asynchronous
20+
21+
static internal let defaultQueue = DispatchQueue(label: "com.segment.operatingModeQueue", qos: .utility)
22+
}
23+
1324
// MARK: - Internal Configuration
1425

1526
public class Configuration {
@@ -26,6 +37,9 @@ public class Configuration {
2637
var requestFactory: ((URLRequest) -> URLRequest)? = nil
2738
var errorHandler: ((Error) -> Void)? = nil
2839
var flushPolicies: [FlushPolicy] = [CountBasedFlushPolicy(), IntervalBasedFlushPolicy()]
40+
41+
var operatingMode: OperatingMode = .asynchronous
42+
var flushQueue: DispatchQueue = OperatingMode.defaultQueue
2943
var userAgent: String? = nil
3044
}
3145

@@ -184,6 +198,22 @@ public extension Configuration {
184198
return self
185199
}
186200

201+
/// Informs the Analytics instance of its operating mode/context.
202+
/// Use `.server` when operating in a web service, or when synchronous operation
203+
/// is desired. Use `.client` when operating in a long lived process,
204+
/// desktop/mobile application.
205+
@discardableResult
206+
func operatingMode(_ mode: OperatingMode) -> Configuration {
207+
values.operatingMode = mode
208+
return self
209+
}
210+
211+
/// Specify a custom queue to use when performing a flush operation. The default
212+
/// value is a Segment owned background queue.
213+
@discardableResult
214+
func flushQueue(_ queue: DispatchQueue) -> Configuration {
215+
values.flushQueue = queue
216+
187217
@discardableResult
188218
func userAgent(_ userAgent: String) -> Configuration {
189219
values.userAgent = userAgent

Sources/Segment/Plugins.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ public protocol VersionedPlugin {
6262
static func version() -> String
6363
}
6464

65+
public protocol FlushCompletion {
66+
func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void)
67+
}
68+
6569
// For internal platform-specific bits
6670
internal protocol PlatformPlugin: Plugin { }
6771

Sources/Segment/Plugins/SegmentDestination.swift

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import Sovran
1616
import FoundationNetworking
1717
#endif
1818

19-
public class SegmentDestination: DestinationPlugin, Subscriber {
19+
public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion {
2020
internal enum Constants: String {
2121
case integrationName = "Segment.io"
2222
case apiHost = "apiHost"
@@ -113,6 +113,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
113113
}
114114

115115
public func flush() {
116+
// unused .. see flush(group:completion:)
117+
}
118+
119+
public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
116120
guard let storage = self.storage else { return }
117121
guard let analytics = self.analytics else { return }
118122
guard let httpClient = self.httpClient else { return }
@@ -131,7 +135,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
131135
if pendingUploads == 0 {
132136
for url in data {
133137
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")
134-
138+
// enter the dispatch group
139+
group.enter()
140+
// set up the task
135141
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
136142
switch result {
137143
case .success(_):
@@ -146,6 +152,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
146152
// make sure it gets removed and it's cleanup() called rather
147153
// than waiting on the next flush to come around.
148154
self.cleanupUploads()
155+
// call the completion
156+
completion(self)
157+
// leave the dispatch group
158+
group.leave()
149159
}
150160
// we have a legit upload in progress now, so add it to our list.
151161
if let upload = uploadTask {

Sources/Segment/Settings.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,14 @@ extension Analytics {
131131
// we don't really wanna wait for this network call during tests...
132132
// but we should make it work similarly.
133133
store.dispatch(action: System.ToggleRunningAction(running: false))
134-
DispatchQueue.main.async {
134+
135+
operatingMode.run(queue: DispatchQueue.main) {
135136
if let state: System = self.store.currentState(), let settings = state.settings {
136137
self.store.dispatch(action: System.UpdateSettingsAction(settings: settings))
137138
}
138139
self.store.dispatch(action: System.ToggleRunningAction(running: true))
139140
}
141+
140142
return
141143
}
142144
#endif

Sources/Segment/Startup.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ extension Analytics: Subscriber {
4848
plugins += VendorSystem.current.requiredPlugins
4949

5050
// setup lifecycle if desired
51-
if configuration.values.trackApplicationLifecycleEvents {
51+
if configuration.values.trackApplicationLifecycleEvents, operatingMode != .synchronous {
5252
#if os(iOS) || os(tvOS)
5353
plugins.append(iOSLifecycleEvents())
5454
#endif

Sources/Segment/Utilities/Logging.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88
import Foundation
99

1010
extension Analytics {
11-
internal enum LogKind {
11+
internal enum LogKind: CustomStringConvertible, CustomDebugStringConvertible {
1212
case error
1313
case warning
1414
case debug
1515
case none
1616

17+
var description: String { return string }
18+
var debugDescription: String { return string }
19+
1720
var string: String {
1821
switch self {
1922
case .error:
@@ -23,7 +26,7 @@ extension Analytics {
2326
case .debug:
2427
return "SEG_DEBUG: "
2528
case .none:
26-
return ""
29+
return "SEG_INFO: "
2730
}
2831
}
2932
}

Tests/Segment-Tests/Analytics_Tests.swift

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,10 @@ final class Analytics_Tests: XCTestCase {
457457

458458
func testPurgeStorage() {
459459
// Use a specific writekey to this test so we do not collide with other cached items.
460-
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either").flushInterval(9999).flushAt(9999))
460+
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either")
461+
.flushInterval(9999)
462+
.flushAt(9999)
463+
.operatingMode(.synchronous))
461464

462465
waitUntilStarted(analytics: analytics)
463466

@@ -479,13 +482,13 @@ final class Analytics_Tests: XCTestCase {
479482
analytics.track(name: "test")
480483

481484
var newPendingCount = analytics.pendingUploads!.count
482-
XCTAssertEqual(newPendingCount, 4)
485+
XCTAssertEqual(newPendingCount, 1)
483486

484487
let pending = analytics.pendingUploads!
485488
analytics.purgeStorage(fileURL: pending.first!)
486489

487490
newPendingCount = analytics.pendingUploads!.count
488-
XCTAssertEqual(newPendingCount, 3)
491+
XCTAssertEqual(newPendingCount, 0)
489492

490493
analytics.purgeStorage()
491494
newPendingCount = analytics.pendingUploads!.count
@@ -688,4 +691,38 @@ final class Analytics_Tests: XCTestCase {
688691
XCTAssertTrue(shared2 === shared)
689692

690693
}
694+
695+
func testServerOperatingMode() {
696+
// Use a specific writekey to this test so we do not collide with other cached items.
697+
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_serverMode")
698+
.flushInterval(9999)
699+
.flushAt(9999)
700+
.operatingMode(.synchronous))
701+
702+
waitUntilStarted(analytics: analytics)
703+
704+
analytics.storage.hardReset(doYouKnowHowToUseThis: true)
705+
706+
@Atomic var completionCalled = false
707+
708+
// put an event in the pipe ...
709+
analytics.track(name: "completion test1")
710+
// flush it, that'll get us an upload going
711+
analytics.flush {
712+
// verify completion is called.
713+
completionCalled = true
714+
}
715+
716+
// completion shouldn't be called before flush returned.
717+
XCTAssertTrue(completionCalled)
718+
XCTAssertEqual(analytics.pendingUploads!.count, 0)
719+
720+
// put another event in the pipe.
721+
analytics.track(name: "completion test2")
722+
analytics.flush()
723+
724+
// flush shouldn't return until all uploads are done, cuz
725+
// it's running in sync mode.
726+
XCTAssertEqual(analytics.pendingUploads!.count, 0)
727+
}
691728
}

Tests/Segment-Tests/FlushPolicy_Tests.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,15 @@ class FlushPolicyTests: XCTestCase {
137137

138138
XCTAssertTrue(analytics.hasUnsentEvents)
139139

140-
// sleep for 4 seconds for 2 second flush policy
141-
RunLoop.main.run(until: Date.init(timeIntervalSinceNow: 4))
142-
143-
XCTAssertFalse(analytics.hasUnsentEvents)
140+
@Atomic var flushSent = false
141+
while !flushSent {
142+
RunLoop.main.run(until: Date.distantPast)
143+
if analytics.pendingUploads!.count > 0 {
144+
// flush was triggered
145+
flushSent = true
146+
}
147+
}
148+
149+
XCTAssertTrue(flushSent)
144150
}
145151
}

0 commit comments

Comments
 (0)