16
16
package rx .subjects ;
17
17
18
18
import java .util .ArrayList ;
19
- import java .util .concurrent .ConcurrentHashMap ;
20
19
import java .util .concurrent .TimeUnit ;
21
20
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
22
- import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
23
21
24
22
import rx .Observer ;
25
23
import rx .Scheduler ;
@@ -98,26 +96,20 @@ public void call(SubjectObserver<T> o) {
98
96
int lastIndex = state .replayObserverFromIndex (0 , o );
99
97
100
98
// now that it is caught up add to observers
101
- state . replayState . put ( o , lastIndex );
99
+ o . index ( lastIndex );
102
100
}
103
101
};
104
102
ssm .onTerminated = new Action1 <SubjectObserver <T >>() {
105
103
@ Override
106
104
public void call (SubjectObserver <T > o ) {
107
- Integer idx = state . replayState . remove ( o );
105
+ Integer idx = o . index ( );
108
106
if (idx == null ) {
109
107
idx = 0 ;
110
108
}
111
109
// we will finish replaying if there is anything left
112
110
state .replayObserverFromIndex (idx , o );
113
111
}
114
112
};
115
- ssm .onUnsubscribed = new Action1 <SubjectObserver <T >>() {
116
- @ Override
117
- public void call (SubjectObserver <T > o ) {
118
- state .replayState .remove (o );
119
- }
120
- };
121
113
122
114
return new ReplaySubject <T >(ssm , ssm , state );
123
115
}
@@ -273,20 +265,13 @@ static final <T> ReplaySubject<T> createWithState(final BoundedState<T> state,
273
265
274
266
@ Override
275
267
public void call (SubjectObserver <T > t1 ) {
276
- NodeList .Node <Object > l = state . removeState ( t1 );
268
+ NodeList .Node <Object > l = t1 . index ( );
277
269
if (l == null ) {
278
270
l = state .head ();
279
271
}
280
272
state .replayObserverFromIndex (l , t1 );
281
273
}
282
274
283
- };
284
- ssm .onUnsubscribed = new Action1 <SubjectObserver <T >>() {
285
- @ Override
286
- public void call (SubjectObserver <T > t1 ) {
287
- state .removeState (t1 );
288
- }
289
-
290
275
};
291
276
292
277
return new ReplaySubject <T >(ssm , ssm , state );
@@ -341,7 +326,7 @@ public void onCompleted() {
341
326
* @return Returns the number of subscribers.
342
327
*/
343
328
/* Support test. */ int subscriberCount () {
344
- return state .replayStateSize () ;
329
+ return ssm . state .observers . length ;
345
330
}
346
331
347
332
private boolean caughtUp (SubjectObserver <? super T > o ) {
@@ -364,8 +349,6 @@ private boolean caughtUp(SubjectObserver<? super T> o) {
364
349
* @param <T> the input and output type
365
350
*/
366
351
static final class UnboundedReplayState <T > implements ReplayState <T , Integer > {
367
- /** Each Observer is tracked here for what events they have received. */
368
- final ConcurrentHashMap <Observer <? super T >, Integer > replayState ;
369
352
private final NotificationLite <T > nl = NotificationLite .instance ();
370
353
/** The buffer. */
371
354
private final ArrayList <Object > list ;
@@ -378,7 +361,6 @@ static final class UnboundedReplayState<T> implements ReplayState<T, Integer> {
378
361
= AtomicIntegerFieldUpdater .newUpdater (UnboundedReplayState .class , "index" );
379
362
public UnboundedReplayState (int initialCapacity ) {
380
363
list = new ArrayList <Object >(initialCapacity );
381
- replayState = new ConcurrentHashMap <Observer <? super T >, Integer >();
382
364
}
383
365
384
366
@ Override
@@ -417,10 +399,10 @@ public boolean terminated() {
417
399
418
400
@ Override
419
401
public void replayObserver (SubjectObserver <? super T > observer ) {
420
- Integer lastEmittedLink = replayState . get ( observer );
402
+ Integer lastEmittedLink = observer . index ( );
421
403
if (lastEmittedLink != null ) {
422
404
int l = replayObserverFromIndex (lastEmittedLink , observer );
423
- replayState . put ( observer , l );
405
+ observer . index ( l );
424
406
} else {
425
407
throw new IllegalStateException ("failed to find lastEmittedLink for: " + observer );
426
408
}
@@ -441,12 +423,6 @@ public Integer replayObserverFromIndex(Integer idx, SubjectObserver<? super T> o
441
423
public Integer replayObserverFromIndexTest (Integer idx , SubjectObserver <? super T > observer , long now ) {
442
424
return replayObserverFromIndex (idx , observer );
443
425
}
444
-
445
- @ Override
446
- public int replayStateSize () {
447
- return replayState .size ();
448
- }
449
-
450
426
}
451
427
452
428
@@ -456,7 +432,6 @@ public int replayStateSize() {
456
432
*/
457
433
static final class BoundedState <T > implements ReplayState <T , NodeList .Node <Object >> {
458
434
final NodeList <Object > list ;
459
- final ConcurrentHashMap <Observer <? super T >, NodeList .Node <Object >> replayState ;
460
435
final EvictionPolicy evictionPolicy ;
461
436
final Func1 <Object , Object > enterTransform ;
462
437
final Func1 <Object , Object > leaveTransform ;
@@ -468,7 +443,6 @@ public BoundedState(EvictionPolicy evictionPolicy, Func1<Object, Object> enterTr
468
443
Func1 <Object , Object > leaveTransform ) {
469
444
this .list = new NodeList <Object >();
470
445
this .tail = list .tail ;
471
- this .replayState = new ConcurrentHashMap <Observer <? super T >, NodeList .Node <Object >>();
472
446
this .evictionPolicy = evictionPolicy ;
473
447
this .enterTransform = enterTransform ;
474
448
this .leaveTransform = leaveTransform ;
@@ -525,21 +499,11 @@ public Node<Object> head() {
525
499
public Node <Object > tail () {
526
500
return tail ;
527
501
}
528
- public Node <Object > removeState (SubjectObserver <? super T > o ) {
529
- return replayState .remove (o );
530
- }
531
- public void addState (SubjectObserver <? super T > o , Node <Object > state ) {
532
- if (state == null ) {
533
- throw new IllegalStateException ("Null state!" );
534
- } else {
535
- replayState .put (o , state );
536
- }
537
- }
538
502
@ Override
539
503
public void replayObserver (SubjectObserver <? super T > observer ) {
540
- NodeList .Node <Object > lastEmittedLink = replayState . get ( observer );
504
+ NodeList .Node <Object > lastEmittedLink = observer . index ( );
541
505
NodeList .Node <Object > l = replayObserverFromIndex (lastEmittedLink , observer );
542
- addState ( observer , l );
506
+ observer . index ( l );
543
507
}
544
508
545
509
@ Override
@@ -565,11 +529,6 @@ public NodeList.Node<Object> replayObserverFromIndexTest(
565
529
public boolean terminated () {
566
530
return terminated ;
567
531
}
568
-
569
- @ Override
570
- public int replayStateSize () {
571
- return replayState .size ();
572
- }
573
532
}
574
533
575
534
// **************
@@ -584,6 +543,10 @@ public int replayStateSize() {
584
543
interface ReplayState <T , I > {
585
544
/** @return true if the subject has reached a terminal state. */
586
545
boolean terminated ();
546
+ /**
547
+ * Replay contents to the given observer.
548
+ * @param observer the receiver of events
549
+ */
587
550
void replayObserver (SubjectObserver <? super T > observer );
588
551
/**
589
552
* Replay the buffered values from an index position and return a new index
@@ -601,10 +564,6 @@ I replayObserverFromIndex(
601
564
*/
602
565
I replayObserverFromIndexTest (
603
566
I idx , SubjectObserver <? super T > observer , long now );
604
- /**
605
- * @return the size of the replay state map for testing purposes.
606
- */
607
- int replayStateSize ();
608
567
/**
609
568
* Add an OnNext value to the buffer
610
569
* @param value the value to add
@@ -756,7 +715,7 @@ public DefaultOnAdd(BoundedState<T> state) {
756
715
@ Override
757
716
public void call (SubjectObserver <T > t1 ) {
758
717
NodeList .Node <Object > l = state .replayObserverFromIndex (state .head (), t1 );
759
- state . addState ( t1 , l );
718
+ t1 . index ( l );
760
719
}
761
720
762
721
}
@@ -783,7 +742,7 @@ public void call(SubjectObserver<T> t1) {
783
742
// accept all if terminated
784
743
l = state .replayObserverFromIndex (state .head (), t1 );
785
744
}
786
- state . addState ( t1 , l );
745
+ t1 . index ( l );
787
746
}
788
747
789
748
}
0 commit comments