66
77#if SKIP
88
9+ import kotlin. coroutines. resume
10+ import kotlin. coroutines. resumeWithException
11+ import kotlin. coroutines. suspendCoroutine
912import kotlinx. coroutines. launch
1013import kotlinx. coroutines. channels. BufferOverflow
1114import kotlinx. coroutines. channels. Channel
12- import kotlinx. coroutines. flow. Flow
1315import kotlinx. coroutines. flow. consumeAsFlow
1416import kotlinx. coroutines. flow. flow
1517
16- public final class AsyncStream < Element> : AsyncSequence , KotlinConverting < Flow < Element > > where Element: Any {
18+ public final class AsyncStream < Element> : AsyncSequence , SwiftCustomBridged , KotlinConverting < kotlinx . coroutines . flow . Flow < Element > > where Element: Any {
1719 // SKIP NOWARN
1820 public final class Continuation < Element> : Sendable {
1921 // SKIP NOWARN
@@ -67,17 +69,15 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
6769 }
6870
6971 public func finish( ) {
70- channel. close ( )
71- if let onTermination {
72- self . onTermination = nil
73- onTermination ( Termination . finished)
74- }
72+ let _ = channel. close ( )
7573 }
7674
7775 public var onTermination : ( ( Termination ) -> Void ) ?
7876 }
7977
8078 var continuation : Continuation < Element > ? // Internal for makeStream()
79+
80+ public private( set) var swiftDataSource : AsyncStreamSwiftDataSource < Element > ? // Bridging support
8181 private var producer : ( ( ) async -> Element ? ) ?
8282 private var onCancel : ( ( ) -> Void ) ?
8383
@@ -100,7 +100,7 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
100100 self . onCancel = onCancel
101101 }
102102
103- public init ( flow: Flow < Element > , bufferingPolicy limit: Continuation . BufferingPolicy = Continuation . BufferingPolicy. unbounded) {
103+ public init ( flow: kotlinx . coroutines . flow . Flow < Element > , bufferingPolicy limit: Continuation . BufferingPolicy = Continuation . BufferingPolicy. unbounded) {
104104 self . init ( nil , limit, { continuation in
105105 Task {
106106 flow. collect { value in
@@ -111,6 +111,11 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
111111 } )
112112 }
113113
114+ public init ( swiftDataSource: AsyncStreamSwiftDataSource < Element > ) {
115+ self . swiftDataSource = swiftDataSource
116+ self . producer = { await swiftDataSource. next ( ) }
117+ }
118+
114119 public func makeAsyncIterator( ) -> Iterator < Element > {
115120 return Iterator < Element > ( stream: self )
116121 }
@@ -136,7 +141,15 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
136141 withTaskCancellationHandler {
137142 if let channel = stream. continuation? . channel {
138143 let result = channel. receiveCatching ( )
139- return result. getOrNull ( )
144+ if result. isClosed {
145+ if let onTermination = stream. continuation? . onTermination {
146+ stream. continuation? . onTermination = nil
147+ onTermination ( Continuation . Termination. finished)
148+ }
149+ return nil
150+ } else {
151+ return result. getOrNull ( )
152+ }
140153 } else if let producer = stream. producer {
141154 return producer ( )
142155 } else {
@@ -159,7 +172,7 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
159172 }
160173 }
161174
162- public override func kotlin( nocopy: Bool = false ) -> Flow < Element > {
175+ public override func kotlin( nocopy: Bool = false ) -> kotlinx . coroutines . flow . Flow < Element > {
163176 if let channel = continuation? . channel {
164177 return channel. consumeAsFlow ( )
165178 } else if let producer {
@@ -181,7 +194,7 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
181194// Unfortunately because of minor API differences between `AsyncStream` and `AsyncThrowingStream`, we can't
182195// really share any code between them
183196
184- public final class AsyncThrowingStream < Element, Failure> : AsyncSequence , KotlinConverting < Flow < Element > > where Element: Any , Failure: Error {
197+ public final class AsyncThrowingStream < Element, Failure> : AsyncSequence , SwiftCustomBridged , KotlinConverting < kotlinx . coroutines . flow . Flow < Element > > where Element: Any , Failure: Error {
185198 // SKIP NOWARN
186199 public final class Continuation < Element, Failure> : Sendable where Element: Any , Failure: Error {
187200 // SKIP NOWARN
@@ -205,6 +218,7 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
205218 }
206219
207220 let channel : Channel < Element >
221+ var terminationError : Failure ?
208222
209223 public init ( channel: Channel < Element > ) {
210224 self . channel = channel
@@ -237,16 +251,15 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
237251
238252 public func finish( throwing error: Failure ? = nil ) {
239253 channel. close ( )
240- if let onTermination {
241- self . onTermination = nil
242- onTermination ( Termination . finished ( error) )
243- }
254+ terminationError = error
244255 }
245256
246257 public var onTermination : ( ( Termination ) -> Void ) ?
247258 }
248259
249260 var continuation : Continuation < Element , Failure > ? // Internal for makeStream()
261+
262+ public private( set) var swiftDataSource : AsyncThrowingStreamSwiftDataSource < Element > ? // Bridging support
250263 private var producer : ( ( ) async throws -> Element ? ) ?
251264 private var onCancel : ( ( ) -> Void ) ?
252265
@@ -269,7 +282,7 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
269282 self . onCancel = onCancel
270283 }
271284
272- public init ( flow: Flow < Element > , bufferingPolicy limit: Continuation . BufferingPolicy = Continuation . BufferingPolicy. unbounded) {
285+ public init ( flow: kotlinx . coroutines . flow . Flow < Element > , bufferingPolicy limit: Continuation . BufferingPolicy = Continuation . BufferingPolicy. unbounded) {
273286 self . init ( nil , limit, { continuation in
274287 Task {
275288 do {
@@ -284,6 +297,11 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
284297 } )
285298 }
286299
300+ public init ( swiftDataSource: AsyncThrowingStreamSwiftDataSource < Element > ) {
301+ self . swiftDataSource = swiftDataSource
302+ self . producer = { try await swiftDataSource. next ( ) }
303+ }
304+
287305 public func makeAsyncIterator( ) -> Iterator < Element , Failure > {
288306 return Iterator < Element , Failure > ( stream: self )
289307 }
@@ -311,7 +329,17 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
311329 if let channel = stream. continuation? . channel {
312330 let result = channel. receiveCatching ( )
313331 if result. isClosed {
314- return nil
332+ let terminationError = stream. continuation? . terminationError
333+ stream. continuation? . terminationError = nil
334+ if let onTermination = stream. continuation? . onTermination {
335+ stream. continuation? . onTermination = nil
336+ onTermination ( Continuation . Termination. finished ( terminationError) )
337+ }
338+ if let terminationError {
339+ throw terminationError as! Throwable
340+ } else {
341+ return nil
342+ }
315343 } else {
316344 return result. getOrThrow ( )
317345 }
@@ -337,7 +365,7 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
337365 }
338366 }
339367
340- public override func kotlin( nocopy: Bool = false ) -> Flow < Element > {
368+ public override func kotlin( nocopy: Bool = false ) -> kotlinx . coroutines . flow . Flow < Element > {
341369 if let channel = continuation? . channel {
342370 return channel. consumeAsFlow ( )
343371 } else if let producer {
@@ -356,4 +384,137 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
356384 }
357385}
358386
387+ // MARK: Bridge Support
388+
389+ public final class AsyncStreamBridgingDataSource {
390+ private var stream : AsyncStream < Any > ?
391+ private var throwingStream : AsyncThrowingStream < Any , Error > ?
392+ private var flow : kotlinx . coroutines . flow . Flow < Any > ?
393+
394+ public init ( stream: AsyncStream < Any > ) {
395+ self . stream = stream
396+ }
397+
398+ public init ( stream: AsyncThrowingStream < Any , Error > ) {
399+ self . throwingStream = stream
400+ }
401+
402+ public init ( flow: kotlinx . coroutines . flow . Flow < Any > ) {
403+ self . flow = flow
404+ }
405+
406+ public func collect( onNext: ( Any ) -> Void , onFinish: ( Throwable ? ) -> Void ) {
407+ Task {
408+ if let stream {
409+ let itr = stream. makeAsyncIterator ( )
410+ while let next = await itr. next ( ) {
411+ onNext ( next)
412+ }
413+ onFinish ( nil )
414+ } else if let throwingStream {
415+ let itr = throwingStream. makeAsyncIterator ( )
416+ do {
417+ while let next = try await itr. next ( ) {
418+ onNext ( next)
419+ }
420+ onFinish( nil )
421+ } catch {
422+ onFinish ( error as? Throwable )
423+ }
424+ } else if let flow {
425+ do {
426+ flow. collect {
427+ onNext ( $0)
428+ }
429+ onFinish ( nil )
430+ } catch {
431+ onFinish ( error as? Throwable )
432+ }
433+ }
434+ }
435+ }
436+ }
437+
438+ public final class AsyncStreamSwiftDataSource < Element> {
439+ public private( set) var Swift_producer : Int64
440+
441+ public init ( Swift_producer: Int64 ) {
442+ self . Swift_producer = Swift_producer
443+ }
444+
445+ deinit {
446+ Swift_producer = Swift_release ( Swift_producer)
447+ }
448+
449+ public func next( ) async throws -> Element ? {
450+ suspendCoroutine { continuation in
451+ Swift_next ( Swift_producer) { element in
452+ continuation. resume ( element as? Element )
453+ }
454+ }
455+ }
456+
457+ public func asFlow( ) -> kotlinx . coroutines . flow . Flow < Element > {
458+ return flow {
459+ while true {
460+ if let value = next ( ) {
461+ emit ( value)
462+ } else {
463+ break
464+ }
465+ }
466+ }
467+ }
468+
469+ // @JvmName is needed for test cases, as the name is otherwise mangled to append "$SkipLib_debug"
470+ // SKIP INSERT: @JvmName("Swift_next")
471+ // SKIP EXTERN
472+ func Swift_next( Swift_producer: Int64 , callback: ( Any ? ) -> Void )
473+ // SKIP EXTERN
474+ func Swift_release( Swift_producer: Int64 ) -> Int64
475+ }
476+
477+ public final class AsyncThrowingStreamSwiftDataSource < Element> {
478+ public private( set) var Swift_producer : Int64
479+
480+ public init ( Swift_producer: Int64 ) {
481+ self . Swift_producer = Swift_producer
482+ }
483+
484+ deinit {
485+ Swift_producer = Swift_release ( Swift_producer: Swift_producer)
486+ }
487+
488+ public func next( ) async throws -> Element ? {
489+ suspendCoroutine { continuation in
490+ Swift_next ( Swift_producer) { element, error in
491+ if let error {
492+ continuation. resumeWithException ( error)
493+ } else {
494+ continuation. resume ( element as? Element )
495+ }
496+ }
497+ }
498+ }
499+
500+ public func asFlow( ) -> kotlinx . coroutines . flow . Flow < Element > {
501+ return flow {
502+ while true {
503+ if let value = next ( ) {
504+ emit ( value)
505+ } else {
506+ break
507+ }
508+ }
509+ }
510+ }
511+
512+ // @JvmName is needed for test cases, as the name is otherwise mangled to append "$SkipLib_debug"
513+ // SKIP INSERT: @JvmName("Swift_next")
514+ // SKIP EXTERN
515+ func Swift_next( Swift_producer: Int64 , callback: ( Any ? , Throwable ? ) -> Void )
516+ // SKIP EXTERN
517+ func Swift_release( Swift_producer: Int64 ) -> Int64
518+ }
519+
359520#endif
0 commit comments