@@ -112,7 +112,7 @@ extension AsyncStream {
112
112
let handler = state. onTermination
113
113
state. onTermination = nil
114
114
unlock ( )
115
-
115
+
116
116
// handler must be invoked before yielding nil for termination
117
117
handler ? ( . cancelled)
118
118
@@ -124,7 +124,7 @@ extension AsyncStream {
124
124
lock ( )
125
125
let limit = state. limit
126
126
let count = state. pending. count
127
-
127
+
128
128
if !state. continuations. isEmpty {
129
129
let continuation = state. continuations. removeFirst ( )
130
130
if count > 0 {
@@ -170,7 +170,7 @@ extension AsyncStream {
170
170
case . bufferingOldest( let limit) :
171
171
result = . enqueued( remaining: limit)
172
172
}
173
-
173
+
174
174
unlock ( )
175
175
continuation. resume ( returning: value)
176
176
}
@@ -205,7 +205,7 @@ extension AsyncStream {
205
205
}
206
206
return result
207
207
}
208
-
208
+
209
209
func finish( ) {
210
210
lock ( )
211
211
let handler = state. onTermination
@@ -249,9 +249,9 @@ extension AsyncStream {
249
249
} else {
250
250
unlock ( )
251
251
}
252
-
252
+
253
253
}
254
-
254
+
255
255
func next( ) async -> Element ? {
256
256
await withTaskCancellationHandler { [ cancel] in
257
257
cancel ( )
@@ -289,7 +289,7 @@ extension AsyncThrowingStream {
289
289
case finished
290
290
case failed( Failure )
291
291
}
292
-
292
+
293
293
struct State {
294
294
var continuation : UnsafeContinuation < Element ? , Error > ?
295
295
var pending = _Deque < Element > ( )
@@ -345,7 +345,7 @@ extension AsyncThrowingStream {
345
345
let handler = state. onTermination
346
346
state. onTermination = nil
347
347
unlock ( )
348
-
348
+
349
349
// handler must be invoked before yielding nil for termination
350
350
handler ? ( . cancelled)
351
351
@@ -409,7 +409,7 @@ extension AsyncThrowingStream {
409
409
case . bufferingNewest( let limit) :
410
410
result = . enqueued( remaining: limit)
411
411
}
412
-
412
+
413
413
state. continuation = nil
414
414
unlock ( )
415
415
continuation. resume ( returning: value)
@@ -445,7 +445,7 @@ extension AsyncThrowingStream {
445
445
}
446
446
return result
447
447
}
448
-
448
+
449
449
func finish( throwing error: __owned Failure? = nil ) {
450
450
lock ( )
451
451
let handler = state. onTermination
@@ -510,7 +510,7 @@ extension AsyncThrowingStream {
510
510
fatalError ( " attempt to await next() on more than one task " )
511
511
}
512
512
}
513
-
513
+
514
514
func next( ) async throws -> Element ? {
515
515
try await withTaskCancellationHandler { [ cancel] in
516
516
cancel ( )
@@ -546,7 +546,7 @@ final class _AsyncStreamCriticalStorage<Contents>: @unchecked Sendable {
546
546
private init ( _doNotCallMe: ( ) ) {
547
547
fatalError ( " _AsyncStreamCriticalStorage must be initialized by create " )
548
548
}
549
-
549
+
550
550
private func lock( ) {
551
551
let ptr =
552
552
UnsafeRawPointer ( Builtin . projectTailElems ( self , UnsafeRawPointer . self) )
@@ -558,15 +558,15 @@ final class _AsyncStreamCriticalStorage<Contents>: @unchecked Sendable {
558
558
UnsafeRawPointer ( Builtin . projectTailElems ( self , UnsafeRawPointer . self) )
559
559
_unlock ( ptr)
560
560
}
561
-
561
+
562
562
var value : Contents {
563
563
get {
564
564
lock ( )
565
565
let contents = _value
566
566
unlock ( )
567
567
return contents
568
568
}
569
-
569
+
570
570
set {
571
571
lock ( )
572
572
withExtendedLifetime ( _value) {
@@ -575,7 +575,7 @@ final class _AsyncStreamCriticalStorage<Contents>: @unchecked Sendable {
575
575
}
576
576
}
577
577
}
578
-
578
+
579
579
static func create( _ initial: Contents ) -> _AsyncStreamCriticalStorage {
580
580
let minimumCapacity = _lockWordCount ( )
581
581
let storage = Builtin . allocWithTailElems_1 (
0 commit comments