Skip to content

Commit bed371a

Browse files
authored
Merge pull request #266 from TelemetryDeck/feat/improve-signal-cache
feat: Improve signal cache concurrency
2 parents 8a5a649 + a12f4b9 commit bed371a

File tree

3 files changed

+164
-18
lines changed

3 files changed

+164
-18
lines changed

Sources/TelemetryDeck/Signals/SignalCache.swift

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal class SignalCache<T>: @unchecked Sendable where T: Codable {
1717

1818
/// How many Signals are cached
1919
func count() -> Int {
20-
queue.sync(flags: .barrier) {
20+
queue.sync {
2121
self.cachedSignals.count
2222
}
2323
}
@@ -41,15 +41,12 @@ internal class SignalCache<T>: @unchecked Sendable where T: Codable {
4141
/// You should hold on to the signals returned by this function. If the action you are trying to do with them fails
4242
/// (e.g. sending them to a server), you should reinsert them into the cache with the `push` function.
4343
func pop() -> [T] {
44-
var poppedSignals: [T]!
45-
46-
queue.sync {
44+
queue.sync(flags: .barrier) {
4745
let sliceSize = min(maximumNumberOfSignalsToPopAtOnce, cachedSignals.count)
48-
poppedSignals = Array(cachedSignals[..<sliceSize])
46+
let poppedSignals = Array(cachedSignals[..<sliceSize])
4947
cachedSignals.removeFirst(sliceSize)
48+
return poppedSignals
5049
}
51-
52-
return poppedSignals
5350
}
5451

5552
private func fileURL() -> URL {

Sources/TelemetryDeck/Signals/SignalManager.swift

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ final class SignalManager: SignalManageable, @unchecked Sendable {
2929
private var signalCache: SignalCache<SignalPostBody>
3030
let configuration: TelemetryManagerConfiguration
3131

32-
private var sendTimer: Timer?
32+
private var sendTimerSource: DispatchSourceTimer?
33+
private let timerQueue = DispatchQueue(label: "com.telemetrydeck.SignalTimer", qos: .utility)
3334

3435
init(configuration: TelemetryManagerConfiguration) {
3536
self.configuration = configuration
@@ -96,14 +97,17 @@ final class SignalManager: SignalManageable, @unchecked Sendable {
9697
private func sendCachedSignalsRepeatedly() {
9798
attemptToSendNextBatchOfCachedSignals()
9899

99-
sendTimer?.invalidate()
100-
sendTimer = Timer.scheduledTimer(
101-
timeInterval: Self.minimumSecondsToPassBetweenRequests,
102-
target: self,
103-
selector: #selector(attemptToSendNextBatchOfCachedSignals),
104-
userInfo: nil,
105-
repeats: true
100+
sendTimerSource?.cancel()
101+
let source = DispatchSource.makeTimerSource(queue: timerQueue)
102+
source.schedule(
103+
deadline: .now() + Self.minimumSecondsToPassBetweenRequests,
104+
repeating: Self.minimumSecondsToPassBetweenRequests
106105
)
106+
source.setEventHandler { [weak self] in
107+
self?.attemptToSendNextBatchOfCachedSignals()
108+
}
109+
source.resume()
110+
sendTimerSource = source
107111
}
108112

109113
/// Adds a signal to the process queue
@@ -149,7 +153,6 @@ final class SignalManager: SignalManageable, @unchecked Sendable {
149153

150154
/// Sends one batch of signals from the cache if not empty.
151155
/// If signals fail to send, we put them back into the cache to try again later.
152-
@objc
153156
@Sendable
154157
func attemptToSendNextBatchOfCachedSignals() {
155158
configuration.logHandler?.log(.debug, message: "Current signal cache count: \(signalCache.count())")
@@ -232,8 +235,8 @@ extension SignalManager {
232235
@objc func didEnterBackground() {
233236
configuration.logHandler?.log(.debug, message: #function)
234237

235-
sendTimer?.invalidate()
236-
sendTimer = nil
238+
sendTimerSource?.cancel()
239+
sendTimerSource = nil
237240

238241
#if os(watchOS) || os(macOS)
239242
self.signalCache.backupCache()
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import Foundation
2+
import Testing
3+
4+
@testable import TelemetryDeck
5+
6+
struct SignalCacheConcurrencyTests {
7+
8+
/// Repro for https://github.com/TelemetryDeck/SwiftSDK/issues/265:
9+
///
10+
/// `count()` with the `.barrier` flag blocks its caller because it waits for ALL pending GCD operations.
11+
///
12+
/// The bug: When count() uses `.barrier`, it must wait for all prior async blocks
13+
/// to complete before executing. If those blocks do work before calling push(),
14+
/// count() is blocked for their entire duration.
15+
///
16+
/// This test queues async blocks with artificial delays to create pending work,
17+
/// then immediately calls count() to measure blocking.
18+
@Test
19+
func count_barrierCausesMainThreadBlock() {
20+
if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) {
21+
let cache = SignalCache<SignalPostBody>(logHandler: nil)
22+
let stressQueue = DispatchQueue(label: "com.telemetrydeck.stressdaqueue", attributes: .concurrent)
23+
24+
// Queue 50 operations that each take 2ms BEFORE reaching push()
25+
// With barrier bug: count() waits for ALL of these (up to ~100ms total if they end up serialized)
26+
// With fix: count() returns immediately (~0ms)
27+
for i in 0..<50 {
28+
stressQueue.async {
29+
Thread.sleep(forTimeInterval: 0.002)
30+
cache.push(Self.makeSignal(id: "\(i)"))
31+
}
32+
}
33+
34+
// Immediately call count() - this is what the timer callback does
35+
let start = CFAbsoluteTimeGetCurrent()
36+
_ = cache.count()
37+
let elapsed = CFAbsoluteTimeGetCurrent() - start
38+
39+
// With barrier bug: up to ~100ms (50 * 2ms if the waits end up serialized; stressQueue itself is concurrent)
40+
// With fix (no barrier): < 10ms (just reads array.count)
41+
#expect(elapsed < 0.010, "count() blocked for \(elapsed)s - barrier flag causing contention")
42+
} else {
43+
print("skipping test on incompatible OS")
44+
}
45+
}
46+
47+
/// Validates thread safety of concurrent push and pop operations.
48+
/// After the fix, pop() uses the `.barrier` flag to ensure exclusive access during mutation.
49+
@Test
50+
func concurrentPushAndPop_maintainsDataIntegrity() async {
51+
if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) {
52+
let cache = SignalCache<SignalPostBody>(logHandler: nil)
53+
let pushCount = 500
54+
55+
await withTaskGroup(of: Void.self) { group in
56+
// Concurrent pushes
57+
for i in 0..<pushCount {
58+
group.addTask {
59+
cache.push(Self.makeSignal(id: "\(i)"))
60+
}
61+
}
62+
63+
// Concurrent pops (some will return empty arrays, that's fine)
64+
for _ in 0..<50 {
65+
group.addTask {
66+
_ = cache.pop()
67+
}
68+
}
69+
70+
await group.waitForAll()
71+
}
72+
73+
// Drain remaining signals
74+
var totalPopped = 0
75+
var batch = cache.pop()
76+
while !batch.isEmpty {
77+
totalPopped += batch.count
78+
batch = cache.pop()
79+
}
80+
81+
// The drained total varies with how many signals the concurrent pops above already removed, so `totalPopped` is not asserted.
82+
// The key assertion is that we don't crash or corrupt data
83+
#expect(cache.count() == 0, "Cache should be empty after draining")
84+
} else {
85+
print("skipping test on incompatible OS")
86+
}
87+
}
88+
89+
90+
/// Validates pop() correctly handles concurrent access without data races.
91+
/// Without a barrier on pop(), concurrent calls can corrupt the array.
92+
/// Runs multiple iterations to increase the probability of catching a race condition.
93+
@Test
94+
func pop_isThreadSafe() async {
95+
if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) {
96+
for iteration in 0..<10 {
97+
let cache = SignalCache<SignalPostBody>(logHandler: nil)
98+
let signalCount = 200
99+
100+
for i in 0..<signalCount {
101+
cache.push(Self.makeSignal(id: "\(iteration)_\(i)"))
102+
}
103+
104+
let allPopped = await withTaskGroup(of: [SignalPostBody].self, returning: [[SignalPostBody]].self) { group in
105+
for _ in 0..<20 {
106+
group.addTask {
107+
cache.pop()
108+
}
109+
}
110+
111+
var collected: [[SignalPostBody]] = []
112+
for await batch in group {
113+
collected.append(batch)
114+
}
115+
return collected
116+
}
117+
118+
let totalPopped = allPopped.flatMap { $0 }.count
119+
let remaining = cache.count()
120+
121+
#expect(
122+
totalPopped + remaining == signalCount,
123+
"Iteration \(iteration): Total signals (popped + remaining) should equal original count"
124+
)
125+
}
126+
} else {
127+
print("skipping test on incompatible OS")
128+
}
129+
130+
}
131+
132+
// MARK: - Helpers
133+
134+
private static func makeSignal(id: String) -> SignalPostBody {
135+
SignalPostBody(
136+
receivedAt: Date(),
137+
appID: UUID().uuidString,
138+
clientUser: id,
139+
sessionID: id,
140+
type: "test",
141+
floatValue: nil,
142+
payload: [:],
143+
isTestMode: "true"
144+
)
145+
}
146+
}

0 commit comments

Comments
 (0)