1515 */
1616package rx .observers ;
1717
18- import static org .junit .Assert .*;
19- import static org .mockito .Matchers .*;
20- import static org .mockito .Mockito .*;
18+ import static org .junit .Assert .assertEquals ;
19+ import static org .junit .Assert .assertFalse ;
20+ import static org .junit .Assert .assertTrue ;
21+ import static org .junit .Assert .fail ;
22+ import static org .mockito .Matchers .any ;
23+ import static org .mockito .Mockito .mock ;
24+ import static org .mockito .Mockito .never ;
25+ import static org .mockito .Mockito .times ;
26+ import static org .mockito .Mockito .verify ;
2127
2228import java .util .concurrent .CountDownLatch ;
2329import java .util .concurrent .ExecutorService ;
2834import java .util .concurrent .atomic .AtomicInteger ;
2935
3036import org .junit .Before ;
37+ import org .junit .Ignore ;
3138import org .junit .Test ;
3239import org .mockito .Mock ;
3340import org .mockito .MockitoAnnotations ;
3441
3542import rx .Observable ;
43+ import rx .Observable .OnSubscribe ;
3644import rx .Observer ;
3745import rx .Subscriber ;
3846import rx .Subscription ;
47+ import rx .schedulers .Schedulers ;
3948
4049public class SerializedObserverTest {
4150
@@ -265,6 +274,111 @@ public void runConcurrencyTest() {
265274 }
266275 }
267276
277+ @ Test
278+ public void testNotificationDelay () {
279+ ExecutorService tp = Executors .newFixedThreadPool (2 );
280+
281+ TestSubscriber <String > to = new TestSubscriber <String >(new Observer <String >() {
282+
283+ @ Override
284+ public void onCompleted () {
285+
286+ }
287+
288+ @ Override
289+ public void onError (Throwable e ) {
290+
291+ }
292+
293+ @ Override
294+ public void onNext (String t ) {
295+ // force it to take time when delivering
296+ // so the second thread will asynchronously enqueue
297+ try {
298+ Thread .sleep (50 );
299+ } catch (InterruptedException e ) {
300+ e .printStackTrace ();
301+ }
302+ }
303+
304+ });
305+ Observer <String > o = serializedObserver (to );
306+
307+ Future <?> f1 = tp .submit (new OnNextThread (o , 1 ));
308+ Future <?> f2 = tp .submit (new OnNextThread (o , 1 ));
309+
310+ waitOnThreads (f1 , f2 );
311+ // not completed yet
312+
313+ assertEquals (2 , to .getOnNextEvents ().size ());
314+ System .out .println (to .getOnNextEvents ());
315+ o .onCompleted ();
316+ System .out .println (to .getOnNextEvents ());
317+ }
318+
319+ /**
320+ * Demonstrates thread starvation problem.
321+ *
322+ * No solution on this for now. Trade-off in this direction as per https://github.com/Netflix/RxJava/issues/998#issuecomment-38959474
323+ * Probably need backpressure for this to work
324+ *
325+ * When using SynchronizedObserver we get this output:
326+ *
327+ * p1: 18 p2: 68 => should be close to each other unless we have thread starvation
328+ *
329+ * When using SerializedObserver we get:
330+ *
331+ * p1: 1 p2: 2445261 => should be close to each other unless we have thread starvation
332+ *
333+ * This demonstrates how SynchronizedObserver balances back and forth better, and blocks emission.
334+ * The real issue in this example is the async buffer-bloat, so we need backpressure.
335+ *
336+ *
337+ * @throws InterruptedException
338+ */
339+ @ Ignore
340+ @ Test
341+ public void testThreadStarvation () throws InterruptedException {
342+
343+ TestSubscriber <String > to = new TestSubscriber <String >(new Observer <String >() {
344+
345+ @ Override
346+ public void onCompleted () {
347+
348+ }
349+
350+ @ Override
351+ public void onError (Throwable e ) {
352+
353+ }
354+
355+ @ Override
356+ public void onNext (String t ) {
357+ // force it to take time when delivering
358+ try {
359+ Thread .sleep (1 );
360+ } catch (InterruptedException e ) {
361+ }
362+ }
363+
364+ });
365+ Observer <String > o = serializedObserver (to );
366+
367+ AtomicInteger p1 = new AtomicInteger ();
368+ AtomicInteger p2 = new AtomicInteger ();
369+
370+ Subscription s1 = infinite (p1 ).subscribe (o );
371+ Subscription s2 = infinite (p2 ).subscribe (o );
372+
373+ Thread .sleep (100 );
374+
375+ System .out .println ("p1: " + p1 .get () + " p2: " + p2 .get () + " => should be close to each other unless we have thread starvation" );
376+ assertEquals (p1 .get (), p2 .get (), 10000 ); // fairly distributed within 10000 of each other
377+
378+ s1 .unsubscribe ();
379+ s2 .unsubscribe ();
380+ }
381+
268382 private static void waitOnThreads (Future <?>... futures ) {
269383 for (Future <?> f : futures ) {
270384 try {
@@ -276,23 +390,44 @@ private static void waitOnThreads(Future<?>... futures) {
276390 }
277391 }
278392
393+ private static Observable <String > infinite (final AtomicInteger produced ) {
394+ return Observable .create (new OnSubscribe <String >() {
395+
396+ @ Override
397+ public void call (Subscriber <? super String > s ) {
398+ while (!s .isUnsubscribed ()) {
399+ s .onNext ("onNext" );
400+ produced .incrementAndGet ();
401+ }
402+ }
403+
404+ }).subscribeOn (Schedulers .newThread ());
405+ }
406+
279407 /**
280408 * A thread that will pass data to onNext
281409 */
282410 public static class OnNextThread implements Runnable {
283411
284- private final Observer <String > Observer ;
412+ private final Observer <String > observer ;
285413 private final int numStringsToSend ;
414+ final AtomicInteger produced ;
286415
287- OnNextThread (Observer <String > Observer , int numStringsToSend ) {
288- this .Observer = Observer ;
416+ OnNextThread (Observer <String > observer , int numStringsToSend , AtomicInteger produced ) {
417+ this .observer = observer ;
289418 this .numStringsToSend = numStringsToSend ;
419+ this .produced = produced ;
420+ }
421+
422+ OnNextThread (Observer <String > observer , int numStringsToSend ) {
423+ this (observer , numStringsToSend , new AtomicInteger ());
290424 }
291425
292426 @ Override
293427 public void run () {
294428 for (int i = 0 ; i < numStringsToSend ; i ++) {
295- Observer .onNext (Thread .currentThread ().getId () + "-" + i );
429+ observer .onNext (Thread .currentThread ().getId () + "-" + i );
430+ produced .incrementAndGet ();
296431 }
297432 }
298433 }
0 commit comments