diff --git a/src/main/java/org/phoebus/channelfinder/ChannelRepository.java b/src/main/java/org/phoebus/channelfinder/ChannelRepository.java index ac671e3c..a5e89998 100644 --- a/src/main/java/org/phoebus/channelfinder/ChannelRepository.java +++ b/src/main/java/org/phoebus/channelfinder/ChannelRepository.java @@ -45,12 +45,15 @@ import java.io.IOException; import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Spliterator; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -187,53 +190,68 @@ public Channel save(Channel channel) { @SuppressWarnings("unchecked") @Override public Iterable saveAll(Iterable channels) { - // Create a list of all channel names - List ids = StreamSupport.stream(channels.spliterator(), false).map(Channel::getName).collect(Collectors.toList()); - try { - Map existingChannels = findAllById(ids).stream().collect(Collectors.toMap(Channel::getName, c -> c)); - - BulkRequest.Builder br = new BulkRequest.Builder(); - - for (Channel channel : channels) { - if (existingChannels.containsKey(channel.getName())) { - // merge with existing channel - Channel updatedChannel = existingChannels.get(channel.getName()); - if (channel.getOwner() != null && !channel.getOwner().isEmpty()) - updatedChannel.setOwner(channel.getOwner()); - updatedChannel.addProperties(channel.getProperties()); - updatedChannel.addTags(channel.getTags()); - br.operations(op -> op.index(i -> i.index(esService.getES_CHANNEL_INDEX()) - .id(updatedChannel.getName()) - .document(JsonData.of(updatedChannel, new JacksonJsonpMapper(objectMapper))))); + Spliterator split = channels.spliterator(); + int chunkSize = Integer.parseInt(System.getProperty("repository.chunking.size")); + + // Create a list of all channel names + List resultList = new ArrayList<>(); + while(true) { + List chunk = new ArrayList<>(chunkSize); + for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){}; + if (chunk.isEmpty()) break; + try { + List ids = StreamSupport.stream(channels.spliterator(), false) + .map(Channel::getName) + .toList(); + + BulkResponse result = getBulkResponseForChannels(chunk, ids); + // Log errors, if any + if (result.errors()) { + logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS); + for (BulkResponseItem item : result.items()) { + if (item.error() != null) { + logger.log(Level.SEVERE, () -> item.error().reason()); + } + } + // TODO cleanup? or throw exception? } else { - br.operations(op -> op.index(i -> i.index(esService.getES_CHANNEL_INDEX()) - .id(channel.getName()) - .document(JsonData.of(channel, new JacksonJsonpMapper(objectMapper))))); + resultList.addAll((Collection) findAllById(ids)); } + } catch (IOException e) { + String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels); + logger.log(Level.SEVERE, message, e); + throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, message, null); } - BulkResponse result = null; - result = client.bulk(br.refresh(Refresh.True).build()); - // Log errors, if any - if (result.errors()) { - logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS); - for (BulkResponseItem item : result.items()) { - if (item.error() != null) { - logger.log(Level.SEVERE, () -> item.error().reason()); - } - } - // TODO cleanup? or throw exception? + } + return resultList; + } + + private BulkResponse getBulkResponseForChannels(Iterable channels, List ids) throws IOException { + Map existingChannels = findAllById(ids).stream().collect(Collectors.toMap(Channel::getName, c -> c)); + + BulkRequest.Builder br = new BulkRequest.Builder(); + + for (Channel channel : channels) { + if (existingChannels.containsKey(channel.getName())) { + // merge with existing channel + Channel updatedChannel = existingChannels.get(channel.getName()); + if (channel.getOwner() != null && !channel.getOwner().isEmpty()) + updatedChannel.setOwner(channel.getOwner()); + updatedChannel.addProperties(channel.getProperties()); + updatedChannel.addTags(channel.getTags()); + br.operations(op -> op.index(i -> i.index(esService.getES_CHANNEL_INDEX()) + .id(updatedChannel.getName()) + .document(JsonData.of(updatedChannel, new JacksonJsonpMapper(objectMapper))))); } else { - return (Iterable) findAllById(ids); + br.operations(op -> op.index(i -> i.index(esService.getES_CHANNEL_INDEX()) + .id(channel.getName()) + .document(JsonData.of(channel, new JacksonJsonpMapper(objectMapper))))); } - } catch (IOException e) { - String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels); - logger.log(Level.SEVERE, message, e); - throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, message, null); } - return null; + return client.bulk(br.refresh(Refresh.True).build()); } /** diff --git a/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java b/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java index b7a39bae..be2d16fc 100644 --- a/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java +++ b/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java @@ -5,7 +5,9 @@ import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; +import java.util.Spliterator; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -44,9 +46,19 @@ public void sendToProcessors(List channels) { } taskExecutor.execute(() -> channelProcessors.stream() .filter(ChannelProcessor::enabled) + .forEach(channelProcessor -> { try { - channelProcessor.process(channels); + Spliterator split = channels.stream().spliterator(); + int chunkSize = Integer.parseInt(System.getProperty("processors.chunking.size")); + + while(true) { + List chunk = new ArrayList<>(chunkSize); + for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){}; + if (chunk.isEmpty()) break; + channelProcessor.process(chunk); + } + } catch (Exception e) { logger.log(Level.WARNING, "ChannelProcessor " + channelProcessor.getClass().getName() + " throws exception", e); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 6e23bb1b..15a7cbe0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -114,6 +114,12 @@ channelfinder.version=@project.version@ # DEBUG level will log all requests and responses to and from the REST end points logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO +################ Repository ################################################## +repository.chunking.size=10000 + +################ Processor ################################################## +processors.chunking.size=10000 + ################ Archiver Appliance Configuration Processor ################# aa.urls={'default': 'http://localhost:17665'} # Comma-separated list of archivers to use if archiver_property_name is null