2424import io .reactivex .*;
2525import io .reactivex .exceptions .*;
2626import io .reactivex .functions .*;
27- import io .reactivex .internal .operators .flowable .FlowableConcatMap .WeakScalarSubscription ;
27+ import io .reactivex .internal .functions .Functions ;
28+ import io .reactivex .internal .operators .flowable .FlowableConcatMap .SimpleScalarSubscription ;
29+ import io .reactivex .processors .PublishProcessor ;
2830import io .reactivex .schedulers .Schedulers ;
2931import io .reactivex .subscribers .TestSubscriber ;
3032
@@ -33,7 +35,7 @@ public class FlowableConcatMapTest {
3335 @ Test
3436 public void weakSubscriptionRequest () {
3537 TestSubscriber <Integer > ts = new TestSubscriber <Integer >(0 );
36- WeakScalarSubscription <Integer > ws = new WeakScalarSubscription <Integer >(1 , ts );
38+ SimpleScalarSubscription <Integer > ws = new SimpleScalarSubscription <Integer >(1 , ts );
3739 ts .onSubscribe (ws );
3840
3941 ws .request (0 );
@@ -105,6 +107,68 @@ public Publisher<? extends Object> apply(String v)
105107 .assertResult ("RxSingleScheduler" );
106108 }
107109
110+ @ Test
111+ public void innerScalarRequestRace () {
112+ final Flowable <Integer > just = Flowable .just (1 );
113+ final int n = 1000 ;
114+ for (int i = 0 ; i < TestHelper .RACE_DEFAULT_LOOPS ; i ++) {
115+ final PublishProcessor <Flowable <Integer >> source = PublishProcessor .create ();
116+
117+ final TestSubscriber <Integer > ts = source
118+ .concatMap (Functions .<Flowable <Integer >>identity (), n + 1 )
119+ .test (1L );
120+
121+ TestHelper .race (new Runnable () {
122+ @ Override
123+ public void run () {
124+ for (int j = 0 ; j < n ; j ++) {
125+ source .onNext (just );
126+ }
127+ }
128+ }, new Runnable () {
129+ @ Override
130+ public void run () {
131+ for (int j = 0 ; j < n ; j ++) {
132+ ts .request (1 );
133+ }
134+ }
135+ });
136+
137+ ts .assertValueCount (n );
138+ }
139+ }
140+
141+ @ Test
142+ public void innerScalarRequestRaceDelayError () {
143+ final Flowable <Integer > just = Flowable .just (1 );
144+ final int n = 1000 ;
145+ for (int i = 0 ; i < TestHelper .RACE_DEFAULT_LOOPS ; i ++) {
146+ final PublishProcessor <Flowable <Integer >> source = PublishProcessor .create ();
147+
148+ final TestSubscriber <Integer > ts = source
149+ .concatMapDelayError (Functions .<Flowable <Integer >>identity (), n + 1 , true )
150+ .test (1L );
151+
152+ TestHelper .race (new Runnable () {
153+ @ Override
154+ public void run () {
155+ for (int j = 0 ; j < n ; j ++) {
156+ source .onNext (just );
157+ }
158+ }
159+ }, new Runnable () {
160+ @ Override
161+ public void run () {
162+ for (int j = 0 ; j < n ; j ++) {
163+ ts .request (1 );
164+ }
165+ }
166+ });
167+
168+ ts .assertValueCount (n );
169+ }
170+ }
171+
108172 @ Test
109173 public void pollThrows () {
110174 Flowable .just (1 )
0 commit comments