1010import reactor .core .publisher .Operators ;
1111
1212public final class SwitchTransform <T , R > extends Flux <R > {
13-
14- final Publisher <? extends T > source ;
15- final BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ;
16-
17- public SwitchTransform (
18- Publisher <? extends T > source , BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ) {
19- this .source = Objects .requireNonNull (source , "source" );
20- this .transformer = Objects .requireNonNull (transformer , "transformer" );
21- }
22-
23- @ Override
24- public void subscribe (CoreSubscriber <? super R > actual ) {
25- Flux .from (source ).subscribe (new SwitchTransformSubscriber <>(actual , transformer ));
26- }
27-
28- static final class SwitchTransformSubscriber <T , R > implements CoreSubscriber <T > {
29- @ SuppressWarnings ("rawtypes" )
30- static final AtomicIntegerFieldUpdater <SwitchTransformSubscriber > ONCE =
31- AtomicIntegerFieldUpdater .newUpdater (SwitchTransformSubscriber .class , "once" );
32-
33- final CoreSubscriber <? super R > actual ;
13+
14+ final Publisher <? extends T > source ;
3415 final BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ;
35- final UnboundedProcessor <T > processor = new UnboundedProcessor <>();
36- Subscription s ;
37- volatile int once ;
38-
39- SwitchTransformSubscriber (
40- CoreSubscriber <? super R > actual ,
41- BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ) {
42- this .actual = actual ;
43- this .transformer = transformer ;
16+
17+ public SwitchTransform (
18+ Publisher <? extends T > source , BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ) {
19+ this .source = Objects .requireNonNull (source , "source" );
20+ this .transformer = Objects .requireNonNull (transformer , "transformer" );
4421 }
45-
22+
4623 @ Override
47- public void onSubscribe (Subscription s ) {
48- if (Operators .validate (this .s , s )) {
49- this .s = s ;
50- processor .onSubscribe (s );
51- }
24+ public void subscribe (CoreSubscriber <? super R > actual ) {
25+ Flux .from (source ).subscribe (new SwitchTransformSubscriber <>(actual , transformer ));
5226 }
53-
54- @ Override
55- public void onNext (T t ) {
56- if (once == 0 && ONCE .compareAndSet (this , 0 , 1 )) {
57- try {
58- Publisher <? extends R > result =
59- Objects .requireNonNull (
60- transformer .apply (t , processor ), "The transformer returned a null value" );
61- Flux .from (result ).subscribe (actual );
62- } catch (Throwable e ) {
63- onError (Operators .onOperatorError (s , e , t , actual .currentContext ()));
64- return ;
27+
28+ static final class SwitchTransformSubscriber <T , R > implements CoreSubscriber <T > {
29+ @ SuppressWarnings ("rawtypes" )
30+ static final AtomicIntegerFieldUpdater <SwitchTransformSubscriber > ONCE =
31+ AtomicIntegerFieldUpdater .newUpdater (SwitchTransformSubscriber .class , "once" );
32+
33+ final CoreSubscriber <? super R > actual ;
34+ final BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ;
35+ final UnboundedProcessor <T > processor = new UnboundedProcessor <>();
36+ Subscription s ;
37+ volatile int once ;
38+
39+ SwitchTransformSubscriber (
40+ CoreSubscriber <? super R > actual ,
41+ BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ) {
42+ this .actual = actual ;
43+ this .transformer = transformer ;
44+ }
45+
46+ @ Override
47+ public void onSubscribe (Subscription s ) {
48+ if (Operators .validate (this .s , s )) {
49+ this .s = s ;
50+ processor .onSubscribe (s );
51+ }
52+ }
53+
54+ @ Override
55+ public void onNext (T t ) {
56+ if (once == 0 && ONCE .compareAndSet (this , 0 , 1 )) {
57+ try {
58+ Publisher <? extends R > result =
59+ Objects .requireNonNull (
60+ transformer .apply (t , processor ), "The transformer returned a null value" );
61+ Flux .from (result ).subscribe (actual );
62+ } catch (Throwable e ) {
63+ onError (Operators .onOperatorError (s , e , t , actual .currentContext ()));
64+ return ;
65+ }
66+ }
67+ processor .onNext (t );
68+ }
69+
70+ @ Override
71+ public void onError (Throwable t ) {
72+ processor .onError (t );
73+ }
74+
75+ @ Override
76+ public void onComplete () {
77+ processor .onComplete ();
6578 }
66- }
67- processor .onNext (t );
68- }
69-
70- @ Override
71- public void onError (Throwable t ) {
72- processor .onError (t );
73- }
74-
75- @ Override
76- public void onComplete () {
77- processor .onComplete ();
7879 }
79- }
80- }
80+ }
0 commit comments