Skip to content

Commit 0038044

Browse files
authored
Add Hot Publishing Unit Tests (#12)
1 parent e4aa035 commit 0038044

File tree

2 files changed

+200
-1
lines changed

2 files changed

+200
-1
lines changed

reactive/src/test/java/edu/example/myjavalab/reactor/flux/TestFlux.java renamed to reactive/src/test/java/edu/example/myjavalab/reactor/flux/cold/TestFlux.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* SOFTWARE.
2323
*/
2424

25-
package edu.example.myjavalab.reactor.flux;
25+
package edu.example.myjavalab.reactor.flux.cold;
2626

2727
import static org.junit.jupiter.api.Assertions.assertEquals;
2828
import static org.mockito.ArgumentMatchers.any;
@@ -50,6 +50,10 @@
5050
* <p>
5151
* The terms "producer" and "publisher" are used interchangeable in this test class.
5252
* Likewise, the terms "consumer" and "subscriber".
53+
* <p>
54+
* This Test class is located in the <code>cold</code> package, meaning that it contains
55+
* tests related to "cold publishing", i.e., a publisher who needs at least one subscriber
56+
* to emit the data, but also whose emitted data is not shared among different subscribers.
5357
*/
5458
public class TestFlux {
5559

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2025 Cristiano Silva
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package edu.example.myjavalab.reactor.flux.hot;
26+
27+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
28+
import static org.junit.jupiter.api.Assertions.assertTrue;
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.ArgumentMatchers.anyInt;
31+
import static org.mockito.Mockito.atLeastOnce;
32+
import static org.mockito.Mockito.mock;
33+
import static org.mockito.Mockito.never;
34+
import static org.mockito.Mockito.times;
35+
import static org.mockito.Mockito.verify;
36+
37+
import edu.example.myjavalab.reactor.common.SomeBusinessLogic;
38+
import edu.example.myjavalab.reactor.common.SomeUtils;
39+
import edu.example.myjavalab.reactor.flux.internal.NumberGenerator;
40+
import java.time.Duration;
41+
import java.util.Random;
42+
import org.junit.jupiter.api.Test;
43+
import org.mockito.ArgumentCaptor;
44+
import reactor.core.publisher.Flux;
45+
46+
/**
47+
* Playing around with {@link Flux} via JUnit tests.
48+
* <p>
49+
* The terms "producer" and "publisher" are used interchangeable in this test class.
50+
* Likewise, the terms "consumer" and "subscriber".
51+
* <p>
52+
* This Test class is located in the <code>hot</code> package, meaning that it contains
53+
* tests related to "hot publishing", i.e., a publisher who is able, if wanted, to emit data
54+
* without anyone subscribed to it, but also whose emitted data is shared among different subscribers.
55+
*/
56+
public class TestFlux {
57+
58+
/**
59+
* GIVEN a producer associated with a numbers generator
60+
* AND two consumers already subscribed to it
61+
* WHEN numbers are created by generator
62+
* THEN and only then the same data is processed by both consumers
63+
*/
64+
@Test
65+
public void testFluxSinkOfSharedData() {
66+
67+
// preparation
68+
final SomeBusinessLogic<Integer> processNumber = mock(SomeBusinessLogic.class);
69+
70+
final int howManyNumbers = 5;
71+
72+
// producer created, associated to a generator
73+
final NumberGenerator numbers = new NumberGenerator();
74+
75+
var flux = Flux.create(numbers).share();
76+
77+
// two implicit subscribers
78+
flux.subscribe(processNumber::process, processNumber::onError, processNumber::onComplete);
79+
flux.subscribe(processNumber::process, processNumber::onError, processNumber::onComplete);
80+
81+
// generator has not created any numbers yet, so nothing has happened
82+
verify(processNumber, never()).process(anyInt());
83+
verify(processNumber, never()).onComplete();
84+
verify(processNumber, never()).onError(any(Throwable.class));
85+
86+
// now numbers will be generated
87+
numbers.generate(howManyNumbers);
88+
numbers.complete();
89+
90+
// ... and now they are processed
91+
verify(processNumber, times(howManyNumbers * 2)).process(anyInt());
92+
verify(processNumber, times(2)).onComplete();
93+
verify(processNumber, never()).onError(any(Throwable.class));
94+
}
95+
96+
/**
97+
* GIVEN a cold producer of random numbers
98+
* AND as long as there is no subscriber, data is not produced
99+
* WHEN two consumers subscribe to it
100+
* THEN they receive two different sets of data
101+
*/
102+
@Test
103+
public void testColdPublishing() {
104+
105+
// preparation
106+
final Random random = new Random();
107+
108+
final SomeBusinessLogic<Integer> firstProcessNumber = mock(SomeBusinessLogic.class);
109+
final SomeBusinessLogic<Integer> secondProcessNumber = mock(SomeBusinessLogic.class);
110+
111+
final int howManyNumbersToTake = 5;
112+
113+
// creating cold producer (think of it as Netflix!)
114+
Flux<Integer> coldFlux = Flux.generate(sink -> sink.next(random.nextInt()))
115+
.cast(Integer.class);
116+
117+
// no subscribers yet, nothing happened
118+
verify(firstProcessNumber, never()).process(anyInt());
119+
verify(secondProcessNumber, never()).process(anyInt());
120+
verify(firstProcessNumber, never()).onComplete();
121+
verify(secondProcessNumber, never()).onComplete();
122+
verify(firstProcessNumber, never()).onError(any(Throwable.class));
123+
verify(secondProcessNumber, never()).onError(any(Throwable.class));
124+
125+
// two implicit subscribers
126+
coldFlux.log("take")
127+
.take(howManyNumbersToTake)
128+
.log("sub")
129+
.subscribe(firstProcessNumber::process, firstProcessNumber::onError, firstProcessNumber::onComplete);
130+
coldFlux.log("take")
131+
.take(howManyNumbersToTake)
132+
.log("sub")
133+
.subscribe(secondProcessNumber::process, secondProcessNumber::onError, secondProcessNumber::onComplete);
134+
135+
// verifying
136+
ArgumentCaptor<Integer> firstCaptor = ArgumentCaptor.captor();
137+
ArgumentCaptor<Integer> secondCaptor = ArgumentCaptor.captor();
138+
139+
verify(firstProcessNumber, times(howManyNumbersToTake)).process(firstCaptor.capture());
140+
verify(secondProcessNumber, times(howManyNumbersToTake)).process(secondCaptor.capture());
141+
142+
// different data, because it was not shared
143+
assertNotEquals(firstCaptor.getAllValues(), secondCaptor.getAllValues());
144+
}
145+
146+
/**
147+
* GIVEN a hot producer of random numbers
148+
* AND data is produced right away, without subscribers
149+
* WHEN two consumers subscribe to it
150+
* THEN they receive the same set of data
151+
*/
152+
@Test
153+
public void testHotPublishing() {
154+
155+
// preparation
156+
final Random random = new Random();
157+
158+
final SomeBusinessLogic<Integer> firstProcessNumber = mock(SomeBusinessLogic.class);
159+
final SomeBusinessLogic<Integer> secondProcessNumber = mock(SomeBusinessLogic.class);
160+
161+
final int howManyNumbersToTake = 5;
162+
163+
// creating hot producer (think of it as a movie theatre!)
164+
Flux<Integer> hotFlux = Flux.generate(sink -> sink.next(random.nextInt()))
165+
.delayElements(Duration.ofMillis(500))
166+
.cast(Integer.class)
167+
.replay(howManyNumbersToTake - 2)
168+
.refCount(1);
169+
170+
// two implicit subscribers, one starting before the other
171+
hotFlux.log("take")
172+
.take(howManyNumbersToTake)
173+
.log("sub")
174+
.subscribe(firstProcessNumber::process, firstProcessNumber::onError, firstProcessNumber::onComplete);
175+
176+
SomeUtils.INSTANCE.aBitOfSleeping(1);
177+
178+
hotFlux.log("take")
179+
.take(howManyNumbersToTake)
180+
.log("sub")
181+
.subscribe(secondProcessNumber::process, secondProcessNumber::onError, secondProcessNumber::onComplete);
182+
183+
SomeUtils.INSTANCE.aBitOfSleeping(2);
184+
185+
// verifying
186+
ArgumentCaptor<Integer> firstCaptor = ArgumentCaptor.captor();
187+
ArgumentCaptor<Integer> secondCaptor = ArgumentCaptor.captor();
188+
189+
verify(firstProcessNumber, atLeastOnce()).process(firstCaptor.capture());
190+
verify(secondProcessNumber, atLeastOnce()).process(secondCaptor.capture());
191+
192+
// data was shared
193+
secondCaptor.getAllValues().forEach(value -> assertTrue(firstCaptor.getAllValues().contains(value)));
194+
}
195+
}

0 commit comments

Comments
 (0)