Skip to content

Commit eb0839b

Browse files
Pivot Unit Test
1 parent f15ea24 commit eb0839b

File tree

1 file changed

+135
-1
lines changed

1 file changed

+135
-1
lines changed

rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public Observable<String> call(final GroupedObservable<String, Integer> innerGro
6060

6161
@Override
6262
public String call(Integer i) {
63-
return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + innerGroup.getKey() + " Value: " + i;
63+
return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + outerGroup.getKey() + "." + innerGroup.getKey() + " Value: " + i + " Thread: " + Thread.currentThread();
6464
}
6565

6666
});
@@ -166,6 +166,140 @@ public String call(Integer i) {
166166
assertTrue(counter2.get() < 50000); // should be much smaller (< 1000) but this will be non-deterministic
167167
}
168168

169+
/**
170+
* The pivot operator does not need to add any serialization but this is confirming the expected behavior.
171+
*
172+
* It does not need serializing as it never merges groups, it just re-arranges them.
173+
*
174+
* For example, a simple 2-stream case with odd/even:
175+
*
176+
* Observable<GroupedObservable<Boolean, Integer>> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); // thread 1
177+
* Observable<GroupedObservable<Boolean, Integer>> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); // thread 2
178+
* Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2));
179+
* Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>> pivoted = Observable.pivot(groups);
180+
*
181+
* ============> OnNext: Odd => from source: false.o1 Value: 1 Thread: Thread[RxNewThreadScheduler-1,5,main]
182+
* ============> OnNext: Even => from source: true.o2 Value: 12 Thread: Thread[RxNewThreadScheduler-2,5,main]
183+
* ============> OnNext: Even => from source: true.o1 Value: 2 Thread: Thread[RxNewThreadScheduler-1,5,main]
184+
* ============> OnNext: Odd => from source: false.o2 Value: 11 Thread: Thread[RxNewThreadScheduler-2,5,main]
185+
* ============> OnNext: Odd => from source: false.o2 Value: 13 Thread: Thread[RxNewThreadScheduler-2,5,main]
186+
* ============> OnNext: Odd => from source: false.o2 Value: 15 Thread: Thread[RxNewThreadScheduler-2,5,main]
187+
* ============> OnNext: Odd => from source: false.o2 Value: 17 Thread: Thread[RxNewThreadScheduler-2,5,main]
188+
* ============> OnNext: Even => from source: true.o2 Value: 14 Thread: Thread[RxNewThreadScheduler-2,5,main]
189+
* ============> OnNext: Odd => from source: false.o1 Value: 3 Thread: Thread[RxNewThreadScheduler-1,5,main]
190+
* ============> OnNext: Odd => from source: false.o1 Value: 5 Thread: Thread[RxNewThreadScheduler-1,5,main]
191+
* ============> OnNext: Odd => from source: false.o1 Value: 7 Thread: Thread[RxNewThreadScheduler-1,5,main]
192+
* ============> OnNext: Odd => from source: false.o1 Value: 9 Thread: Thread[RxNewThreadScheduler-1,5,main]
193+
* ============> OnNext: Even => from source: true.o2 Value: 16 Thread: Thread[RxNewThreadScheduler-2,5,main]
194+
* ============> OnNext: Even => from source: true.o2 Value: 18 Thread: Thread[RxNewThreadScheduler-2,5,main]
195+
* ============> OnNext: Odd => from source: false.o2 Value: 19 Thread: Thread[RxNewThreadScheduler-2,5,main]
196+
* ============> OnNext: Even => from source: true.o1 Value: 4 Thread: Thread[RxNewThreadScheduler-1,5,main]
197+
* ============> OnNext: Even => from source: true.o1 Value: 6 Thread: Thread[RxNewThreadScheduler-1,5,main]
198+
* ============> OnNext: Even => from source: true.o1 Value: 8 Thread: Thread[RxNewThreadScheduler-1,5,main]
199+
* ============> OnNext: Even => from source: true.o1 Value: 10 Thread: Thread[RxNewThreadScheduler-1,5,main]
200+
* ============> OnNext: Even => from source: true.o2 Value: 20 Thread: Thread[RxNewThreadScheduler-2,5,main]
201+
* ============> OnCompleted
202+
*
203+
* This starts as:
204+
*
205+
* => Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>>:
206+
*
207+
* o1.odd: 1, 3, 5, 7, 9 on Thread 1
208+
* o1.even: 2, 4, 6, 8, 10 on Thread 1
209+
* o2.odd: 11, 13, 15, 17, 19 on Thread 2
210+
* o2.even: 12, 14, 16, 18, 20 on Thread 2
211+
*
212+
* It pivots to become:
213+
*
214+
* => Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>>:
215+
*
216+
* odd.o1: 1, 3, 5, 7, 9 on Thread 1
217+
* odd.o2: 11, 13, 15, 17, 19 on Thread 2
218+
* even.o1: 2, 4, 6, 8, 10 on Thread 1
219+
* even.o2: 12, 14, 16, 18, 20 on Thread 2
220+
*
221+
* Then a subsequent step can merge them if desired and add serialization, such as merge(even.o1, even.o2) to become a serialized "even"
222+
*/
223+
@Test
224+
public void testConcurrencyAndSerialization() throws InterruptedException {
225+
final AtomicInteger maxOuterConcurrency = new AtomicInteger();
226+
final AtomicInteger maxGroupConcurrency = new AtomicInteger();
227+
Observable<GroupedObservable<Boolean, Integer>> o1 = getSource(2000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector);
228+
Observable<GroupedObservable<Boolean, Integer>> o2 = getSource(4000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector);
229+
Observable<GroupedObservable<Boolean, Integer>> o3 = getSource(6000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector);
230+
Observable<GroupedObservable<Boolean, Integer>> o4 = getSource(8000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector);
231+
Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2),
232+
GroupedObservable.from("o3", o3), GroupedObservable.from("o4", o4));
233+
Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>> pivoted = Observable.pivot(groups);
234+
TestSubscriber<String> ts = new TestSubscriber<String>();
235+
pivoted.take(2).flatMap(new Func1<GroupedObservable<Boolean, GroupedObservable<String, Integer>>, Observable<String>>() {
236+
237+
final AtomicInteger outerThreads = new AtomicInteger();
238+
239+
@Override
240+
public Observable<String> call(final GroupedObservable<Boolean, GroupedObservable<String, Integer>> outerGroup) {
241+
return outerGroup.flatMap(new Func1<GroupedObservable<String, Integer>, Observable<String>>() {
242+
243+
@Override
244+
public Observable<String> call(final GroupedObservable<String, Integer> innerGroup) {
245+
final AtomicInteger threadsPerGroup = new AtomicInteger();
246+
return innerGroup.take(100).map(new Func1<Integer, String>() {
247+
248+
@Override
249+
public String call(Integer i) {
250+
int outerThreadCount = outerThreads.incrementAndGet();
251+
setMaxConcurrency(maxOuterConcurrency, outerThreadCount);
252+
int innerThreadCount = threadsPerGroup.incrementAndGet();
253+
setMaxConcurrency(maxGroupConcurrency, innerThreadCount);
254+
if (innerThreadCount > 1) {
255+
System.err.println("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (before)");
256+
throw new RuntimeException("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (before)");
257+
}
258+
try {
259+
return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + innerGroup.getKey() + " Value: " + i;
260+
} finally {
261+
int outerThreadCountAfter = outerThreads.decrementAndGet();
262+
setMaxConcurrency(maxOuterConcurrency, outerThreadCountAfter);
263+
int innerThreadCountAfter = threadsPerGroup.decrementAndGet();
264+
setMaxConcurrency(maxGroupConcurrency, innerThreadCountAfter);
265+
if (innerThreadCountAfter > 0) {
266+
System.err.println("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (after)");
267+
throw new RuntimeException("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCountAfter + " (after)");
268+
}
269+
}
270+
}
271+
272+
private void setMaxConcurrency(final AtomicInteger maxOuterConcurrency, int outerThreadCount) {
273+
int max = maxOuterConcurrency.get();
274+
if (outerThreadCount > max) {
275+
maxOuterConcurrency.compareAndSet(max, outerThreadCount);
276+
}
277+
}
278+
279+
});
280+
}
281+
282+
});
283+
}
284+
285+
}).subscribe(ts);
286+
287+
ts.awaitTerminalEvent();
288+
289+
System.out.println("onNext [" + ts.getOnNextEvents().size() + "]: " + ts.getOnNextEvents());
290+
System.out.println("max outer concurrency: " + maxOuterConcurrency.get());
291+
assertTrue(maxOuterConcurrency.get() > 2); // should be 4 since we have 4 threads running but setting at 3 as this is somewhat non-deterministic
292+
System.out.println("max group concurrency: " + maxGroupConcurrency.get());
293+
assertTrue(maxGroupConcurrency.get() == 1); // should always be 1
294+
295+
assertEquals(800, ts.getOnNextEvents().size());
296+
297+
}
298+
299+
private static Observable<Integer> getSource(final int start) {
300+
return getSource(start, new AtomicInteger());
301+
}
302+
169303
private static Observable<Integer> getSource(final int start, final AtomicInteger counter) {
170304
return Observable.create(new OnSubscribe<Integer>() {
171305

0 commit comments

Comments
 (0)