Skip to content

Commit a3bd32f

Browse files
authored
Merge pull request #17 from skiptools/bridgestream
Support code for bridging AsyncStream
2 parents 15a5ad4 + f07799e commit a3bd32f

File tree

1 file changed

+179
-18
lines changed

1 file changed

+179
-18
lines changed

Sources/SkipLib/AsyncStream.swift

Lines changed: 179 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66

77
#if SKIP
88

9+
import kotlin.coroutines.resume
10+
import kotlin.coroutines.resumeWithException
11+
import kotlin.coroutines.suspendCoroutine
912
import kotlinx.coroutines.launch
1013
import kotlinx.coroutines.channels.BufferOverflow
1114
import kotlinx.coroutines.channels.Channel
12-
import kotlinx.coroutines.flow.Flow
1315
import kotlinx.coroutines.flow.consumeAsFlow
1416
import kotlinx.coroutines.flow.flow
1517

16-
public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<Element>> where Element: Any {
18+
public final class AsyncStream<Element>: AsyncSequence, SwiftCustomBridged, KotlinConverting<kotlinx.coroutines.flow.Flow<Element>> where Element: Any {
1719
// SKIP NOWARN
1820
public final class Continuation<Element> : Sendable {
1921
// SKIP NOWARN
@@ -67,17 +69,15 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
6769
}
6870

6971
public func finish() {
70-
channel.close()
71-
if let onTermination {
72-
self.onTermination = nil
73-
onTermination(Termination.finished)
74-
}
72+
let _ = channel.close()
7573
}
7674

7775
public var onTermination: ((Termination) -> Void)?
7876
}
7977

8078
var continuation: Continuation<Element>? // Internal for makeStream()
79+
80+
public private(set) var swiftDataSource: AsyncStreamSwiftDataSource<Element>? // Bridging support
8181
private var producer: (() async -> Element?)?
8282
private var onCancel: (() -> Void)?
8383

@@ -100,7 +100,7 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
100100
self.onCancel = onCancel
101101
}
102102

103-
public init(flow: Flow<Element>, bufferingPolicy limit: Continuation.BufferingPolicy = Continuation.BufferingPolicy.unbounded) {
103+
public init(flow: kotlinx.coroutines.flow.Flow<Element>, bufferingPolicy limit: Continuation.BufferingPolicy = Continuation.BufferingPolicy.unbounded) {
104104
self.init(nil, limit, { continuation in
105105
Task {
106106
flow.collect { value in
@@ -111,6 +111,11 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
111111
})
112112
}
113113

114+
public init(swiftDataSource: AsyncStreamSwiftDataSource<Element>) {
115+
self.swiftDataSource = swiftDataSource
116+
self.producer = { await swiftDataSource.next() }
117+
}
118+
114119
public func makeAsyncIterator() -> Iterator<Element> {
115120
return Iterator<Element>(stream: self)
116121
}
@@ -136,7 +141,15 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
136141
withTaskCancellationHandler {
137142
if let channel = stream.continuation?.channel {
138143
let result = channel.receiveCatching()
139-
return result.getOrNull()
144+
if result.isClosed {
145+
if let onTermination = stream.continuation?.onTermination {
146+
stream.continuation?.onTermination = nil
147+
onTermination(Continuation.Termination.finished)
148+
}
149+
return nil
150+
} else {
151+
return result.getOrNull()
152+
}
140153
} else if let producer = stream.producer {
141154
return producer()
142155
} else {
@@ -159,7 +172,7 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
159172
}
160173
}
161174

162-
public override func kotlin(nocopy: Bool = false) -> Flow<Element> {
175+
public override func kotlin(nocopy: Bool = false) -> kotlinx.coroutines.flow.Flow<Element> {
163176
if let channel = continuation?.channel {
164177
return channel.consumeAsFlow()
165178
} else if let producer {
@@ -181,7 +194,7 @@ public final class AsyncStream<Element>: AsyncSequence, KotlinConverting<Flow<El
181194
// Unfortunately because of minor API differences between `AsyncStream` and `AsyncThrowingStream`, we can't
182195
// really share any code between them
183196

184-
public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinConverting<Flow<Element>> where Element: Any, Failure: Error {
197+
public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, SwiftCustomBridged, KotlinConverting<kotlinx.coroutines.flow.Flow<Element>> where Element: Any, Failure: Error {
185198
// SKIP NOWARN
186199
public final class Continuation<Element, Failure> : Sendable where Element: Any, Failure: Error {
187200
// SKIP NOWARN
@@ -205,6 +218,7 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
205218
}
206219

207220
let channel: Channel<Element>
221+
var terminationError: Failure?
208222

209223
public init(channel: Channel<Element>) {
210224
self.channel = channel
@@ -237,16 +251,15 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
237251

238252
public func finish(throwing error: Failure? = nil) {
239253
channel.close()
240-
if let onTermination {
241-
self.onTermination = nil
242-
onTermination(Termination.finished(error))
243-
}
254+
terminationError = error
244255
}
245256

246257
public var onTermination: ((Termination) -> Void)?
247258
}
248259

249260
var continuation: Continuation<Element, Failure>? // Internal for makeStream()
261+
262+
public private(set) var swiftDataSource: AsyncThrowingStreamSwiftDataSource<Element>? // Bridging support
250263
private var producer: (() async throws -> Element?)?
251264
private var onCancel: (() -> Void)?
252265

@@ -269,7 +282,7 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
269282
self.onCancel = onCancel
270283
}
271284

272-
public init(flow: Flow<Element>, bufferingPolicy limit: Continuation.BufferingPolicy = Continuation.BufferingPolicy.unbounded) {
285+
public init(flow: kotlinx.coroutines.flow.Flow<Element>, bufferingPolicy limit: Continuation.BufferingPolicy = Continuation.BufferingPolicy.unbounded) {
273286
self.init(nil, limit, { continuation in
274287
Task {
275288
do {
@@ -284,6 +297,11 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
284297
})
285298
}
286299

300+
public init(swiftDataSource: AsyncThrowingStreamSwiftDataSource<Element>) {
301+
self.swiftDataSource = swiftDataSource
302+
self.producer = { try await swiftDataSource.next() }
303+
}
304+
287305
public func makeAsyncIterator() -> Iterator<Element, Failure> {
288306
return Iterator<Element, Failure>(stream: self)
289307
}
@@ -311,7 +329,17 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
311329
if let channel = stream.continuation?.channel {
312330
let result = channel.receiveCatching()
313331
if result.isClosed {
314-
return nil
332+
let terminationError = stream.continuation?.terminationError
333+
stream.continuation?.terminationError = nil
334+
if let onTermination = stream.continuation?.onTermination {
335+
stream.continuation?.onTermination = nil
336+
onTermination(Continuation.Termination.finished(terminationError))
337+
}
338+
if let terminationError {
339+
throw terminationError as! Throwable
340+
} else {
341+
return nil
342+
}
315343
} else {
316344
return result.getOrThrow()
317345
}
@@ -337,7 +365,7 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
337365
}
338366
}
339367

340-
public override func kotlin(nocopy: Bool = false) -> Flow<Element> {
368+
public override func kotlin(nocopy: Bool = false) -> kotlinx.coroutines.flow.Flow<Element> {
341369
if let channel = continuation?.channel {
342370
return channel.consumeAsFlow()
343371
} else if let producer {
@@ -356,4 +384,137 @@ public final class AsyncThrowingStream<Element, Failure>: AsyncSequence, KotlinC
356384
}
357385
}
358386

387+
// MARK: Bridge Support
388+
389+
public final class AsyncStreamBridgingDataSource {
390+
private var stream: AsyncStream<Any>?
391+
private var throwingStream: AsyncThrowingStream<Any, Error>?
392+
private var flow: kotlinx.coroutines.flow.Flow<Any>?
393+
394+
public init(stream: AsyncStream<Any>) {
395+
self.stream = stream
396+
}
397+
398+
public init(stream: AsyncThrowingStream<Any, Error>) {
399+
self.throwingStream = stream
400+
}
401+
402+
public init(flow: kotlinx.coroutines.flow.Flow<Any>) {
403+
self.flow = flow
404+
}
405+
406+
public func collect(onNext: (Any) -> Void, onFinish: (Throwable?) -> Void) {
407+
Task {
408+
if let stream {
409+
let itr = stream.makeAsyncIterator()
410+
while let next = await itr.next() {
411+
onNext(next)
412+
}
413+
onFinish(nil)
414+
} else if let throwingStream {
415+
let itr = throwingStream.makeAsyncIterator()
416+
do {
417+
while let next = try await itr.next() {
418+
onNext(next)
419+
}
420+
onFinish(nil)
421+
} catch {
422+
onFinish(error as? Throwable)
423+
}
424+
} else if let flow {
425+
do {
426+
flow.collect {
427+
onNext($0)
428+
}
429+
onFinish(nil)
430+
} catch {
431+
onFinish(error as? Throwable)
432+
}
433+
}
434+
}
435+
}
436+
}
437+
438+
public final class AsyncStreamSwiftDataSource<Element> {
439+
public private(set) var Swift_producer: Int64
440+
441+
public init(Swift_producer: Int64) {
442+
self.Swift_producer = Swift_producer
443+
}
444+
445+
deinit {
446+
Swift_producer = Swift_release(Swift_producer)
447+
}
448+
449+
public func next() async throws -> Element? {
450+
suspendCoroutine { continuation in
451+
Swift_next(Swift_producer) { element in
452+
continuation.resume(element as? Element)
453+
}
454+
}
455+
}
456+
457+
public func asFlow() -> kotlinx.coroutines.flow.Flow<Element> {
458+
return flow {
459+
while true {
460+
if let value = next() {
461+
emit(value)
462+
} else {
463+
break
464+
}
465+
}
466+
}
467+
}
468+
469+
// @JvmName is needed for test cases, as the name is otherwise mangled to append "$SkipLib_debug"
470+
// SKIP INSERT: @JvmName("Swift_next")
471+
// SKIP EXTERN
472+
func Swift_next(Swift_producer: Int64, callback: (Any?) -> Void)
473+
// SKIP EXTERN
474+
func Swift_release(Swift_producer: Int64) -> Int64
475+
}
476+
477+
public final class AsyncThrowingStreamSwiftDataSource<Element> {
478+
public private(set) var Swift_producer: Int64
479+
480+
public init(Swift_producer: Int64) {
481+
self.Swift_producer = Swift_producer
482+
}
483+
484+
deinit {
485+
Swift_producer = Swift_release(Swift_producer: Swift_producer)
486+
}
487+
488+
public func next() async throws -> Element? {
489+
suspendCoroutine { continuation in
490+
Swift_next(Swift_producer) { element, error in
491+
if let error {
492+
continuation.resumeWithException(error)
493+
} else {
494+
continuation.resume(element as? Element)
495+
}
496+
}
497+
}
498+
}
499+
500+
public func asFlow() -> kotlinx.coroutines.flow.Flow<Element> {
501+
return flow {
502+
while true {
503+
if let value = next() {
504+
emit(value)
505+
} else {
506+
break
507+
}
508+
}
509+
}
510+
}
511+
512+
// @JvmName is needed for test cases, as the name is otherwise mangled to append "$SkipLib_debug"
513+
// SKIP INSERT: @JvmName("Swift_next")
514+
// SKIP EXTERN
515+
func Swift_next(Swift_producer: Int64, callback: (Any?, Throwable?) -> Void)
516+
// SKIP EXTERN
517+
func Swift_release(Swift_producer: Int64) -> Int64
518+
}
519+
359520
#endif

0 commit comments

Comments
 (0)