Skip to content

Commit d5622c4

Browse files
committed
optimization for the create channels process (primarily used by recsync)
1 parent 2aa842b commit d5622c4

File tree

2 files changed

+97
-20
lines changed

2 files changed

+97
-20
lines changed

src/main/java/org/phoebus/channelfinder/ChannelManager.java

Lines changed: 81 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
import static org.phoebus.channelfinder.CFResourceDescriptors.CHANNEL_RESOURCE_URI;
44

55
import java.text.MessageFormat;
6+
import java.util.Iterator;
67
import java.util.List;
8+
import java.util.Map;
79
import java.util.Optional;
810
import java.util.logging.Level;
911
import java.util.logging.Logger;
1012
import java.util.stream.Collectors;
13+
import java.util.stream.StreamSupport;
1114

1215
import javax.servlet.ServletContext;
1316

1417
import com.google.common.collect.FluentIterable;
18+
import com.google.common.collect.Iterators;
1519
import com.google.common.collect.Lists;
1620
import org.phoebus.channelfinder.AuthorizationService.ROLES;
1721
import org.phoebus.channelfinder.entity.Channel;
@@ -172,17 +176,23 @@ public Iterable<Channel> create(@RequestBody Iterable<Channel> channels) {
172176
if(authorizationService.isAuthorizedRole(SecurityContextHolder.getContext().getAuthentication(), ROLES.CF_CHANNEL)) {
173177
// check if authorized owner
174178
long start = System.currentTimeMillis();
179+
Map<String, Channel> existingChannels = channelRepository.findAllById(StreamSupport
180+
.stream(channels.spliterator(), true)
181+
.map(Channel::getName)
182+
.collect(Collectors.toUnmodifiableList()))
183+
.stream().collect(Collectors.toMap(Channel::getName, channel -> {
184+
return channel;
185+
}));
175186
for(Channel channel: channels) {
176-
177-
Optional<Channel> existingChannel = channelRepository.findById(channel.getName());
178-
boolean present = existingChannel.isPresent();
187+
boolean present = existingChannels.containsKey(channel.getName());
179188
if(present) {
180-
if(!authorizationService.isAuthorizedOwner(SecurityContextHolder.getContext().getAuthentication(), existingChannel.get())) {
181-
String message = MessageFormat.format(TextUtil.USER_NOT_AUTHORIZED_ON_CHANNEL, existingChannel.get().toLog());
189+
Channel existingChannel = existingChannels.get(channel.getName());
190+
if(!authorizationService.isAuthorizedOwner(SecurityContextHolder.getContext().getAuthentication(), existingChannel)) {
191+
String message = MessageFormat.format(TextUtil.USER_NOT_AUTHORIZED_ON_CHANNEL, existingChannel.toLog());
182192
logger.log(Level.SEVERE, message, new ResponseStatusException(HttpStatus.UNAUTHORIZED));
183193
throw new ResponseStatusException(HttpStatus.UNAUTHORIZED, message, null);
184194
}
185-
channel.setOwner(existingChannel.get().getOwner());
195+
channel.setOwner(existingChannel.getOwner());
186196
} else {
187197
if(!authorizationService.isAuthorizedOwner(SecurityContextHolder.getContext().getAuthentication(), channel)) {
188198
String message = MessageFormat.format(TextUtil.USER_NOT_AUTHORIZED_ON_CHANNEL, channel.toLog());
@@ -199,26 +209,34 @@ public Iterable<Channel> create(@RequestBody Iterable<Channel> channels) {
199209
start = System.currentTimeMillis();
200210

201211
// delete existing channels
202-
for(Channel channel: channels) {
203-
if(channelRepository.existsById(channel.getName())) {
204-
// delete existing channel
205-
channelRepository.deleteById(channel.getName());
206-
}
207-
}
212+
channelRepository.deleteAll(channels);
213+
// for(Channel channel: channels) {
214+
// if(channelRepository.existsById(channel.getName())) {
215+
// // delete existing channel
216+
// channelRepository.deleteById(channel.getName());
217+
// }
218+
// }
208219
logger.log(Level.SEVERE, "Completed replacement of Channels : " + (System.currentTimeMillis() - start) + "ms");
209220
start = System.currentTimeMillis();
210221

211222
// reset owners of attached tags/props back to existing owners
223+
Map<String, String> propOwners = StreamSupport
224+
.stream(propertyRepository.findAll().spliterator(), true)
225+
.collect(Collectors.toUnmodifiableMap(Property::getName, Property::getOwner));
226+
Map<String, String> tagOwners = StreamSupport
227+
.stream(tagRepository.findAll().spliterator(), true)
228+
.collect(Collectors.toUnmodifiableMap(Tag::getName, Tag::getOwner));
229+
212230
for(Channel channel: channels) {
213-
channel.getProperties().forEach(prop -> prop.setOwner(propertyRepository.findById(prop.getName()).get().getOwner()));
214-
channel.getTags().forEach(tag -> tag.setOwner(tagRepository.findById(tag.getName()).get().getOwner()));
231+
channel.getProperties().forEach(prop -> prop.setOwner(propOwners.get(prop.getName())));
232+
channel.getTags().forEach(tag -> tag.setOwner(tagOwners.get(tag.getName())));
215233
}
216234

217235
logger.log(Level.SEVERE, "Completed reset tag and property ownership : " + (System.currentTimeMillis() - start) + "ms");
218236
start = System.currentTimeMillis();
219-
channels.forEach(log ->
220-
channelManagerAudit.log(Level.INFO, MessageFormat.format(TextUtil.CREATE_CHANNEL, log.toLog()))
221-
);
237+
// channels.forEach(log ->
238+
// channelManagerAudit.log(Level.FINE, MessageFormat.format(TextUtil.CREATE_CHANNEL, log.toLog()))
239+
// );
222240

223241
logger.log(Level.SEVERE, "Completed logging : " + (System.currentTimeMillis() - start) + "ms");
224242
start = System.currentTimeMillis();
@@ -464,8 +482,53 @@ public void validateChannelRequest(Channel channel) {
464482
* @param channels list of channels to be validated
465483
*/
466484
public void validateChannelRequest(Iterable<Channel> channels) {
485+
List<String> existingProperties = StreamSupport
486+
.stream(propertyRepository.findAll().spliterator(), true)
487+
.map(Property::getName)
488+
.collect(Collectors.toUnmodifiableList());
489+
List<String> existingTags = StreamSupport
490+
.stream(tagRepository.findAll().spliterator(), true)
491+
.map(Tag::getName)
492+
.collect(Collectors.toUnmodifiableList());
467493
for(Channel channel: channels) {
468-
validateChannelRequest(channel);
494+
// 1
495+
if (channel.getName() == null || channel.getName().isEmpty()) {
496+
String message = MessageFormat.format(TextUtil.CHANNEL_NAME_CANNOT_BE_NULL_OR_EMPTY, channel.toLog());
497+
logger.log(Level.SEVERE, message, new ResponseStatusException(HttpStatus.BAD_REQUEST));
498+
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, message, null);
499+
}
500+
// 2
501+
if (channel.getOwner() == null || channel.getOwner().isEmpty()) {
502+
String message = MessageFormat.format(TextUtil.CHANNEL_OWNER_CANNOT_BE_NULL_OR_EMPTY, channel.toLog());
503+
logger.log(Level.SEVERE, message, new ResponseStatusException(HttpStatus.BAD_REQUEST));
504+
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, message, null);
505+
}
506+
// 3
507+
List <String> tagNames = channel.getTags().stream().map(Tag::getName).collect(Collectors.toList());
508+
for(String tagName:tagNames) {
509+
if(!existingTags.contains(tagName)) {
510+
String message = MessageFormat.format(TextUtil.TAG_NAME_DOES_NOT_EXIST, tagName);
511+
logger.log(Level.SEVERE, message, new ResponseStatusException(HttpStatus.NOT_FOUND));
512+
throw new ResponseStatusException(HttpStatus.NOT_FOUND, message);
513+
}
514+
}
515+
// 3
516+
List <String> propertyNames = channel.getProperties().stream().map(Property::getName).collect(Collectors.toList());
517+
List <String> propertyValues = channel.getProperties().stream().map(Property::getValue).collect(Collectors.toList());
518+
for(String propertyName:propertyNames) {
519+
if(!existingProperties.contains(propertyName)) {
520+
String message = MessageFormat.format(TextUtil.PROPERTY_NAME_DOES_NOT_EXIST, propertyName);
521+
logger.log(Level.SEVERE, message, new ResponseStatusException(HttpStatus.NOT_FOUND));
522+
throw new ResponseStatusException(HttpStatus.NOT_FOUND, message);
523+
}
524+
}
525+
for(String propertyValue:propertyValues) {
526+
if(propertyValue == null || propertyValue.isEmpty()) {
527+
String message = MessageFormat.format(TextUtil.PROPERTY_VALUE_NULL_OR_EMPTY, propertyNames.get(propertyValues.indexOf(propertyValue)), propertyValue);
528+
logger.log(Level.SEVERE, message, new ResponseStatusException(HttpStatus.BAD_REQUEST));
529+
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, message);
530+
}
531+
}
469532
}
470533
}
471534

src/main/java/org/phoebus/channelfinder/ChannelRepository.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import co.elastic.clients.elasticsearch.core.BulkResponse;
2828
import co.elastic.clients.elasticsearch.core.CountRequest;
2929
import co.elastic.clients.elasticsearch.core.CountResponse;
30+
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
3031
import co.elastic.clients.elasticsearch.core.DeleteResponse;
3132
import co.elastic.clients.elasticsearch.core.ExistsRequest;
3233
import co.elastic.clients.elasticsearch.core.GetResponse;
@@ -381,8 +382,21 @@ public void delete(Channel channel) {
381382
}
382383

383384
@Override
384-
public void deleteAll(Iterable<? extends Channel> entities) {
385-
throw new UnsupportedOperationException(TextUtil.DELETE_ALL_NOT_SUPPORTED);
385+
public void deleteAll(Iterable<? extends Channel> channels) {
386+
387+
BulkRequest.Builder br = new BulkRequest.Builder();
388+
for (Channel channel : channels) {
389+
br.operations(op -> op
390+
. delete(idx -> idx
391+
.index(ES_CHANNEL_INDEX)
392+
.id(channel.getName()))
393+
).refresh(Refresh.True);
394+
}
395+
try {
396+
BulkResponse result = client.bulk(br.build());
397+
} catch (IOException e) {
398+
e.printStackTrace();
399+
}
386400
}
387401

388402
@Override

0 commit comments

Comments
 (0)