2222import java .util .HashSet ;
2323import java .util .List ;
2424import java .util .Set ;
25+ import java .util .concurrent .CompletableFuture ;
26+ import java .util .concurrent .ExecutorService ;
27+ import java .util .concurrent .Executors ;
28+ import java .util .concurrent .Future ;
2529import java .util .concurrent .atomic .AtomicInteger ;
2630import java .util .function .Function ;
2731import java .util .function .Supplier ;
32+ import java .util .function .ToIntFunction ;
2833import java .util .stream .Collectors ;
2934import java .util .stream .IntStream ;
35+ import java .util .stream .Stream ;
3036
3137import com .google .common .base .Stopwatch ;
3238import com .google .common .base .Throwables ;
39+ import com .google .common .util .concurrent .Futures ;
3340import net .codacloud .ApiException ;
3441import org .slf4j .Logger ;
3542import org .slf4j .LoggerFactory ;
3946 *
4047 * @param <I> the paginated SDK type
4148 * @param <V> the item value type
42- * @author <a href="mailto:[email protected] ">Tag Spilman</a> 4349 */
44- public final class Paginator <I , V > {
50+ public final class Paginator <I , V > implements AutoCloseable {
4551
4652 private static final Logger logger =
4753 LoggerFactory .getLogger (Paginator .class );
4854
55+ private final ExecutorService service = Executors .newFixedThreadPool (
56+ Runtime .getRuntime ().availableProcessors ());
57+
4958 private final PageFetcher <I > fetcher ;
5059 private final Function <I , Page <V >> pageMapper ;
5160
5261 public Paginator (final PageFetcher <I > fetcher ,
53- final Function < I , Integer > pageNoMapper ,
54- final Function < I , Integer > totalPageMapper ,
55- final Function < I , Integer > totalCountMapper ,
62+ final ToIntFunction < I > pageNoMapper ,
63+ final ToIntFunction < I > totalPageMapper ,
64+ final ToIntFunction < I > totalCountMapper ,
5665 final Function <I , List <V >> itemsMapper ) {
5766 this .fetcher = fetcher ;
58- this .pageMapper =
59- i -> new Page <>( pageNoMapper . apply (i ), totalPageMapper . apply (i ),
60- totalCountMapper . apply ( i ), itemsMapper .apply (i ));
67+ this .pageMapper = i -> new Page <>( pageNoMapper . applyAsInt ( i ),
68+ totalPageMapper . applyAsInt (i ), totalCountMapper . applyAsInt (i ),
69+ itemsMapper .apply (i ));
6170 }
6271
6372 /**
@@ -67,7 +76,7 @@ public Paginator(final PageFetcher<I> fetcher,
6776 * @throws ApiException ...
6877 */
6978 public List <V > fetchAll () throws ApiException {
70- return fetchAll (Function . identity () , ArrayList ::new );
79+ return fetchAll (false , ArrayList ::new );
7180 }
7281
7382 /**
@@ -77,55 +86,77 @@ public List<V> fetchAll() throws ApiException {
7786 * @throws ApiException ...
7887 */
7988 public Set <V > fetchAllAsync () throws ApiException {
80- return fetchAll (IntStream :: parallel , HashSet ::new );
89+ return fetchAll (true , HashSet ::new );
8190 }
8291
83- private <C extends Collection <V >> C fetchAll (
84- final Function <IntStream , IntStream > streamMapper ,
92+ private <C extends Collection <V >> C fetchAll (final boolean parallel ,
8593 final Supplier <C > supplier ) throws ApiException {
8694 final I pageOfItems = fetcher .fetch (1 );
8795 final Page <V > firstPage = pageMapper .apply (pageOfItems );
8896 final AtomicInteger count = new AtomicInteger (0 );
8997 try {
90- final C items = streamMapper .apply (
91- IntStream .range (2 , firstPage .getTotalPages () + 1 ))
92- .mapToObj (pageNo -> fetch (pageNo , count )).map (pageMapper )
93- .map (Page ::getItems ).flatMap (List ::stream )
98+ return Stream .concat (
99+ Stream .of (pageOfItems ).map (CompletableFuture ::completedFuture ),
100+ IntStream .range (2 , firstPage .getTotalPages () + 1 )
101+ .mapToObj (pageNo -> submit (parallel , pageNo , count )))
102+ // collect here to act as a latch
103+ .toList ()
104+ .stream ()
105+ .map (Futures ::getUnchecked )
106+ .map (pageMapper )
107+ .map (Page ::getItems )
108+ .flatMap (List ::stream )
94109 .collect (Collectors .toCollection (supplier ));
95- items .addAll (firstPage .getItems ());
96- return items ;
97- } catch (RuntimeException e ) {
110+ } catch (final RuntimeException e ) {
98111 Throwables .throwIfInstanceOf (e .getCause (), ApiException .class );
99112 throw e ;
100113 }
101114 }
102115
103- private I fetch (final Integer pageNo , final AtomicInteger count ) {
104- final Stopwatch stopwatch = Stopwatch .createStarted ();
116+ private Future <I > submit (final boolean parallel , final int pageNo ,
117+ final AtomicInteger count ) {
118+ if (parallel ) {
119+ return service .submit (() -> fetch (pageNo , count ));
120+ }
105121
106122 try {
107- final I fetch = fetcher .fetch (pageNo );
108-
109- if (logger .isDebugEnabled ()) {
110- final Page <V > page = pageMapper .apply (fetch );
111- final Integer totalPages = page .getTotalPages ();
112- final String percent =
113- calculatePercentage (count .incrementAndGet (), totalPages );
114- logger .debug ("Page {}/{} ({} items) retrieved in {} ({}%)" ,
115- pageNo , totalPages , page .getItems ().size (), stopwatch ,
116- percent );
117- }
118-
119- return fetch ;
120- } catch (ApiException e ) {
121- throw new RuntimeException (e );
123+ final I result = fetch (pageNo , count );
124+
125+ return CompletableFuture .completedFuture (result );
126+ } catch (final ApiException e ) {
127+ return CompletableFuture .failedFuture (e );
122128 }
123129 }
124130
131+ private I fetch (final Integer pageNo , final AtomicInteger count )
132+ throws ApiException {
133+ final Stopwatch stopwatch = Stopwatch .createStarted ();
134+
135+ final I fetch = fetcher .fetch (pageNo );
136+
137+ if (logger .isDebugEnabled ()) {
138+ final Page <V > page = pageMapper .apply (fetch );
139+ final Integer totalPages = page .getTotalPages ();
140+ final String percent =
141+ calculatePercentage (count .incrementAndGet (), totalPages );
142+ logger .debug ("Page {}/{} ({} items) retrieved in {} ({}%)" , pageNo ,
143+ totalPages , page .getItems ().size (), stopwatch , percent );
144+ }
145+
146+ return fetch ;
147+ }
148+
125149 private static String calculatePercentage (final int a , final int b ) {
126150 return new BigDecimal (a ).divide (BigDecimal .valueOf (b ), 3 ,
127- RoundingMode .FLOOR ).multiply (BigDecimal .valueOf (100 )).setScale (1 )
151+ RoundingMode .FLOOR )
152+ .multiply (BigDecimal .valueOf (100 ))
153+ .setScale (1 , RoundingMode .UNNECESSARY )
128154 .toString ();
129155 }
130156
157+ @ Override
158+ public void close () {
159+ service .shutdown ();
160+ }
161+
131162}
0 commit comments