1414 * limitations under the License.
1515 */
1616
17+ import Atomics
1718import BenchmarkUtils
1819import Foundation
1920import GRPC
@@ -168,8 +169,8 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
168169 /// Succeeds after a stop has been requested and all outstanding requests have completed.
169170 private var stopComplete : EventLoopPromise < Void >
170171
171- private let running : NIOAtomic < Bool > = . makeAtomic ( value : false )
172- private let outstanding : NIOAtomic < Int > = . makeAtomic ( value : 0 )
172+ private let running = ManagedAtomic < Bool > ( false )
173+ private let outstanding = ManagedAtomic < Int > ( 0 )
173174
174175 private var requestMaker : RequestMakerType
175176
@@ -211,15 +212,20 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
211212 // - when a request finishes it will either start a new request or decrement the
212213 // the atomic counter (if we've been told to stop)
213214 // - if the counter drops to zero we're finished.
214- let exchangedRunning = self . running. compareAndExchange ( expected: false , desired: true )
215- precondition ( exchangedRunning, " launchRequests should only be called once " )
215+ let exchangedRunning = self . running. compareExchange (
216+ expected: false ,
217+ desired: true ,
218+ ordering: . relaxed
219+ )
220+ precondition ( exchangedRunning. exchanged, " launchRequests should only be called once " )
216221
217222 // We only decrement the outstanding count when running has been changed back to false.
218- let exchangedOutstanding = self . outstanding. compareAndExchange (
223+ let exchangedOutstanding = self . outstanding. compareExchange (
219224 expected: 0 ,
220- desired: self . maxPermittedOutstandingRequests
225+ desired: self . maxPermittedOutstandingRequests,
226+ ordering: . relaxed
221227 )
222- precondition ( exchangedOutstanding, " launchRequests should only be called once " )
228+ precondition ( exchangedOutstanding. exchanged , " launchRequests should only be called once " )
223229
224230 for _ in 0 ..< self . maxPermittedOutstandingRequests {
225231 self . requestMaker. makeRequest ( ) . whenComplete { _ in
@@ -229,11 +235,11 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
229235 }
230236
231237 private func makeRequest( ) {
232- if self . running. load ( ) {
238+ if self . running. load ( ordering : . relaxed ) {
233239 self . requestMaker. makeRequest ( ) . whenComplete { _ in
234240 self . makeRequest ( )
235241 }
236- } else if self . outstanding. sub ( 1 ) == 1 {
242+ } else if self . outstanding. loadThenWrappingDecrement ( ordering : . relaxed ) == 1 {
237243 self . stopIsComplete ( )
238244 } // else we're no longer running but not all RPCs have finished.
239245 }
@@ -260,7 +266,7 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
260266 /// - returns: A future which can be waited on to signal when all activity has ceased.
261267 func stop( ) -> EventLoopFuture < Void > {
262268 self . requestMaker. requestStop ( )
263- self . running. store ( false )
269+ self . running. store ( false , ordering : . relaxed )
264270 return self . stopComplete. futureResult
265271 }
266272 }
0 commit comments