Skip to content

Commit 404986d

Browse files
authored
Merge pull request #2 from Asterless/refactor
Refactors the implementation and the test
2 parents 1d4f6eb + bbcecbb commit 404986d

File tree

3 files changed

+118
-153
lines changed

3 files changed

+118
-153
lines changed

src/examples.mbt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ pub fn example_basic_usage() -> Unit {
99
// 1. 创建一个简单的Observable
1010
let simple_obs = of(42)
1111
let _ = simple_obs.subscribe_next(fn(value) {
12-
println("接收到值: {value}")
12+
println("接收到值: " + value.to_string())
1313
})
1414

1515
// 2. 创建数组Observable
1616
let array_obs = from_array([1, 2, 3])
1717
let _ = array_obs.subscribe_next(fn(value) {
18-
println("数组元素: {value}")
18+
println("数组元素: " + value.to_string())
1919
})
2020
println("=== 示例完成 ===")
2121
}
@@ -31,7 +31,7 @@ pub fn example_operator_chain() -> Unit {
3131
|> map(fn(x) { x * x }) // 平方
3232
|> take(3) // 取前3个
3333
let _ = processed.subscribe_next(fn(value) {
34-
println("处理后的值: {value}")
34+
println("处理后的值: " + value.to_string())
3535
})
3636
println("=== 示例完成 ===")
3737
}

src/reactivex.mbt

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ pub fn[T] take(source : Observable[T], count : Int) -> Observable[T] {
558558
fn(value) {
559559
if taken < count {
560560
(observer.on_next)(value)
561-
taken = taken + 1
561+
taken += 1
562562
if taken == count {
563563
(observer.on_complete)()
564564
}
@@ -597,7 +597,7 @@ pub fn[T] skip(source : Observable[T], count : Int) -> Observable[T] {
597597
if skipped >= count {
598598
(observer.on_next)(value)
599599
} else {
600-
skipped = skipped + 1
600+
skipped += 1
601601
}
602602
},
603603
observer.on_error,
@@ -713,8 +713,10 @@ pub fn[T] merge(sources : Array[Observable[T]]) -> Observable[T] {
713713
let mut completed_count = 0
714714
for i = 0; i < sources.length(); i = i + 1 {
715715
let source = sources[i]
716-
let subscription = source.subscribe(
717-
new_observer(observer.on_next, observer.on_error, fn() {
716+
let subscription = source.subscribe(new_observer(
717+
observer.on_next,
718+
observer.on_error,
719+
fn() {
718720
completed_count += 1
719721
if completed_count == sources.length() {
720722
(observer.on_complete)()
@@ -758,8 +760,10 @@ pub fn[T] concat(sources : Array[Observable[T]]) -> Observable[T] {
758760
fn subscribe_next() -> Unit {
759761
if current_index < sources.length() {
760762
let source = sources[current_index]
761-
current_subscription = source.subscribe(
762-
new_observer(observer.on_next, observer.on_error, fn() {
763+
current_subscription = source.subscribe(new_observer(
764+
observer.on_next,
765+
observer.on_error,
766+
fn() {
763767
current_index += 1
764768
subscribe_next()
765769
}),
@@ -929,10 +933,12 @@ pub fn[T, U] flat_map(
929933
930934
let original_observer = new_observer(
931935
fn(value) {
932-
active_subscriptions = active_subscriptions + 1
936+
active_subscriptions += 1
933937
let inner_observable = transform(value)
934-
let _ = inner_observable.subscribe(
935-
new_observer(observer.on_next, observer.on_error, fn() {
938+
let _ = inner_observable.subscribe(new_observer(
939+
observer.on_next,
940+
observer.on_error,
941+
fn() {
936942
active_subscriptions -= 1
937943
check_completion()
938944
}),
@@ -1043,7 +1049,7 @@ pub fn[T, U, V] combine_latest(
10431049
}
10441050
10451051
fn on_complete() -> Unit {
1046-
completed_count = completed_count + 1
1052+
completed_count += 1
10471053
if completed_count == 2 {
10481054
(observer.on_complete)()
10491055
}
@@ -1178,7 +1184,7 @@ pub fn[T] debounce(source : Observable[T], delay_count : Int) -> Observable[T] {
11781184
let mut last_value : T? = None
11791185
let original_observer = new_observer(
11801186
fn(value) {
1181-
counter = counter + 1
1187+
counter += 1
11821188
last_value = Some(value)
11831189
11841190
// 简化的防抖逻辑(计数器模拟时间延迟)
@@ -1261,21 +1267,18 @@ pub fn[T] retry(source : Observable[T], max_retries : Int) -> Observable[T] {
12611267
new_observable(fn(observer) {
12621268
let mut retry_count = 0
12631269
fn attempt_subscribe() -> BasicSubscription {
1264-
source.subscribe(
1265-
new_observer(
1266-
observer.on_next,
1267-
fn(error) {
1268-
if retry_count < max_retries {
1269-
retry_count += 1
1270-
let _ = attempt_subscribe()
1271-
1272-
} else {
1273-
(observer.on_error)(error)
1274-
}
1275-
},
1276-
observer.on_complete,
1277-
),
1278-
)
1270+
source.subscribe(new_observer(
1271+
observer.on_next,
1272+
fn(error) {
1273+
if retry_count < max_retries {
1274+
retry_count += 1
1275+
let _ = attempt_subscribe()
1276+
} else {
1277+
(observer.on_error)(error)
1278+
}
1279+
},
1280+
observer.on_complete
1281+
))
12791282
}
12801283
12811284
attempt_subscribe()

0 commit comments

Comments
 (0)