@@ -87,6 +87,58 @@ public void testBufferedReplay() {
8787 }
8888 }
8989
90+ @ Test
91+ public void testBufferedWindowReplay () {
92+ PublishSubject <Integer > source = PublishSubject .create ();
93+ TestScheduler scheduler = new TestScheduler ();
94+ ConnectableObservable <Integer > co = source .replay (3 , 100 , TimeUnit .MILLISECONDS , scheduler );
95+ co .connect ();
96+
97+ {
98+ @ SuppressWarnings ("unchecked" )
99+ Observer <Object > observer1 = mock (Observer .class );
100+ InOrder inOrder = inOrder (observer1 );
101+
102+ co .subscribe (observer1 );
103+
104+ source .onNext (1 );
105+ scheduler .advanceTimeBy (10 , TimeUnit .MILLISECONDS );
106+ source .onNext (2 );
107+ scheduler .advanceTimeBy (10 , TimeUnit .MILLISECONDS );
108+ source .onNext (3 );
109+ scheduler .advanceTimeBy (10 , TimeUnit .MILLISECONDS );
110+
111+ inOrder .verify (observer1 , times (1 )).onNext (1 );
112+ inOrder .verify (observer1 , times (1 )).onNext (2 );
113+ inOrder .verify (observer1 , times (1 )).onNext (3 );
114+
115+ source .onNext (4 );
116+ source .onNext (5 );
117+ scheduler .advanceTimeBy (90 , TimeUnit .MILLISECONDS );
118+
119+ inOrder .verify (observer1 , times (1 )).onNext (4 );
120+
121+ inOrder .verify (observer1 , times (1 )).onNext (5 );
122+
123+ inOrder .verifyNoMoreInteractions ();
124+ verify (observer1 , never ()).onError (any (Throwable .class ));
125+
126+ }
127+
128+ {
129+ @ SuppressWarnings ("unchecked" )
130+ Observer <Object > observer1 = mock (Observer .class );
131+ InOrder inOrder = inOrder (observer1 );
132+
133+ co .subscribe (observer1 );
134+
135+ inOrder .verify (observer1 , times (1 )).onNext (4 );
136+ inOrder .verify (observer1 , times (1 )).onNext (5 );
137+ inOrder .verifyNoMoreInteractions ();
138+ verify (observer1 , never ()).onError (any (Throwable .class ));
139+ }
140+ }
141+
90142 @ Test
91143 public void testWindowedReplay () {
92144 TestScheduler scheduler = new TestScheduler ();
0 commit comments