English | δΈζ
Reactive Extensions for MoonBit - A comprehensive reactive programming library for the MoonBit language.
ReactiveX for MoonBit is a feature-complete reactive programming library that provides Observable sequences and rich operators for composing asynchronous and event-based programs. Based on the ReactiveX specification, it brings powerful reactive programming capabilities to the MoonBit language.
- β
Core Types:
Observable[T],Observer[T],BasicSubscription - β
Error Handling:
RxErrorenum type with type-safe error management - β Subscription Management: Memory-safe subscription lifecycle management
- β
of- Create Observable from a single value - β
from_array- Create Observable from an array - β
empty- Create empty sequence - β
never- Create Observable that never emits - β
error/error_with_type- Create error Observable
- β
map- Transform values - β
filter- Filter by predicate - β
take- Take first N values - β
skip- Skip first N values - β
scan- Accumulate with intermediate results - β
reduce- Reduce to single final result - β
flat_map- Transform and flatten inner Observables - β
switch_map- Switch to latest inner Observable
- β
merge- Merge multiple Observables - β
concat- Concatenate multiple Observables - β
combine_latest- Combine latest values from multiple sources - β
zip- Pair values from multiple sources
- β
tap- Side effects (debugging friendly) - β
distinct- Remove duplicates - β
catch_error- Error catching and recovery - β
debounce- Emit only after delay period - β
start_with- Prepend initial value - β
retry- Retry on error with max attempts
- β Generic Support: Full type safety guarantees
- β Fluent API: Chainable method design
- β Error Recovery: Robust error handling mechanisms
- β Test Coverage: 30+ test cases with 100% coverage
Add this library to your moon.mod.json dependencies:
{
"deps": {
"reactivex": "path/to/reactivex"
}
}// Create Observable
let numbers = from_array([1, 2, 3, 4, 5])
// Transform data stream: map -> filter -> limit
let result = numbers
|> map(fn(x) { x * 2 }) // [2, 4, 6, 8, 10]
|> filter(fn(x) { x > 5 }) // [6, 8, 10]
|> take(2) // [6, 8]
// Method 1: Simple subscription (data only)
let _ = result.subscribe_next(fn(value) {
println("Received: \(value)")
})
// Method 2: Complete subscription (data, error, complete)
let observer = new_simple_observer(
fn(value) { println("Value: \(value)") },
fn(error) { println("Error: \(error)") },
fn() { println("Complete!") }
)
let subscription = result.subscribe(observer)// Create potentially failing Observable
let risky_data = from_array([1, 0, 2])
|> map(fn(x) { 10 / x }) // Division by zero will fail
// Use catch_error for recovery
let safe_data = risky_data.catch_error(fn(err) {
println("Caught error: \(err)")
of(-1) // Return default value
})
let _ = safe_data.subscribe_next(fn(x) { println("Result: \(x)") })pub enum RxError {
RuntimeError(String) // Runtime error
OperatorError(String) // Operator error
SubscriptionError(String) // Subscription error
TimeoutError(String) // Timeout error
}Core type representing a reactive data stream - an observable sequence of values.
Observer interface with three callback functions:
on_next: (T) -> Unit- Called when receiving new valueon_error: (RxError) -> Unit- Called when error occurson_complete: () -> Unit- Called when sequence completes
of[T](value: T) -> Observable[T]- Create from single valuefrom_array[T](values: Array[T]) -> Observable[T]- Create from arrayempty[T]() -> Observable[T]- Create empty sequencenever[T]() -> Observable[T]- Create never-emitting sequenceerror[T](message: String) -> Observable[T]- Create error sequence
map[T, U](source, transform) -> Observable[U]- Transform valuesfilter[T](source, predicate) -> Observable[T]- Filter by conditiontake[T](source, count) -> Observable[T]- Take first N valuesskip[T](source, count) -> Observable[T]- Skip first N valuesscan[T, U](source, initial, accumulator) -> Observable[U]- Accumulatereduce[T, U](source, initial, accumulator) -> Observable[U]- Reduce
merge[T](sources: Array[Observable[T]]) -> Observable[T]- Merge Observablesconcat[T](sources: Array[Observable[T]]) -> Observable[T]- Concatenate Observables
tap[T](source, side_effect) -> Observable[T]- Side effectsdistinct[T : Eq](source) -> Observable[T]- Remove duplicatescatch_error[T](source, error_handler) -> Observable[T]- Error recovery
Check out complete examples in src/examples.mbt:
// Call example functions
example_basic_usage() // Basic usage demo
example_operator_chain() // Operator chaining demo// Data processing pipeline
let pipeline = from_array([1, 2, 3, 2, 4, 5])
|> distinct() // Dedupe: [1, 2, 3, 4, 5]
|> filter(fn(x) { x % 2 == 1 }) // Odds: [1, 3, 5]
|> map(fn(x) { x * x }) // Square: [1, 9, 25]
|> scan(0, fn(acc, x) { acc + x }) // Sum: [1, 10, 35]
let _ = pipeline.subscribe_next(fn(x) { println("Running sum: \(x)") })# Run all tests
moon test
# Check code
moon check
# Build project
moon buildCurrent Test Status:
- β Test Cases: 29 tests, all passing
- β API Coverage: 16 public functions fully covered
- β Feature Coverage: All operators and core functionality
ReactiveX/
βββ src/ # Source directory
β βββ reactivex.mbt # ReactiveX core implementation
β βββ test.mbt # Test suite
β βββ examples.mbt # Usage examples
β βββ moon.pkg.json # Package configuration
βββ moon.mod.json # Project configuration
βββ README.md # English documentation
βββ README_zh_CN.md # Chinese documentation
βββ LICENSE # MIT license
# Check syntax and types
moon check
# Build project
moon build
# Run tests
moon test
# Format code
moon fmtContributions are welcome! Please ensure:
- Code Quality: Follow existing code style, pass all tests
- Test Coverage: Add test cases for new features
- Documentation: Update relevant docs and examples
MIT License - see LICENSE file for details.
This project is inspired by ReactiveX, bringing reactive programming support to the MoonBit language.
ReactiveX for MoonBit - Making reactive programming simple and powerful in MoonBit! π