|
7 | 7 | // |
8 | 8 |
|
9 | 9 | import Foundation |
| 10 | +import ThreadSafeSwift |
10 | 11 |
|
11 | 12 | /** |
12 | 13 | Concrete Implementation for an `EventPool`. |
13 | 14 | - Author: Simon J. Stuart |
14 | | - - Version: 3.1.0 |
| 15 | + - Version: 4.0.0 |
15 | 16 | - Parameters: |
16 | 17 | - TEventThread: The `EventThreadable`-conforming Type to be managed by this `EventPool` |
17 | 18 | - Note: Event Pools own and manage all instances of the given `TEventThread` type |
18 | 19 | */ |
19 | 20 | open class EventPool<TEventThread: EventThreadable>: EventHandler, EventPooling { |
| 21 | + @ThreadSafeSemaphore public var balancer: EventPoolBalancing |
| 22 | + @ThreadSafeSemaphore public var scaler: EventPoolScaling |
| 23 | + @ThreadSafeSemaphore public var capacity: UInt8 |
| 24 | + |
| 25 | + private var eventThreads = [TEventThread]() |
| 26 | + |
| 27 | + struct ThreadContainer { |
| 28 | + weak var thread: (any EventThreadable)? |
| 29 | + } |
| 30 | + @ThreadSafeSemaphore private var pools = [String:[ThreadContainer]]() |
20 | 31 |
|
21 | 32 | public func addReceiver(_ receiver: EventReceiving, forEventType: Eventable.Type) { |
22 | | - |
| 33 | + if let eventThread = receiver as? EventThreadable { /// We must cast the `receiver` to `EventThreadable` safely |
| 34 | + let eventTypeName = String(reflecting: forEventType) |
| 35 | + |
| 36 | + // We need to add the Thread into the Pool for this Event Type |
| 37 | + _pools.withLock { pools in |
| 38 | + var bucket = pools[eventTypeName] |
| 39 | + let newBucket = bucket == nil |
| 40 | + if newBucket { bucket = [ThreadContainer]() } /// If there's no Bucket for this Event Type, create one |
| 41 | + |
| 42 | + /// If it's NOT a New Bucket, and the Bucket already contains this Receiver... |
| 43 | + if !newBucket && bucket!.contains(where: { threadContainer in |
| 44 | + threadContainer.thread != nil && ObjectIdentifier(threadContainer.thread!) == ObjectIdentifier(eventThread) |
| 45 | + }) { |
| 46 | + return // ... just Return! |
| 47 | + } |
| 48 | + |
| 49 | + /// If we reach here, the Receiver is not already in the Bucket, so let's add it! |
| 50 | + bucket!.append(ThreadContainer(thread: eventThread)) |
| 51 | + |
| 52 | + if bucket!.count == 1 { EventCentral.shared.addReceiver(self, forEventType: forEventType) } /// If this is the *first* registered Thread for this Event Type, we need to register with Central Dispatch |
| 53 | + |
| 54 | + pools[eventTypeName] = bucket! |
| 55 | + } |
| 56 | + } |
23 | 57 | } |
24 | 58 |
|
25 | 59 | public func removeReceiver(_ receiver: EventReceiving, forEventType: Eventable.Type) { |
26 | | - |
| 60 | + if let eventThread = receiver as? EventThreadable { /// We must cast the `receiver` to `EventThreadable` safely |
| 61 | + let eventTypeName = String(reflecting: forEventType) |
| 62 | + |
| 63 | + _pools.withLock { pools in |
| 64 | + var bucket = pools[eventTypeName] |
| 65 | + if bucket == nil { return } /// Can't remove a Receiver if there isn't even a Bucket for hte Event Type |
| 66 | + |
| 67 | + /// Remove any Receivers from this Event-Type Bucket for the given `receiver` instance. |
| 68 | + bucket!.removeAll { threadContainer in |
| 69 | + threadContainer.thread != nil && ObjectIdentifier(threadContainer.thread!) == ObjectIdentifier(eventThread) |
| 70 | + } |
| 71 | + |
| 72 | + if bucket!.count == 0 { EventCentral.shared.removeReceiver(self, forEventType: forEventType) } /// If there are none left in the Bucket, unregister this `EventPool` from Central Dispatch |
| 73 | + |
| 74 | + pools[eventTypeName] = bucket // Update the Bucket for this Event Type |
| 75 | + } |
| 76 | + } |
27 | 77 | } |
28 | 78 |
|
29 | 79 | public func removeReceiver(_ receiver: EventReceiving) { |
| 80 | + if let eventThread = receiver as? EventThreadable { /// We must cast the `receiver` to `EventThreadable` safely |
| 81 | + |
| 82 | + _pools.withLock { pools in |
| 83 | + for (eventTypeName, bucket) in pools { /// Iterate every Event Type |
| 84 | + var newBucket = bucket // Copy the Bucket |
| 85 | + newBucket.removeAll { threadContainer in /// Remove any occurences of the given Receiver from the Bucket |
| 86 | + threadContainer.thread != nil && ObjectIdentifier(threadContainer.thread!) == ObjectIdentifier(eventThread) |
| 87 | + } |
| 88 | + |
| 89 | + if bucket.count == 0 { EventCentral.shared.removeReceiver(self) } /// If there are none left in the Bucket, unregister this `EventPool` from Central Dispatch |
| 90 | + |
| 91 | + pools[eventTypeName] = newBucket /// Update the Bucket for this Event Type |
| 92 | + } |
| 93 | + } |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + internal func scalePool() { |
| 98 | + let scalingResult = scaler.calculateScaling(currentCapacity: capacity, eventThreads: eventThreads, eventsPending: eventCount) |
| 99 | + if !scalingResult.modifyCapacity { return } // If there's no scaling to perform, let's return |
| 100 | + //TODO: Implement Scaling + Culling here |
| 101 | + } |
| 102 | + |
| 103 | + override internal func processEvent(_ event: any Eventable, dispatchMethod: EventDispatchMethod, priority: EventPriority) { |
| 104 | + let eventTypeName = String(reflecting: type(of: event)) |
| 105 | + |
| 106 | + var snapPools = [String:[ThreadContainer]]() |
| 107 | + |
| 108 | + _pools.withLock { pools in |
| 109 | + // We should take this opportunity to remove any nil receivers |
| 110 | + pools[eventTypeName]?.removeAll(where: { threadContainer in |
| 111 | + threadContainer.thread == nil |
| 112 | + }) |
| 113 | + snapPools = pools |
| 114 | + } |
| 115 | + |
| 116 | + let bucket = snapPools[eventTypeName] |
| 117 | + if bucket == nil { return } /// No Receivers, so nothing more to do! |
| 118 | + |
| 119 | + /// Now we need to determine the appropriate `EventThread` to receive this `Eventable` |
| 120 | + var bucketThreads = [EventThreadable]() |
| 121 | + for threadContainer in bucket! { |
| 122 | + if threadContainer.thread == nil { continue } //Can't consider this Thread if it doesn't exist! |
| 123 | + bucketThreads.append(threadContainer.thread!) |
| 124 | + } |
| 125 | + let targetThread = balancer.chooseEventThread(eventThreads: bucketThreads) |
| 126 | + |
| 127 | + if targetThread != nil { |
| 128 | + switch dispatchMethod { |
| 129 | + case .stack: |
| 130 | + targetThread!.stackEvent(event, priority: priority) |
| 131 | + case .queue: |
| 132 | + targetThread!.queueEvent(event, priority: priority) |
| 133 | + } |
| 134 | + } |
| 135 | + |
| 136 | + scalePool() |
| 137 | + } |
| 138 | + |
| 139 | + /** |
| 140 | + Create a new `EventPool` |
| 141 | + - Author: Simon J. Stuart |
| 142 | + - Version: 4.0.0 |
| 143 | + - Parameters: |
| 144 | + - capacity: The number of Threads to spawn |
| 145 | + - balancer: The Load Balancer to use (directs `Eventable` instances to the most appropriate `EventThread` at any given time) - Default is `nil`, uses the `EventPoolRoundRobinBalancer` if `nil` |
| 146 | + - scaler: The Scaler to use (increases and/or decreases the number of `EventThread` instances managed by the `EventPool` in response to rules defined by the `scaler` - Default is `nil`, uses the `EventPoolStaticScaler` if `nil` |
| 147 | + */ |
| 148 | + public init( |
| 149 | + capacity: UInt8, |
| 150 | + balancer: EventPoolBalancing? = nil, |
| 151 | + scaler: EventPoolScaling? = nil |
| 152 | + ) { |
| 153 | + self.capacity = capacity |
| 154 | + self.balancer = balancer != nil ? balancer! : EventPoolRoundRobinBalancer() |
| 155 | + self.scaler = scaler != nil ? scaler! : EventPoolStaticScaler(initialCapacity: capacity, minimumCapacity: capacity, maximumCapacity: capacity) |
30 | 156 |
|
| 157 | + super.init() |
| 158 | + // Now we create all of our Event Threads |
| 159 | + var current = 0 |
| 160 | + while current < capacity + 1 { |
| 161 | + let eventThread = TEventThread(eventPool: self) |
| 162 | + eventThreads.append(eventThread) |
| 163 | + current += 1 |
| 164 | + } |
31 | 165 | } |
32 | 166 | } |
0 commit comments