Skip to content

Commit d9dabab

Browse files
authored
2.: Fix flatMapX over-cancellation in case of an inner error (#4686)
1 parent 5209ba3 commit d9dabab

File tree

9 files changed

+93
-8
lines changed

9 files changed

+93
-8
lines changed

build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ jacoco {
115115
toolVersion = '0.7.7.201606060606' // See http://www.eclemma.org/jacoco/.
116116
}
117117

118+
task GCandMem(dependsOn: 'check') << {
119+
System.gc()
120+
Thread.sleep(200)
121+
print("Memory usage: ")
122+
println(java.lang.management.ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024.0 / 1024.0)
123+
}
124+
118125
jacocoTestReport {
119126
reports {
120127
xml.enabled = true
@@ -129,6 +136,8 @@ jacocoTestReport {
129136
}
130137
}
131138

139+
jacocoTestReport.dependsOn GCandMem
140+
132141
build.dependsOn jacocoTestReport
133142

134143
// pmd {

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,8 @@ void innerError(InnerObserver inner, Throwable e) {
228228
set.delete(inner);
229229
if (errors.addThrowable(e)) {
230230
if (!delayErrors) {
231-
cancel();
231+
s.cancel();
232+
set.dispose();
232233
}
233234
active.decrementAndGet();
234235
drain();

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,8 @@ void innerError(InnerObserver inner, Throwable e) {
228228
set.delete(inner);
229229
if (errors.addThrowable(e)) {
230230
if (!delayErrors) {
231-
cancel();
231+
s.cancel();
232+
set.dispose();
232233
}
233234
active.decrementAndGet();
234235
drain();

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ void innerError(InnerObserver inner, Throwable e) {
195195
set.delete(inner);
196196
if (errors.addThrowable(e)) {
197197
if (!delayErrors) {
198-
dispose();
198+
d.dispose();
199+
set.dispose();
199200
}
200201
active.decrementAndGet();
201202
drain();

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ void innerError(InnerObserver inner, Throwable e) {
195195
set.delete(inner);
196196
if (errors.addThrowable(e)) {
197197
if (!delayErrors) {
198-
dispose();
198+
d.dispose();
199+
set.dispose();
199200
}
200201
active.decrementAndGet();
201202
drain();

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19-
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.*;
2020

2121
import org.junit.Test;
2222

@@ -253,4 +253,22 @@ public MaybeSource<Integer> apply(Integer v) throws Exception {
253253
.test()
254254
.assertResult(1, 2);
255255
}
256+
257+
@Test
258+
public void middleError() {
259+
Flowable.fromArray(new String[]{"1","a","2"}).flatMapMaybe(new Function<String, MaybeSource<Integer>>() {
260+
@Override
261+
public MaybeSource<Integer> apply(final String s) throws NumberFormatException {
262+
//return Single.just(Integer.valueOf(s)); //This works
263+
return Maybe.fromCallable(new Callable<Integer>() {
264+
@Override
265+
public Integer call() throws NumberFormatException {
266+
return Integer.valueOf(s);
267+
}
268+
});
269+
}
270+
})
271+
.test()
272+
.assertFailure(NumberFormatException.class, 1);
273+
}
256274
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19-
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.*;
2020

2121
import org.junit.Test;
2222

@@ -240,4 +240,22 @@ public SingleSource<Integer> apply(Integer v) throws Exception {
240240
.test()
241241
.assertResult(1, 2);
242242
}
243+
244+
@Test
245+
public void middleError() {
246+
Flowable.fromArray(new String[]{"1","a","2"}).flatMapSingle(new Function<String, SingleSource<Integer>>() {
247+
@Override
248+
public SingleSource<Integer> apply(final String s) throws NumberFormatException {
249+
//return Single.just(Integer.valueOf(s)); //This works
250+
return Single.fromCallable(new Callable<Integer>() {
251+
@Override
252+
public Integer call() throws NumberFormatException {
253+
return Integer.valueOf(s);
254+
}
255+
});
256+
}
257+
})
258+
.test()
259+
.assertFailure(NumberFormatException.class, 1);
260+
}
243261
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19-
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.*;
2020

2121
import org.junit.Test;
2222

@@ -181,4 +181,22 @@ public MaybeSource<Integer> apply(Integer v) throws Exception {
181181
.test()
182182
.assertResult(1, 2);
183183
}
184+
185+
@Test
186+
public void middleError() {
187+
Observable.fromArray(new String[]{"1","a","2"}).flatMapMaybe(new Function<String, MaybeSource<Integer>>() {
188+
@Override
189+
public MaybeSource<Integer> apply(final String s) throws NumberFormatException {
190+
//return Single.just(Integer.valueOf(s)); //This works
191+
return Maybe.fromCallable(new Callable<Integer>() {
192+
@Override
193+
public Integer call() throws NumberFormatException {
194+
return Integer.valueOf(s);
195+
}
196+
});
197+
}
198+
})
199+
.test()
200+
.assertFailure(NumberFormatException.class, 1);
201+
}
184202
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19-
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.*;
2020

2121
import org.junit.Test;
2222

@@ -168,4 +168,22 @@ public SingleSource<Integer> apply(Integer v) throws Exception {
168168
.test()
169169
.assertResult(1, 2);
170170
}
171+
172+
@Test
173+
public void middleError() {
174+
Observable.fromArray(new String[]{"1","a","2"}).flatMapSingle(new Function<String, SingleSource<Integer>>() {
175+
@Override
176+
public SingleSource<Integer> apply(final String s) throws NumberFormatException {
177+
//return Single.just(Integer.valueOf(s)); //This works
178+
return Single.fromCallable(new Callable<Integer>() {
179+
@Override
180+
public Integer call() throws NumberFormatException {
181+
return Integer.valueOf(s);
182+
}
183+
});
184+
}
185+
})
186+
.test()
187+
.assertFailure(NumberFormatException.class, 1);
188+
}
171189
}

0 commit comments

Comments
 (0)