@@ -60,7 +60,7 @@ public Observable<String> call(final GroupedObservable<String, Integer> innerGro
6060
6161 @ Override
6262 public String call (Integer i ) {
63- return (outerGroup .getKey () ? "Even" : "Odd " ) + " => from source: " + innerGroup .getKey () + " Value: " + i ;
63+ return (outerGroup .getKey () ? "Even" : "Odd " ) + " => from source: " + outerGroup . getKey () + "." + innerGroup .getKey () + " Value: " + i + " Thread: " + Thread . currentThread () ;
6464 }
6565
6666 });
@@ -166,6 +166,140 @@ public String call(Integer i) {
166166 assertTrue (counter2 .get () < 50000 ); // should be much smaller (< 1000) but this will be non-deterministic
167167 }
168168
169+ /**
170+ * The pivot operator does not need to add any serialization but this is confirming the expected behavior.
171+ *
172+ * It does not need serializing as it never merges groups, it just re-arranges them.
173+ *
174+ * For example, a simple 2-stream case with odd/even:
175+ *
176+ * Observable<GroupedObservable<Boolean, Integer>> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); // thread 1
177+ * Observable<GroupedObservable<Boolean, Integer>> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); // thread 2
178+ * Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2));
179+ * Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>> pivoted = Observable.pivot(groups);
180+ *
181+ * ============> OnNext: Odd => from source: false.o1 Value: 1 Thread: Thread[RxNewThreadScheduler-1,5,main]
182+ * ============> OnNext: Even => from source: true.o2 Value: 12 Thread: Thread[RxNewThreadScheduler-2,5,main]
183+ * ============> OnNext: Even => from source: true.o1 Value: 2 Thread: Thread[RxNewThreadScheduler-1,5,main]
184+ * ============> OnNext: Odd => from source: false.o2 Value: 11 Thread: Thread[RxNewThreadScheduler-2,5,main]
185+ * ============> OnNext: Odd => from source: false.o2 Value: 13 Thread: Thread[RxNewThreadScheduler-2,5,main]
186+ * ============> OnNext: Odd => from source: false.o2 Value: 15 Thread: Thread[RxNewThreadScheduler-2,5,main]
187+ * ============> OnNext: Odd => from source: false.o2 Value: 17 Thread: Thread[RxNewThreadScheduler-2,5,main]
188+ * ============> OnNext: Even => from source: true.o2 Value: 14 Thread: Thread[RxNewThreadScheduler-2,5,main]
189+ * ============> OnNext: Odd => from source: false.o1 Value: 3 Thread: Thread[RxNewThreadScheduler-1,5,main]
190+ * ============> OnNext: Odd => from source: false.o1 Value: 5 Thread: Thread[RxNewThreadScheduler-1,5,main]
191+ * ============> OnNext: Odd => from source: false.o1 Value: 7 Thread: Thread[RxNewThreadScheduler-1,5,main]
192+ * ============> OnNext: Odd => from source: false.o1 Value: 9 Thread: Thread[RxNewThreadScheduler-1,5,main]
193+ * ============> OnNext: Even => from source: true.o2 Value: 16 Thread: Thread[RxNewThreadScheduler-2,5,main]
194+ * ============> OnNext: Even => from source: true.o2 Value: 18 Thread: Thread[RxNewThreadScheduler-2,5,main]
195+ * ============> OnNext: Odd => from source: false.o2 Value: 19 Thread: Thread[RxNewThreadScheduler-2,5,main]
196+ * ============> OnNext: Even => from source: true.o1 Value: 4 Thread: Thread[RxNewThreadScheduler-1,5,main]
197+ * ============> OnNext: Even => from source: true.o1 Value: 6 Thread: Thread[RxNewThreadScheduler-1,5,main]
198+ * ============> OnNext: Even => from source: true.o1 Value: 8 Thread: Thread[RxNewThreadScheduler-1,5,main]
199+ * ============> OnNext: Even => from source: true.o1 Value: 10 Thread: Thread[RxNewThreadScheduler-1,5,main]
200+ * ============> OnNext: Even => from source: true.o2 Value: 20 Thread: Thread[RxNewThreadScheduler-2,5,main]
201+ * ============> OnCompleted
202+ *
203+ * This starts as:
204+ *
205+ * => Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>>:
206+ *
207+ * o1.odd: 1, 3, 5, 7, 9 on Thread 1
208+ * o1.even: 2, 4, 6, 8, 10 on Thread 1
209+ * o2.odd: 11, 13, 15, 17, 19 on Thread 2
210+ * o2.even: 12, 14, 16, 18, 20 on Thread 2
211+ *
212+ * It pivots to become:
213+ *
214+ * => Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>>:
215+ *
216+ * odd.o1: 1, 3, 5, 7, 9 on Thread 1
217+ * odd.o2: 11, 13, 15, 17, 19 on Thread 2
218+ * even.o1: 2, 4, 6, 8, 10 on Thread 1
219+ * even.o2: 12, 14, 16, 18, 20 on Thread 2
220+ *
221+ * Then a subsequent step can merge them if desired and add serialization, such as merge(even.o1, even.o2) to become a serialized "even"
222+ */
223+ @ Test
224+ public void testConcurrencyAndSerialization () throws InterruptedException {
225+ final AtomicInteger maxOuterConcurrency = new AtomicInteger ();
226+ final AtomicInteger maxGroupConcurrency = new AtomicInteger ();
227+ Observable <GroupedObservable <Boolean , Integer >> o1 = getSource (2000 ).subscribeOn (Schedulers .newThread ()).groupBy (modKeySelector );
228+ Observable <GroupedObservable <Boolean , Integer >> o2 = getSource (4000 ).subscribeOn (Schedulers .newThread ()).groupBy (modKeySelector );
229+ Observable <GroupedObservable <Boolean , Integer >> o3 = getSource (6000 ).subscribeOn (Schedulers .newThread ()).groupBy (modKeySelector );
230+ Observable <GroupedObservable <Boolean , Integer >> o4 = getSource (8000 ).subscribeOn (Schedulers .newThread ()).groupBy (modKeySelector );
231+ Observable <GroupedObservable <String , GroupedObservable <Boolean , Integer >>> groups = Observable .from (GroupedObservable .from ("o1" , o1 ), GroupedObservable .from ("o2" , o2 ),
232+ GroupedObservable .from ("o3" , o3 ), GroupedObservable .from ("o4" , o4 ));
233+ Observable <GroupedObservable <Boolean , GroupedObservable <String , Integer >>> pivoted = Observable .pivot (groups );
234+ TestSubscriber <String > ts = new TestSubscriber <String >();
235+ pivoted .take (2 ).flatMap (new Func1 <GroupedObservable <Boolean , GroupedObservable <String , Integer >>, Observable <String >>() {
236+
237+ final AtomicInteger outerThreads = new AtomicInteger ();
238+
239+ @ Override
240+ public Observable <String > call (final GroupedObservable <Boolean , GroupedObservable <String , Integer >> outerGroup ) {
241+ return outerGroup .flatMap (new Func1 <GroupedObservable <String , Integer >, Observable <String >>() {
242+
243+ @ Override
244+ public Observable <String > call (final GroupedObservable <String , Integer > innerGroup ) {
245+ final AtomicInteger threadsPerGroup = new AtomicInteger ();
246+ return innerGroup .take (100 ).map (new Func1 <Integer , String >() {
247+
248+ @ Override
249+ public String call (Integer i ) {
250+ int outerThreadCount = outerThreads .incrementAndGet ();
251+ setMaxConcurrency (maxOuterConcurrency , outerThreadCount );
252+ int innerThreadCount = threadsPerGroup .incrementAndGet ();
253+ setMaxConcurrency (maxGroupConcurrency , innerThreadCount );
254+ if (innerThreadCount > 1 ) {
255+ System .err .println ("more than 1 thread for this group [" + innerGroup .getKey () + "]: " + innerThreadCount + " (before)" );
256+ throw new RuntimeException ("more than 1 thread for this group [" + innerGroup .getKey () + "]: " + innerThreadCount + " (before)" );
257+ }
258+ try {
259+ return (outerGroup .getKey () ? "Even" : "Odd " ) + " => from source: " + innerGroup .getKey () + " Value: " + i ;
260+ } finally {
261+ int outerThreadCountAfter = outerThreads .decrementAndGet ();
262+ setMaxConcurrency (maxOuterConcurrency , outerThreadCountAfter );
263+ int innerThreadCountAfter = threadsPerGroup .decrementAndGet ();
264+ setMaxConcurrency (maxGroupConcurrency , innerThreadCountAfter );
265+ if (innerThreadCountAfter > 0 ) {
266+ System .err .println ("more than 1 thread for this group [" + innerGroup .getKey () + "]: " + innerThreadCount + " (after)" );
267+ throw new RuntimeException ("more than 1 thread for this group [" + innerGroup .getKey () + "]: " + innerThreadCountAfter + " (after)" );
268+ }
269+ }
270+ }
271+
272+ private void setMaxConcurrency (final AtomicInteger maxOuterConcurrency , int outerThreadCount ) {
273+ int max = maxOuterConcurrency .get ();
274+ if (outerThreadCount > max ) {
275+ maxOuterConcurrency .compareAndSet (max , outerThreadCount );
276+ }
277+ }
278+
279+ });
280+ }
281+
282+ });
283+ }
284+
285+ }).subscribe (ts );
286+
287+ ts .awaitTerminalEvent ();
288+
289+ System .out .println ("onNext [" + ts .getOnNextEvents ().size () + "]: " + ts .getOnNextEvents ());
290+ System .out .println ("max outer concurrency: " + maxOuterConcurrency .get ());
291+ assertTrue (maxOuterConcurrency .get () > 2 ); // should be 4 since we have 4 threads running but setting at 3 as this is somewhat non-deterministic
292+ System .out .println ("max group concurrency: " + maxGroupConcurrency .get ());
293+ assertTrue (maxGroupConcurrency .get () == 1 ); // should always be 1
294+
295+ assertEquals (800 , ts .getOnNextEvents ().size ());
296+
297+ }
298+
299+ private static Observable <Integer > getSource (final int start ) {
300+ return getSource (start , new AtomicInteger ());
301+ }
302+
169303 private static Observable <Integer > getSource (final int start , final AtomicInteger counter ) {
170304 return Observable .create (new OnSubscribe <Integer >() {
171305
0 commit comments