diff --git a/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java b/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java index b7a39ba..8ce7f6b 100644 --- a/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java +++ b/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java @@ -2,10 +2,13 @@ import org.phoebus.channelfinder.entity.Channel; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; +import java.util.Spliterator; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -21,6 +24,9 @@ public class ChannelProcessorService { @Autowired private TaskExecutor taskExecutor; + @Value("processors.chunking.size") + private int chunkSize; + long getProcessorCount() { return channelProcessors.size(); } @@ -44,9 +50,18 @@ public void sendToProcessors(List channels) { } taskExecutor.execute(() -> channelProcessors.stream() .filter(ChannelProcessor::enabled) + .forEach(channelProcessor -> { try { - channelProcessor.process(channels); + Spliterator split = channels.stream().spliterator(); + + while(true) { + List chunk = new ArrayList<>(chunkSize); + for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){}; + if (chunk.isEmpty()) break; + channelProcessor.process(chunk); + } + } catch (Exception e) { logger.log(Level.WARNING, "ChannelProcessor " + channelProcessor.getClass().getName() + " throws exception", e); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f4a652d..52b9580 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -114,6 +114,9 @@ channelfinder.version=@project.version@ # DEBUG level will log all requests and responses to and from the REST end points logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO +################ Processor ################################################## +processors.chunking.size=10000 + ################ Archiver Appliance Configuration Processor ################# aa.urls={'default': 'http://localhost:17665'} # Comma-separated list of archivers to use if archiver_property_name is null