Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class Analytics {
static internal weak var firstInstance: Analytics? = nil

@Atomic static internal var activeWriteKeys = [String]()

// Used for WaitingPlugin's, see waiting.swift
internal var processingTimer: DispatchWorkItem? = nil

/**
This method isn't a traditional singleton implementation. It's provided here
Expand Down
6 changes: 6 additions & 0 deletions Sources/Segment/Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ extension DestinationPlugin {
public func add(plugin: Plugin) -> Plugin {
if let analytics = self.analytics {
plugin.configure(analytics: analytics)
if let waiting = plugin as? WaitingPlugin {
analytics.pauseEventProcessing(plugin: waiting)
}
}
timeline.add(plugin: plugin)
analytics?.updateIfNecessary(plugin: plugin)
Expand Down Expand Up @@ -188,6 +191,9 @@ extension Analytics {
@discardableResult
public func add(plugin: Plugin) -> Plugin {
plugin.configure(analytics: self)
if let waiting = plugin as? WaitingPlugin {
pauseEventProcessing(plugin: waiting)
}
timeline.add(plugin: plugin)
updateIfNecessary(plugin: plugin)
return plugin
Expand Down
26 changes: 9 additions & 17 deletions Sources/Segment/Settings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,11 @@

extension Analytics {
internal func update(settings: Settings) {
guard let system: System = store.currentState() else { return }
apply { plugin in
plugin.update(settings: settings, type: updateType(for: plugin, in: system))
plugin.update(settings: settings, type: updateType(for: plugin))
if let destPlugin = plugin as? DestinationPlugin {
destPlugin.apply { subPlugin in
subPlugin.update(settings: settings, type: updateType(for: subPlugin, in: system))
subPlugin.update(settings: settings, type: updateType(for: subPlugin))
}
}
}
Expand All @@ -125,19 +124,12 @@
// if we're already running, update has already been called for existing plugins,
// so we just wanna call it on this one if it hasn't been done already.
if system.running, let settings = system.settings {
let alreadyInitialized = system.initializedPlugins.contains { p in
return plugin === p
}
if !alreadyInitialized {
store.dispatch(action: System.AddPluginToInitialized(plugin: plugin))
plugin.update(settings: settings, type: .initial)
} else {
plugin.update(settings: settings, type: .refresh)
}
plugin.update(settings: settings, type: updateType(for: plugin))
}
}

internal func updateType(for plugin: Plugin, in system: System) -> UpdateType {
internal func updateType(for plugin: Plugin) -> UpdateType {
guard let system: System = store.currentState() else { return .initial }
let alreadyInitialized = system.initializedPlugins.contains { p in
return plugin === p
}
Expand All @@ -154,14 +146,14 @@
if isUnitTesting {
// we don't really wanna wait for this network call during tests...
// but we should make it work similarly.
store.dispatch(action: System.ToggleRunningAction(running: false))
pauseEventProcessing()

operatingMode.run(queue: DispatchQueue.main) {
if let state: System = self.store.currentState(), let settings = state.settings {
self.store.dispatch(action: System.UpdateSettingsAction(settings: settings))
self.update(settings: settings)
}
self.store.dispatch(action: System.ToggleRunningAction(running: true))
self.resumeEventProcessing()
}

return
Expand All @@ -172,7 +164,7 @@
let httpClient = HTTPClient(analytics: self)

// stop things; queue in case our settings have changed.
store.dispatch(action: System.ToggleRunningAction(running: false))
pauseEventProcessing()

Check warning on line 167 in Sources/Segment/Settings.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Segment/Settings.swift#L167

Added line #L167 was not covered by tests
httpClient.settingsFor(writeKey: writeKey) { (success, settings) in
if success, let s = settings {
// put the new settings in the state store.
Expand All @@ -186,7 +178,7 @@
}

// we're good to go back to a running state.
self.store.dispatch(action: System.ToggleRunningAction(running: true))
self.resumeEventProcessing()

Check warning on line 181 in Sources/Segment/Settings.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Segment/Settings.swift#L181

Added line #L181 was not covered by tests
}
}
}
103 changes: 95 additions & 8 deletions Sources/Segment/State.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct System: State {
let running: Bool
let enabled: Bool
let initializedPlugins: [Plugin]
let waitingPlugins: [Plugin]

struct UpdateSettingsAction: Action {
let settings: Settings
Expand All @@ -25,7 +26,8 @@ struct System: State {
settings: settings,
running: state.running,
enabled: state.enabled,
initializedPlugins: state.initializedPlugins)
initializedPlugins: state.initializedPlugins,
waitingPlugins: state.waitingPlugins)
return result
}
}
Expand All @@ -34,11 +36,29 @@ struct System: State {
let running: Bool

func reduce(state: System) -> System {
var desiredRunning = running

if desiredRunning == true && state.waitingPlugins.count > 0 {
desiredRunning = false
}

return System(configuration: state.configuration,
settings: state.settings,
running: running,
running: desiredRunning,
enabled: state.enabled,
initializedPlugins: state.initializedPlugins)
initializedPlugins: state.initializedPlugins,
waitingPlugins: state.waitingPlugins)
}
}

struct ForceRunningAction: Action {
func reduce(state: System) -> System {
return System(configuration: state.configuration,
settings: state.settings,
running: true,
enabled: state.enabled,
initializedPlugins: state.initializedPlugins,
waitingPlugins: state.waitingPlugins)
}
}

Expand All @@ -50,7 +70,8 @@ struct System: State {
settings: state.settings,
running: state.running,
enabled: enabled,
initializedPlugins: state.initializedPlugins)
initializedPlugins: state.initializedPlugins,
waitingPlugins: state.waitingPlugins)
}
}

Expand All @@ -62,7 +83,8 @@ struct System: State {
settings: state.settings,
running: state.running,
enabled: state.enabled,
initializedPlugins: state.initializedPlugins)
initializedPlugins: state.initializedPlugins,
waitingPlugins: state.waitingPlugins)
}
}

Expand All @@ -79,7 +101,8 @@ struct System: State {
settings: settings,
running: state.running,
enabled: state.enabled,
initializedPlugins: state.initializedPlugins)
initializedPlugins: state.initializedPlugins,
waitingPlugins: state.waitingPlugins)
}
}

Expand All @@ -97,7 +120,64 @@ struct System: State {
settings: state.settings,
running: state.running,
enabled: state.enabled,
initializedPlugins: initializedPlugins)
initializedPlugins: initializedPlugins,
waitingPlugins: state.waitingPlugins)
}
}

struct AddWaitingPlugin: Action {
let plugin: Plugin

func reduce(state: System) -> System {
var waitingPlugins = state.waitingPlugins
if !waitingPlugins.contains(where: { p in
return plugin === p
}) {
waitingPlugins.append(plugin)
}
return System(configuration: state.configuration,
settings: state.settings,
running: state.running,
enabled: state.enabled,
initializedPlugins: state.initializedPlugins,
waitingPlugins: waitingPlugins)
}
}

/*struct RemoveWaitingPlugin: Action {
let plugin: Plugin

func reduce(state: System) -> System {
var waitingPlugins = state.waitingPlugins
waitingPlugins.removeAll { p in
return plugin === p
}
return System(configuration: state.configuration,
settings: state.settings,
running: state.running,
enabled: state.enabled,
initializedPlugins: state.initializedPlugins,
waitingPlugins: waitingPlugins)
}
}*/
struct RemoveWaitingPlugin: Action {
let plugin: Plugin

func reduce(state: System) -> System {
var waitingPlugins = state.waitingPlugins
let countBefore = waitingPlugins.count
waitingPlugins.removeAll { p in
return plugin === p
}
let countAfter = waitingPlugins.count
print("RemoveWaitingPlugin: \(countBefore) -> \(countAfter)")

return System(configuration: state.configuration,
settings: state.settings,
running: state.running,
enabled: state.enabled,
initializedPlugins: state.initializedPlugins,
waitingPlugins: waitingPlugins)
}
}
}
Expand Down Expand Up @@ -171,7 +251,14 @@ extension System {
settings = Settings(writeKey: configuration.values.writeKey, apiHost: HTTPClient.getDefaultAPIHost())
}
}
return System(configuration: configuration, settings: settings, running: false, enabled: true, initializedPlugins: [Plugin]())
return System(
configuration: configuration,
settings: settings,
running: false,
enabled: true,
initializedPlugins: [Plugin](),
waitingPlugins: [WaitingPlugin]()
)
}
}

Expand Down
73 changes: 73 additions & 0 deletions Sources/Segment/Waiting.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//
// Waiting.swift
// Segment
//
// Created by Brandon Sneed on 7/12/25.
//
import Foundation

public protocol WaitingPlugin: Plugin {}

extension Analytics {
/// Pauses event processing, causing events to be queued. When processing resumes
/// any queued events will be replayed to the system with their original timestamps.
/// The system will forcibly resume after 30 seconds, but you should
/// call `resumeEventProcessing(plugin:)` when you've completed your task.
public func pauseEventProcessing(plugin: WaitingPlugin) {
store.dispatch(action: System.AddWaitingPlugin(plugin: plugin))
pauseEventProcessing()
}

/// Resume event processing. Any queued events will be replayed into the system
/// using their original timestamps.
public func resumeEventProcessing(plugin: WaitingPlugin) {
store.dispatch(action: System.RemoveWaitingPlugin(plugin: plugin))
resumeEventProcessing()
}
}

extension Analytics {
internal func running() -> Bool {
if let system: System = store.currentState() {
return system.running
}
// we have no state, so assume no.
return false

Check warning on line 35 in Sources/Segment/Waiting.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Segment/Waiting.swift#L34-L35

Added lines #L34 - L35 were not covered by tests
}

internal func pauseEventProcessing() {
let running = running()
// if we're already paused, ignore and leave.
if !running {
return
}
// pause processing
store.dispatch(action: System.ToggleRunningAction(running: false))
// if we WERE running, someone stopped us, set a timer for
// 30 seconds so they can't keep the system stopped forever.
startProcessingAfterTimeout()
}

internal func resumeEventProcessing() {
let running = running()
// if we're already running, ignore and leave.
if running {
return
}
store.dispatch(action: System.ToggleRunningAction(running: true))
}

internal func startProcessingAfterTimeout() {
DispatchQueue.main.async { [weak self] in
guard let self else { return }
self.processingTimer?.cancel()
self.processingTimer = DispatchWorkItem { [weak self] in
self?.store.dispatch(action: System.ForceRunningAction())
self?.processingTimer = nil // clean up after ourselves
}

Check warning on line 67 in Sources/Segment/Waiting.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Segment/Waiting.swift#L65-L67

Added lines #L65 - L67 were not covered by tests
if let processingTimer = self.processingTimer {
DispatchQueue.main.asyncAfter(deadline: .now() + 30, execute: processingTimer)
}
}
}
}
Loading