1+ /*
2+ * Copyright 2015-2018 the original author or authors.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package io .rsocket .internal ;
18+
19+ import java .util .Objects ;
20+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
21+ import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
22+ import java .util .function .BiFunction ;
23+
24+ import io .netty .util .ReferenceCountUtil ;
25+ import org .reactivestreams .Publisher ;
26+ import org .reactivestreams .Subscription ;
27+ import reactor .core .CoreSubscriber ;
28+ import reactor .core .Scannable ;
29+ import reactor .core .publisher .Flux ;
30+ import reactor .core .publisher .Operators ;
31+ import reactor .util .annotation .Nullable ;
32+
33+ public final class SwitchTransformFlux <T , R > extends Flux <R > {
34+
35+ final Publisher <? extends T > source ;
36+ final BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ;
37+
38+ public SwitchTransformFlux (
39+ Publisher <? extends T > source , BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ) {
40+ this .source = Objects .requireNonNull (source , "source" );
41+ this .transformer = Objects .requireNonNull (transformer , "transformer" );
42+ }
43+
44+ @ Override
45+ public int getPrefetch () {
46+ return 1 ;
47+ }
48+
49+ @ Override
50+ public void subscribe (CoreSubscriber <? super R > actual ) {
51+ source .subscribe (new SwitchTransformMain <>(actual , transformer ));
52+ }
53+
54+ static final class SwitchTransformMain <T , R > implements CoreSubscriber <T >, Scannable {
55+
56+ final CoreSubscriber <? super R > actual ;
57+ final BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ;
58+ final SwitchTransformInner <T > inner ;
59+
60+ Subscription s ;
61+
62+ volatile int once ;
63+ @ SuppressWarnings ("rawtypes" )
64+ static final AtomicIntegerFieldUpdater <SwitchTransformMain > ONCE =
65+ AtomicIntegerFieldUpdater .newUpdater (SwitchTransformMain .class , "once" );
66+
67+ SwitchTransformMain (
68+ CoreSubscriber <? super R > actual ,
69+ BiFunction <T , Flux <T >, Publisher <? extends R >> transformer
70+ ) {
71+ this .actual = actual ;
72+ this .transformer = transformer ;
73+ this .inner = new SwitchTransformInner <>(this );
74+ }
75+
76+ @ Override
77+ @ Nullable
78+ public Object scanUnsafe (Attr key ) {
79+ if (key == Attr .CANCELLED ) return s == Operators .cancelledSubscription ();
80+ if (key == Attr .PREFETCH ) return 1 ;
81+
82+ return null ;
83+ }
84+
85+ @ Override
86+ public void onSubscribe (Subscription s ) {
87+ if (Operators .validate (this .s , s )) {
88+ this .s = s ;
89+ s .request (1 );
90+ }
91+ }
92+
93+ @ Override
94+ public void onNext (T t ) {
95+ if (isCanceled ()) {
96+ return ;
97+ }
98+
99+ if (once == 0 && ONCE .compareAndSet (this , 0 , 1 )) {
100+ try {
101+ inner .first = t ;
102+ Publisher <? extends R > result =
103+ Objects .requireNonNull (transformer .apply (t , inner ), "The transformer returned a null value" );
104+ result .subscribe (actual );
105+ return ;
106+ } catch (Throwable e ) {
107+ onError (Operators .onOperatorError (s , e , t , actual .currentContext ()));
108+ ReferenceCountUtil .safeRelease (t );
109+ return ;
110+ }
111+ }
112+
113+ inner .onNext (t );
114+ }
115+
116+ @ Override
117+ public void onError (Throwable t ) {
118+ if (isCanceled ()) {
119+ return ;
120+ }
121+
122+ if (once != 0 ) {
123+ inner .onError (t );
124+ } else {
125+ actual .onSubscribe (Operators .emptySubscription ());
126+ actual .onError (t );
127+ }
128+ }
129+
130+ @ Override
131+ public void onComplete () {
132+ if (isCanceled ()) {
133+ return ;
134+ }
135+
136+ if (once != 0 ) {
137+ inner .onComplete ();
138+ } else {
139+ actual .onSubscribe (Operators .emptySubscription ());
140+ actual .onComplete ();
141+ }
142+ }
143+
144+ boolean isCanceled () {
145+ return s == Operators .cancelledSubscription ();
146+ }
147+
148+ void cancel () {
149+ s .cancel ();
150+ s = Operators .cancelledSubscription ();
151+ }
152+ }
153+
154+ static final class SwitchTransformInner <V > extends Flux <V >
155+ implements Scannable , Subscription {
156+
157+ final SwitchTransformMain <V , ?> parent ;
158+
159+ volatile CoreSubscriber <? super V > actual ;
160+ @ SuppressWarnings ("rawtypes" )
161+ static final AtomicReferenceFieldUpdater <SwitchTransformInner , CoreSubscriber > ACTUAL =
162+ AtomicReferenceFieldUpdater .newUpdater (SwitchTransformInner .class , CoreSubscriber .class , "actual" );
163+
164+ volatile V first ;
165+ @ SuppressWarnings ("rawtypes" )
166+ static final AtomicReferenceFieldUpdater <SwitchTransformInner , Object > FIRST =
167+ AtomicReferenceFieldUpdater .newUpdater (SwitchTransformInner .class , Object .class , "first" );
168+
169+ volatile int once ;
170+ @ SuppressWarnings ("rawtypes" )
171+ static final AtomicIntegerFieldUpdater <SwitchTransformInner > ONCE =
172+ AtomicIntegerFieldUpdater .newUpdater (SwitchTransformInner .class , "once" );
173+
174+ SwitchTransformInner (SwitchTransformMain <V , ?> parent ) {
175+ this .parent = parent ;
176+ }
177+
178+ public void onNext (V t ) {
179+ CoreSubscriber <? super V > a = actual ;
180+
181+ if (a != null ) {
182+ a .onNext (t );
183+ }
184+ }
185+
186+ public void onError (Throwable t ) {
187+ CoreSubscriber <? super V > a = actual ;
188+
189+ if (a != null ) {
190+ a .onError (t );
191+ }
192+ }
193+
194+ public void onComplete () {
195+ CoreSubscriber <? super V > a = actual ;
196+
197+ if (a != null ) {
198+ a .onComplete ();
199+ }
200+ }
201+
202+ @ Override
203+ public void subscribe (CoreSubscriber <? super V > actual ) {
204+ if (once == 0 && ONCE .compareAndSet (this , 0 , 1 )) {
205+ ACTUAL .lazySet (this , actual );
206+ actual .onSubscribe (this );
207+ }
208+ else {
209+ actual .onError (new IllegalStateException ("SwitchTransform allows only one Subscriber" ));
210+ }
211+ }
212+
213+ @ Override
214+ public void request (long n ) {
215+ V f = first ;
216+
217+ if (f != null && FIRST .compareAndSet (this , f , null )) {
218+ actual .onNext (f );
219+
220+ long r = Operators .addCap (n , -1 );
221+ if (r > 0 ) {
222+ parent .s .request (r );
223+ }
224+ } else {
225+ parent .s .request (n );
226+ }
227+ }
228+
229+ @ Override
230+ public void cancel () {
231+ actual = null ;
232+ first = null ;
233+ parent .cancel ();
234+ }
235+
236+ @ Override
237+ @ Nullable
238+ public Object scanUnsafe (Attr key ) {
239+ if (key == Attr .PARENT ) return parent ;
240+ if (key == Attr .ACTUAL ) return actual ();
241+
242+ return null ;
243+ }
244+
245+ public CoreSubscriber <? super V > actual () {
246+ return actual ;
247+ }
248+ }
249+ }
0 commit comments