Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Test
jobs:
test:
name: Test
runs-on: macos-latest
runs-on: macos-11
strategy:
fail-fast: false
matrix:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# 6.7.0
# 6.7.0-rc1
1. Add Swift Concurrency extensions `asyncStream` and `asyncThrowingStream` to `Signal` and `SignalProducer` (#847)

1. New operator `SignalProducer.Type.interval(_:interval:on:)` for emitting elements from a given sequence regularly. (#810, kudos to @mluisbrown)

Expand Down
123 changes: 48 additions & 75 deletions ReactiveSwift.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions Sources/Signal+SwiftConcurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Signal+SwiftConcurrency.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//
#if compiler(>=5.5) && canImport(_Concurrency)
import Foundation

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension Signal {
public var asyncThrowingStream: AsyncThrowingStream<Value, Swift.Error> {
AsyncThrowingStream<Value, Swift.Error> { continuation in
let disposable = observe { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
}
continuation.onTermination = { @Sendable termination in
disposable?.dispose()
}
}
}
}

extension Signal where Error == Never {
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
public var asyncStream: AsyncStream<Value> {
AsyncStream<Value> { continuation in
let disposable = observe { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed:
fatalError("Never is impossible to construct")
}
}
continuation.onTermination = { @Sendable termination in
disposable?.dispose()
}
}
}
}
#endif
52 changes: 52 additions & 0 deletions Sources/SignalProducer+SwiftConcurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// SignalProducer+SwiftConcurrency.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//
#if compiler(>=5.5) && canImport(_Concurrency)
import Foundation

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension SignalProducer {
public var asyncThrowingStream: AsyncThrowingStream<Value, Swift.Error> {
AsyncThrowingStream<Value, Swift.Error> { continuation in
let disposable = start { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
}
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension SignalProducer where Error == Never {
public var asyncStream: AsyncStream<Value> {
AsyncStream<Value> { continuation in
let disposable = start { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed:
fatalError("Never is impossible to construct")
}
}
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}
#endif
130 changes: 130 additions & 0 deletions Tests/ReactiveSwiftTests/SwiftConcurrencyTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//
// SwiftConcurrencyTests.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//

#if compiler(>=5.5) && canImport(_Concurrency)
import Foundation
import ReactiveSwift
import XCTest

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
class SwiftConcurrencyTests: XCTestCase {
func testValuesAsyncSignalProducer() async {
let values = [1,2,3]
var counter = 0
let asyncStream = SignalProducer(values).asyncStream
for await _ in asyncStream {
counter += 1
}
XCTAssertEqual(counter, 3)
}

func testValuesAsyncThrowingSignalProducer() async throws {
let values = [1,2,3]
var counter = 0
let asyncStream = SignalProducer(values).asyncThrowingStream
for try await _ in asyncStream {
counter += 1
}
XCTAssertEqual(counter, 3)
}

func testCompleteAsyncSignalProducer() async {
let asyncStream = SignalProducer<String, Never>.empty.asyncStream
let first = await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testCompleteAsyncThrowingSignalProducer() async throws {
let asyncStream = SignalProducer<String, Error>.empty.asyncThrowingStream
let first = try await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testErrorSignalProducer() async {
let error = NSError(domain: "domain", code: 0, userInfo: nil)
let asyncStream = SignalProducer<String, Error>(error: error).asyncThrowingStream
await XCTAssertThrowsError(try await asyncStream.first(where: { _ in true }))
}

func testValuesAsyncSignal() async {
let signal = Signal<Int, Never> { observer, _ in
Task {
for number in [1, 2, 3] {
observer.send(value: number)
}
observer.sendCompleted()
}
}
var counter = 0
let asyncStream = signal.asyncStream
for await _ in asyncStream {
counter += 1
}
XCTAssertEqual(counter, 3)
}

func testValuesAsyncThrowingSignal() async throws {
let signal = Signal<Int, Never> { observer, _ in
Task {
for number in [1, 2, 3] {
observer.send(value: number)
}
observer.sendCompleted()
}
}
var counter = 0
let asyncStream = signal.asyncThrowingStream
for try await _ in asyncStream {
counter += 1
}
XCTAssertEqual(counter, 3)
}

func testCompleteAsyncSignal() async {
let asyncStream = Signal<String, Never>.empty.asyncStream
let first = await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testCompleteAsyncThrowingSignal() async throws {
let asyncStream = Signal<String, Error>.empty.asyncThrowingStream
let first = try await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testErrorSignal() async {
let error = NSError(domain: "domain", code: 0, userInfo: nil)
let signal = Signal<String, Error> { observer, _ in
Task {
observer.send(error: error)
}
}
let asyncStream = signal.asyncThrowingStream
await XCTAssertThrowsError(try await asyncStream.first(where: { _ in true }))
}
}
// Extension to allow Throw assertion for async expressions
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
fileprivate extension XCTest {
func XCTAssertThrowsError<T: Sendable>(
_ expression: @autoclosure () async throws -> T,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath,
line: UInt = #line,
_ errorHandler: (_ error: Error) -> Void = { _ in }
) async {
do {
_ = try await expression()
XCTFail(message(), file: file, line: line)
} catch {
errorHandler(error)
}
}
}

#endif