@@ -372,13 +372,246 @@ pub fn[T : Eq] distinct(source : Observable[T]) -> Observable[T] {
372372/// catch_error 操作符 - 错误恢复
373373pub fn [T ] catch_error (source : Observable [T ], error_handler : (RxError ) -> Observable [T ]) -> Observable [T ] {
374374 new_observable (fn (observer ) {
375- source . subscribe ( new_observer (
375+ let original_observer = new_observer (
376376 observer .on_next,
377377 fn (error ) {
378378 let recovery_source = error_handler (error )
379379 let _ = recovery_source .subscribe (observer )
380380 },
381381 observer .on_complete
382+ )
383+ source .subscribe (original_observer )
384+ })
385+ }
386+
387+ // ===== 高级转换操作符 =====
388+
389+ /// flatMap 操作符 - 将每个元素转换为Observable并扁平化
390+ pub fn [T , U ] flat_map (source : Observable [T ], transform : (T ) -> Observable [U ]) -> Observable [U ] {
391+ new_observable (fn (observer ) {
392+ let mut active_subscriptions = 0
393+ let mut source_completed = false
394+
395+ fn check_completion () -> Unit {
396+ if source_completed && active_subscriptions == 0 {
397+ (observer .on_complete)()
398+ }
399+ }
400+
401+ let original_observer = new_observer (
402+ fn (value ) {
403+ active_subscriptions = active_subscriptions + 1
404+ let inner_observable = transform (value )
405+ let _ = inner_observable .subscribe (new_observer (
406+ observer .on_next,
407+ observer .on_error,
408+ fn () {
409+ active_subscriptions = active_subscriptions - 1
410+ check_completion ()
411+ }
412+ ))
413+ },
414+ observer .on_error,
415+ fn () {
416+ source_completed = true
417+ check_completion ()
418+ }
419+ )
420+ source .subscribe (original_observer )
421+ })
422+ }
423+
424+ /// switchMap 操作符 - 切换到最新的内部Observable
425+ pub fn [T , U ] switch_map (source : Observable [T ], transform : (T ) -> Observable [U ]) -> Observable [U ] {
426+ new_observable (fn (observer ) {
427+ let mut current_subscription : BasicSubscription = new_subscription ()
428+ let mut source_completed = false
429+ let mut inner_completed = false
430+
431+ fn check_completion () -> Unit {
432+ if source_completed && inner_completed {
433+ (observer .on_complete)()
434+ }
435+ }
436+
437+ let original_observer = new_observer (
438+ fn (value ) {
439+ // 取消之前的内部订阅
440+ current_subscription .unsubscribe ()
441+ inner_completed = false
442+
443+ let inner_observable = transform (value )
444+ current_subscription = inner_observable .subscribe (new_observer (
445+ observer .on_next,
446+ observer .on_error,
447+ fn () {
448+ inner_completed = true
449+ check_completion ()
450+ }
451+ ))
452+ },
453+ observer .on_error,
454+ fn () {
455+ source_completed = true
456+ check_completion ()
457+ }
458+ )
459+ source .subscribe (original_observer )
460+ })
461+ }
462+
463+ /// combineLatest 操作符 - 组合多个Observable的最新值
464+ pub fn [T , U , V ] combine_latest (
465+ obs1 : Observable [T ],
466+ obs2 : Observable [U ],
467+ combiner : (T , U ) -> V
468+ ) -> Observable [V ] {
469+ new_observable (fn (observer ) {
470+ let mut value1 : T? = None
471+ let mut value2 : U? = None
472+ let mut completed_count = 0
473+
474+ fn try_emit () -> Unit {
475+ match (value1 , value2 ) {
476+ (Some (v1 ), Some (v2 )) => (observer .on_next)(combiner (v1 , v2 ))
477+ _ => ()
478+ }
479+ }
480+
481+ fn on_complete () -> Unit {
482+ completed_count = completed_count + 1
483+ if completed_count == 2 {
484+ (observer .on_complete)()
485+ }
486+ }
487+
488+ let sub1 = obs1 .subscribe (new_observer (
489+ fn (v ) { value1 = Some (v ); try_emit () },
490+ observer .on_error,
491+ on_complete
382492 ))
493+
494+ let _ = obs2 .subscribe (new_observer (
495+ fn (v ) { value2 = Some (v ); try_emit () },
496+ observer .on_error,
497+ on_complete
498+ ))
499+
500+ sub1 // 返回其中一个订阅作为代表
501+ })
502+ }
503+
504+ /// zip 操作符 - 将两个Observable按顺序配对
505+ pub fn [T , U , V ] zip (
506+ obs1 : Observable [T ],
507+ obs2 : Observable [U ],
508+ combiner : (T , U ) -> V
509+ ) -> Observable [V ] {
510+ new_observable (fn (observer ) {
511+ let queue1 : Array [T ] = []
512+ let queue2 : Array [U ] = []
513+ let mut completed1 = false
514+ let mut completed2 = false
515+
516+ fn try_emit () -> Unit {
517+ while queue1 .length () > 0 && queue2 .length () > 0 {
518+ let v1 = queue1 [0 ]
519+ let v2 = queue2 [0 ]
520+ let _ = queue1 .remove (0 )
521+ let _ = queue2 .remove (0 )
522+ (observer .on_next)(combiner (v1 , v2 ))
523+ }
524+
525+ // 检查是否应该完成
526+ if (completed1 && queue1 .length () == 0 ) || (completed2 && queue2 .length () == 0 ) {
527+ (observer .on_complete)()
528+ }
529+ }
530+
531+ let sub1 = obs1 .subscribe (new_observer (
532+ fn (v ) { queue1 .push (v ); try_emit () },
533+ observer .on_error,
534+ fn () { completed1 = true ; try_emit () }
535+ ))
536+
537+ let _ = obs2 .subscribe (new_observer (
538+ fn (v ) { queue2 .push (v ); try_emit () },
539+ observer .on_error,
540+ fn () { completed2 = true ; try_emit () }
541+ ))
542+
543+ sub1
544+ })
545+ }
546+
547+ /// debounce 操作符 - 防抖动,只发射最后一个值
548+ pub fn [T ] debounce (source : Observable [T ], delay_count : Int ) -> Observable [T ] {
549+ new_observable (fn (observer ) {
550+ let mut counter = 0
551+ let mut last_value : T? = None
552+
553+ let original_observer = new_observer (
554+ fn (value ) {
555+ counter = counter + 1
556+ last_value = Some (value )
557+
558+ // 简化的防抖逻辑(计数器模拟时间延迟)
559+ // 在实际实现中,这里应该使用定时器
560+ // 这里用计数器简化演示
561+ if counter % delay_count == 0 {
562+ match last_value {
563+ Some (v ) => (observer .on_next)(v )
564+ None => ()
565+ }
566+ }
567+ },
568+ observer .on_error,
569+ fn () {
570+ // 发射最后一个值
571+ match last_value {
572+ Some (v ) => {
573+ (observer .on_next)(v )
574+ (observer .on_complete)()
575+ }
576+ None => (observer .on_complete)()
577+ }
578+ }
579+ )
580+ source .subscribe (original_observer )
581+ })
582+ }
583+
584+ /// startWith 操作符 - 在源Observable前添加值
585+ pub fn [T ] start_with (source : Observable [T ], initial_value : T ) -> Observable [T ] {
586+ new_observable (fn (observer ) {
587+ // 先发射初始值
588+ (observer .on_next)(initial_value )
589+
590+ // 然后订阅源Observable
591+ source .subscribe (observer )
592+ })
593+ }
594+
595+ /// retry 操作符 - 错误时重试
596+ pub fn [T ] retry (source : Observable [T ], max_retries : Int ) -> Observable [T ] {
597+ new_observable (fn (observer ) {
598+ let mut retry_count = 0
599+
600+ fn attempt_subscribe () -> BasicSubscription {
601+ source .subscribe (new_observer (
602+ observer .on_next,
603+ fn (error ) {
604+ if retry_count < max_retries {
605+ retry_count = retry_count + 1
606+ let _ = attempt_subscribe ()
607+ } else {
608+ (observer .on_error)(error )
609+ }
610+ },
611+ observer .on_complete
612+ ))
613+ }
614+
615+ attempt_subscribe ()
383616 })
384617}
0 commit comments