Skip to content

Commit 68ccc40

Browse files
mrstegemanfabianfett
authored andcommitted
Switch from NIOAtomic to ManagedAtomic.
1 parent ff495ef commit 68ccc40

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

Sources/RediStack/RedisConnection.swift

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import struct Foundation.UUID
1616
import struct Dispatch.DispatchTime
17+
import Atomics
1718
import Logging
1819
import Metrics
1920
import NIO
@@ -117,22 +118,22 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
117118
/// - Important: Even when set to `true`, the host machine may still choose to delay sending commands.
118119
/// - Note: Setting this to `true` will immediately drain the buffer.
119120
public var sendCommandsImmediately: Bool {
120-
get { return autoflush.load() }
121+
get { return autoflush.load(ordering: .sequentiallyConsistent) }
121122
set(newValue) {
122123
if newValue { self.channel.flush() }
123-
autoflush.store(newValue)
124+
autoflush.store(newValue, ordering: .sequentiallyConsistent)
124125
}
125126
}
126127
/// Controls the permission of the connection to be able to have PubSub subscriptions or not.
127128
///
128129
/// When set to `true`, this connection is allowed to create subscriptions.
129130
/// When set to `false`, this connection is not allowed to create subscriptions. Any potentially existing subscriptions will be removed.
130131
public var allowSubscriptions: Bool {
131-
get { self.allowPubSub.load() }
132+
get { self.allowPubSub.load(ordering: .sequentiallyConsistent) }
132133
set(newValue) {
133-
self.allowPubSub.store(newValue)
134+
self.allowPubSub.store(newValue, ordering: .sequentiallyConsistent)
134135
// if we're subscribed, and we're not allowed to be in pubsub, end our subscriptions
135-
guard self.isSubscribed && !self.allowPubSub.load() else { return }
136+
guard self.isSubscribed && !self.allowPubSub.load(ordering: .sequentiallyConsistent) else { return }
136137
_ = EventLoopFuture<Void>.whenAllComplete([
137138
self.unsubscribe(),
138139
self.punsubscribe()
@@ -147,9 +148,9 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
147148
internal let channel: Channel
148149
private let systemContext: Context
149150
private var logger: Logger { self.systemContext }
150-
151-
private let autoflush: NIOAtomic<Bool> = .makeAtomic(value: true)
152-
private let allowPubSub: NIOAtomic<Bool> = .makeAtomic(value: true)
151+
152+
private let autoflush = ManagedAtomic<Bool>(true)
153+
private let allowPubSub = ManagedAtomic<Bool>(true)
153154
private let _stateLock = NIOLock()
154155
private var _state = ConnectionState.open
155156
private var state: ConnectionState {

Sources/RediStack/RedisMetrics.swift

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Atomics
1516
import Metrics
1617
import NIOConcurrencyHelpers
1718

@@ -69,10 +70,10 @@ extension RedisMetrics {
6970
/// A specialized wrapper class for working with `Metrics.Gauge` objects for the purpose of an incrementing or decrementing count of active objects.
7071
public class IncrementalGauge {
7172
private let gauge: Gauge
72-
private let count = NIOAtomic<Int>.makeAtomic(value: 0)
73+
private let count = ManagedAtomic<Int>(0)
7374

7475
/// The number of the objects that are currently reported as active.
75-
public var currentCount: Int { return count.load() }
76+
public var currentCount: Int { return count.load(ordering: .sequentiallyConsistent) }
7677

7778
internal init(_ label: Label) {
7879
self.gauge = .init(label: label)
@@ -81,21 +82,21 @@ extension RedisMetrics {
8182
/// Increments the current count by the amount specified.
8283
/// - Parameter amount: The number to increase the current count by. Default is `1`.
8384
public func increment(by amount: Int = 1) {
84-
_ = self.count.add(amount)
85-
self.gauge.record(self.count.load())
85+
self.count.wrappingIncrement(by: amount, ordering: .sequentiallyConsistent)
86+
self.gauge.record(self.count.load(ordering: .sequentiallyConsistent))
8687
}
8788

8889
/// Decrements the current count by the amount specified.
8990
/// - Parameter amount: The number to decrease the current count by. Default is `1`.
9091
public func decrement(by amount: Int = 1) {
91-
_ = self.count.sub(amount)
92-
self.gauge.record(self.count.load())
92+
self.count.wrappingDecrement(by: amount, ordering: .sequentiallyConsistent)
93+
self.gauge.record(self.count.load(ordering: .sequentiallyConsistent))
9394
}
9495

9596
/// Resets the current count to `0`.
9697
public func reset() {
97-
_ = self.count.exchange(with: 0)
98-
self.gauge.record(self.count.load())
98+
_ = self.count.exchange(0, ordering: .sequentiallyConsistent)
99+
self.gauge.record(self.count.load(ordering: .sequentiallyConsistent))
99100
}
100101
}
101102
}

0 commit comments

Comments
 (0)