Skip to content

Commit 2c27ad5

Browse files
authored
Make testRetriesCantBeExecutedForTooManyRequestMessages more reliable (#1781)
Motivation: The 'testRetriesCantBeExecutedForTooManyRequestMessages' test wedges infrequently. After some diagnosis this appears to be due to the intersection of two bugs. The first is that when yielding an element to the sequence, if some consumers exist, of which a subset are waiting for the next element and the remainder are not 'slow', the continuations of the fastest consumers would not be resumed when a new element was added. The other bug is a timing issue: when waiting for the next element a subscriber may be told to suspend. However, the subscriber must drop and then reacquire the lock to store the continuation. The state wasn't re-checked on storing the continuation which opened up a window where the element may have become present between asking for it and storing the continuation. Modifications: - Don't release the lock between attempting to consume an element and suspending to wait for it. - Resume waiting consumers more frequently. - Make a few tests less flaky Result: Test didn't wedge in 10k iterations
1 parent 2034543 commit 2c27ad5

File tree

2 files changed

+69
-20
lines changed

2 files changed

+69
-20
lines changed

Sources/GRPCCore/Internal/Concurrency Primitives/Lock.swift

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,40 @@ public struct _LockedValueBox<Value> {
253253
public func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
254254
return try self.storage.withLockedValue(mutate)
255255
}
256+
257+
/// An unsafe view over the locked value box.
258+
///
259+
/// Prefer ``withLockedValue(_:)`` where possible.
260+
public var unsafe: Unsafe {
261+
Unsafe(storage: self.storage)
262+
}
263+
264+
public struct Unsafe {
265+
@usableFromInline
266+
let storage: LockStorage<Value>
267+
268+
/// Manually acquire the lock.
269+
@inlinable
270+
public func lock() {
271+
self.storage.lock()
272+
}
273+
274+
/// Manually release the lock.
275+
@inlinable
276+
public func unlock() {
277+
self.storage.unlock()
278+
}
279+
280+
/// Mutate the value, assuming the lock has been acquired manually.
281+
@inlinable
282+
public func withValueAssumingLockIsAcquired<T>(
283+
_ mutate: (inout Value) throws -> T
284+
) rethrows -> T {
285+
return try self.storage.withUnsafeMutablePointerToHeader { value in
286+
try mutate(&value.pointee)
287+
}
288+
}
289+
}
256290
}
257291

258292
extension _LockedValueBox: Sendable where Value: Sendable {}

Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -261,38 +261,44 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
261261
func nextElement(
262262
forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
263263
) async throws -> Element? {
264-
let onNext = self._state.withLockedValue { $0.nextElement(forSubscriber: id) }
264+
return try await withTaskCancellationHandler {
265+
self._state.unsafe.lock()
266+
let onNext = self._state.unsafe.withValueAssumingLockIsAcquired {
267+
$0.nextElement(forSubscriber: id)
268+
}
265269

266-
switch onNext {
267-
case .return(let returnAndProduceMore):
268-
returnAndProduceMore.producers.resume()
269-
return try returnAndProduceMore.nextResult.get()
270+
switch onNext {
271+
case .return(let returnAndProduceMore):
272+
self._state.unsafe.unlock()
273+
returnAndProduceMore.producers.resume()
274+
return try returnAndProduceMore.nextResult.get()
270275

271-
case .suspend:
272-
return try await withTaskCancellationHandler {
276+
case .suspend:
273277
return try await withCheckedThrowingContinuation { continuation in
274-
let onSetContinuation = self._state.withLockedValue { state in
278+
let onSetContinuation = self._state.unsafe.withValueAssumingLockIsAcquired { state in
275279
state.setContinuation(continuation, forSubscription: id)
276280
}
277281

282+
self._state.unsafe.unlock()
283+
278284
switch onSetContinuation {
279285
case .resume(let continuation, let result):
280286
continuation.resume(with: result)
281287
case .none:
282288
()
283289
}
284290
}
285-
} onCancel: {
286-
let onCancel = self._state.withLockedValue { state in
287-
state.cancelSubscription(withID: id)
288-
}
291+
}
292+
} onCancel: {
293+
let onCancel = self._state.withLockedValue { state in
294+
state.cancelSubscription(withID: id)
295+
}
289296

290-
switch onCancel {
291-
case .resume(let continuation, let result):
292-
continuation.resume(with: result)
293-
case .none:
294-
()
295-
}
297+
switch onCancel {
298+
case .resume(let continuation, let result):
299+
continuation.resume(with: result)
300+
case .none:
301+
()
296302
}
297303
}
298304
}
@@ -572,9 +578,18 @@ struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
572578
self.producerToken += 1
573579
onYield = .suspend(token)
574580
} else {
575-
// No consumers are slow. Remove the oldest value.
581+
// No consumers are slow, some subscribers exist, a subset of which are waiting
582+
// for a value. Drop the oldest value and resume the fastest consumers.
576583
self.elements.removeFirst()
577-
onYield = .none
584+
let continuations = self.subscriptions.takeContinuations().map {
585+
ConsumerContinuations(continuations: $0, result: .success(element))
586+
}
587+
588+
if let continuations = continuations {
589+
onYield = .resume(continuations)
590+
} else {
591+
onYield = .none
592+
}
578593
}
579594

580595
case self.subscriptions.count:

0 commit comments

Comments
 (0)