Skip to content

Commit eb45946

Browse files
author
Rodrigo Gomez Palacio
committed
OSConsistencyManager & related classes
Motivation: Manages (kafka) offsets that function as read-your-write tokens for more accurate segment membership calculation. The manager works based on conditions & offsets. Offsets are stored in a nested map indexed by a unique id (e.g. `onesignalId`) and offset key (e.g. `USER_UPDATE`). This allows us to track offsets on a per-user basis (e.g. handle switching users). Conditions work by creating a blocking mechanism with customizable offset retrieval until a pre-defined condition is met (e.g. at least two specific offsets are available). OSCondition interface: allows extensibility for future applications to control offset blocking mechanism in consistency use-cases.
1 parent fa3c750 commit eb45946

File tree

4 files changed

+503
-0
lines changed

4 files changed

+503
-0
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//
2+
// OSCondition.swift
3+
// OneSignalOSCore
4+
//
5+
// Created by Rodrigo Gomez-Palacio on 9/10/24.
6+
// Copyright © 2024 OneSignal. All rights reserved.
7+
//
8+
9+
import Foundation
10+
11+
@objc public protocol OSCondition: AnyObject {
12+
// Each conforming class will provide its unique ID
13+
var conditionId: String { get }
14+
func isMet(indexedTokens: [String: [NSNumber: OSReadYourWriteData]]) -> Bool
15+
func getNewestToken(indexedTokens: [String: [NSNumber: OSReadYourWriteData]]) -> OSReadYourWriteData?
16+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
//
2+
// OSConsistencyKeyEnum.swift
3+
// OneSignalOSCore
4+
//
5+
// Created by Rodrigo Gomez-Palacio on 9/10/24.
6+
// Copyright © 2024 OneSignal. All rights reserved.
7+
//
8+
9+
import Foundation
10+
11+
// Protocol for enums with Int raw values.
12+
public protocol OSConsistencyKeyEnum: RawRepresentable where RawValue == Int { }
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
//
2+
// OSConsistencyManager.swift
3+
// OneSignalOSCore
4+
//
5+
// Created by Rodrigo Gomez-Palacio on 9/10/24.
6+
// Copyright © 2024 OneSignal. All rights reserved.
7+
//
8+
9+
import Foundation
10+
11+
@objc public class OSConsistencyManager: NSObject {
12+
// Singleton instance
13+
@objc public static let shared = OSConsistencyManager()
14+
15+
private let queue = DispatchQueue(label: "com.consistencyManager.queue")
16+
private var indexedTokens: [String: [NSNumber: OSReadYourWriteData]] = [:]
17+
private var indexedConditions: [String: [(OSCondition, DispatchSemaphore)]] = [:] // Index conditions by condition id
18+
19+
// Private initializer to prevent multiple instances
20+
private override init() {}
21+
22+
// Used for testing
23+
public func reset() {
24+
indexedTokens = [:]
25+
indexedConditions = [:]
26+
}
27+
28+
// Function to set the token in a thread-safe manner
29+
public func setRywTokenAndDelay(id: String, key: any OSConsistencyKeyEnum, value: OSReadYourWriteData) {
30+
queue.sync {
31+
let nsKey = NSNumber(value: key.rawValue)
32+
if self.indexedTokens[id] == nil {
33+
self.indexedTokens[id] = [:]
34+
}
35+
self.indexedTokens[id]?[nsKey] = value
36+
self.checkConditionsAndComplete(forId: id) // Only check conditions for this specific ID
37+
}
38+
}
39+
40+
// Register a condition and block the caller until the condition is met
41+
@objc public func getRywTokenFromAwaitableCondition(_ condition: OSCondition, forId id: String) -> OSReadYourWriteData? {
42+
let semaphore = DispatchSemaphore(value: 0)
43+
queue.sync {
44+
if self.conditions[id] == nil {
45+
self.conditions[id] = []
46+
}
47+
self.conditions[id]?.append((condition, semaphore))
48+
self.checkConditionsAndComplete(forId: id)
49+
}
50+
semaphore.wait() // Block until the condition is met
51+
return queue.sync {
52+
return condition.getNewestToken(indexedTokens: self.indexedTokens)
53+
}
54+
}
55+
56+
// Method to resolve conditions by condition ID (e.g. OSIamFetchReadyCondition.ID)
57+
@objc public func resolveConditionsWithID(id: String) {
58+
guard let conditionList = conditions[id] else { return }
59+
var completedConditions: [(OSCondition, DispatchSemaphore)] = []
60+
for (condition, semaphore) in conditionList {
61+
if (condition.conditionId == id) {
62+
semaphore.signal()
63+
completedConditions.append((condition, semaphore))
64+
}
65+
}
66+
conditions[id]?.removeAll { condition, semaphore in
67+
completedConditions.contains(where: { $0.0 === condition && $0.1 == semaphore })
68+
}
69+
}
70+
71+
// Private method to check conditions for a specific id (unique ID like onesignalId)
72+
private func checkConditionsAndComplete(forId id: String) {
73+
guard let conditionList = conditions[id] else { return }
74+
var completedConditions: [(OSCondition, DispatchSemaphore)] = []
75+
for (condition, semaphore) in conditionList {
76+
if condition.isMet(indexedTokens: indexedTokens) {
77+
print("Condition met for id: \(id)")
78+
semaphore.signal()
79+
completedConditions.append((condition, semaphore))
80+
} else {
81+
print("Condition not met for id: \(id)")
82+
}
83+
}
84+
conditions[id]?.removeAll { condition, semaphore in
85+
completedConditions.contains(where: { $0.0 === condition && $0.1 == semaphore })
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)