22
33import org .phoebus .channelfinder .entity .Channel ;
44import org .springframework .beans .factory .annotation .Autowired ;
5+ import org .springframework .beans .factory .annotation .Value ;
56import org .springframework .core .task .TaskExecutor ;
67import org .springframework .stereotype .Service ;
78
9+ import java .util .ArrayList ;
810import java .util .List ;
11+ import java .util .Spliterator ;
912import java .util .logging .Level ;
1013import java .util .logging .Logger ;
1114import java .util .stream .Collectors ;
@@ -21,6 +24,9 @@ public class ChannelProcessorService {
2124 @ Autowired
2225 private TaskExecutor taskExecutor ;
2326
27+ @ Value ("processors.chunking.size" )
28+ private int chunkSize ;
29+
2430 long getProcessorCount () {
2531 return channelProcessors .size ();
2632 }
@@ -44,9 +50,18 @@ public void sendToProcessors(List<Channel> channels) {
4450 }
4551 taskExecutor .execute (() -> channelProcessors .stream ()
4652 .filter (ChannelProcessor ::enabled )
53+
4754 .forEach (channelProcessor -> {
4855 try {
49- channelProcessor .process (channels );
56+ Spliterator <Channel > split = channels .stream ().spliterator ();
57+
58+ while (true ) {
59+ List <Channel > chunk = new ArrayList <>(chunkSize );
60+ for (int i = 0 ; i < chunkSize && split .tryAdvance (chunk ::add ); i ++){};
61+ if (chunk .isEmpty ()) break ;
62+ channelProcessor .process (chunk );
63+ }
64+
5065 } catch (Exception e ) {
5166 logger .log (Level .WARNING , "ChannelProcessor " + channelProcessor .getClass ().getName () + " throws exception" , e );
5267 }
0 commit comments