Skip to content

Commit 9f93d76

Browse files
authored
Merge pull request #192 from jacomago/runtime-processor-config
Make processor enabled runtime
2 parents 1c7dcfb + 010fbce commit 9f93d76

17 files changed

+315
-80
lines changed

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ public interface ChannelProcessor {
88

99
boolean enabled();
1010

11-
String processorInfo();
11+
void setEnabled(boolean enabled);
12+
13+
ChannelProcessorInfo processorInfo();
1214

1315
long process(List<Channel> channels) throws JsonProcessingException;
1416
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.phoebus.channelfinder.processors;
2+
3+
import java.util.Map;
4+
5+
public record ChannelProcessorInfo(String name, boolean enabled, Map<String, String> properties) {}

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorManager.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.util.LinkedMultiValueMap;
2626
import org.springframework.util.MultiValueMap;
2727
import org.springframework.web.bind.annotation.GetMapping;
28+
import org.springframework.web.bind.annotation.PathVariable;
2829
import org.springframework.web.bind.annotation.PutMapping;
2930
import org.springframework.web.bind.annotation.RequestMapping;
3031
import org.springframework.web.bind.annotation.RequestParam;
@@ -75,10 +76,13 @@ public long processorCount() {
7576
responseCode = "200",
7677
description = "List of processor-info",
7778
content =
78-
@Content(array = @ArraySchema(schema = @Schema(implementation = String.class))))
79+
@Content(
80+
array =
81+
@ArraySchema(
82+
schema = @Schema(implementation = ChannelProcessorInfo.class))))
7983
})
80-
@GetMapping("/info")
81-
public List<String> processorInfo() {
84+
@GetMapping("/processors")
85+
public List<ChannelProcessorInfo> processorInfo() {
8286
return channelProcessorService.getProcessorsInfo();
8387
}
8488

@@ -151,4 +155,17 @@ public long processChannels(
151155
public void processChannels(List<Channel> channels) {
152156
channelProcessorService.sendToProcessors(channels);
153157
}
158+
159+
@Operation(summary = "Set if the processor is enabled or not")
160+
@PutMapping(
161+
value = "/processor/{processorName}/enabled",
162+
produces = {"application/json"},
163+
consumes = {"application/json"})
164+
public void setProcessorEnabled(
165+
@PathVariable("processorName") String processorName,
166+
@Parameter(description = "Value of enabled to set, default value: true")
167+
@RequestParam(required = false, name = "enabled", defaultValue = "true")
168+
Boolean enabled) {
169+
channelProcessorService.setProcessorEnabled(processorName, enabled);
170+
}
154171
}

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5+
import java.util.Objects;
6+
import java.util.Optional;
57
import java.util.Spliterator;
68
import java.util.logging.Level;
79
import java.util.logging.Logger;
@@ -17,21 +19,35 @@ public class ChannelProcessorService {
1719

1820
private static final Logger logger = Logger.getLogger(ChannelProcessorService.class.getName());
1921

20-
@Autowired private List<ChannelProcessor> channelProcessors;
22+
private final List<ChannelProcessor> channelProcessors;
2123

22-
@Autowired private TaskExecutor taskExecutor;
24+
private final TaskExecutor taskExecutor;
2325

24-
@Value("${processors.chunking.size:10000}")
25-
private int chunkSize;
26+
private final int chunkSize;
27+
28+
public ChannelProcessorService(
29+
@Autowired List<ChannelProcessor> channelProcessors,
30+
@Autowired TaskExecutor taskExecutor,
31+
@Value("${processors.chunking.size:10000}") int chunkSize) {
32+
this.channelProcessors = channelProcessors;
33+
this.taskExecutor = taskExecutor;
34+
this.chunkSize = chunkSize;
35+
}
2636

2737
long getProcessorCount() {
2838
return channelProcessors.size();
2939
}
3040

31-
List<String> getProcessorsInfo() {
32-
return channelProcessors.stream()
33-
.map(ChannelProcessor::processorInfo)
34-
.collect(Collectors.toList());
41+
List<ChannelProcessorInfo> getProcessorsInfo() {
42+
return channelProcessors.stream().map(ChannelProcessor::processorInfo).toList();
43+
}
44+
45+
void setProcessorEnabled(String name, boolean enabled) {
46+
Optional<ChannelProcessor> processor =
47+
channelProcessors.stream()
48+
.filter(p -> Objects.equals(p.processorInfo().name(), name))
49+
.findFirst();
50+
processor.ifPresent(channelProcessor -> channelProcessor.setEnabled(enabled));
3551
}
3652

3753
/**
@@ -60,7 +76,6 @@ public void sendToProcessors(List<Channel> channels) {
6076
while (true) {
6177
List<Channel> chunk = new ArrayList<>(chunkSize);
6278
for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
63-
;
6479
if (chunk.isEmpty()) break;
6580
channelProcessor.process(chunk);
6681
}

src/main/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessor.java

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.phoebus.channelfinder.entity.Channel;
1616
import org.phoebus.channelfinder.entity.Property;
1717
import org.phoebus.channelfinder.processors.ChannelProcessor;
18+
import org.phoebus.channelfinder.processors.ChannelProcessorInfo;
1819
import org.springframework.beans.factory.annotation.Autowired;
1920
import org.springframework.beans.factory.annotation.Value;
2021
import org.springframework.context.annotation.Configuration;
@@ -64,8 +65,15 @@ public boolean enabled() {
6465
}
6566

6667
@Override
67-
public String processorInfo() {
68-
Map<String, String> processorProperties =
68+
public void setEnabled(boolean enabled) {
69+
this.aaEnabled = enabled;
70+
}
71+
72+
@Override
73+
public ChannelProcessorInfo processorInfo() {
74+
return new ChannelProcessorInfo(
75+
"AAChannelProcessor",
76+
aaEnabled,
6977
Map.of(
7078
"archiveProperty",
7179
archivePropertyName,
@@ -74,8 +82,7 @@ public String processorInfo() {
7482
"Archivers",
7583
aaURLs.keySet().toString(),
7684
"AutoPauseOn",
77-
autoPauseOptions.toString());
78-
return "AAChannelProcessor: ProcessProperties " + processorProperties;
85+
autoPauseOptions.toString()));
7986
}
8087

8188
/**
@@ -199,7 +206,7 @@ private void addChannelChange(
199206
aaArchivePVS.get(archiverAlias).add(newArchiverPV);
200207
}
201208
}
202-
209+
203210
private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) {
204211
if (archiveStatus.equals("Being archived") && (pvStatus.equals(PV_STATUS_INACTIVE))) {
205212
return ArchiveAction.PAUSE;
@@ -209,52 +216,57 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) {
209216
&& !archiveStatus.equals("Paused")
210217
&& pvStatus.equals(PV_STATUS_ACTIVE)) { // If archive status anything else
211218
return ArchiveAction.ARCHIVE;
212-
213219
}
214220

215221
return ArchiveAction.NONE;
216222
}
217223

218224
private Map<ArchiveAction, List<ArchivePVOptions>> getArchiveActions(
219-
Map<String, ArchivePVOptions> archivePVS, ArchiverInfo archiverInfo) {
225+
Map<String, ArchivePVOptions> archivePVS, ArchiverInfo archiverInfo) {
220226
if (archiverInfo == null) {
221227
return Map.of();
222228
}
223229

224230
logger.log(Level.INFO, () -> String.format("Get archiver status in archiver %s", archiverInfo));
225231

226232
Map<ArchiveAction, List<ArchivePVOptions>> result = new EnumMap<>(ArchiveAction.class);
227-
Arrays.stream(ArchiveAction.values()).forEach(archiveAction -> result.put(archiveAction, new ArrayList<>()));
233+
Arrays.stream(ArchiveAction.values())
234+
.forEach(archiveAction -> result.put(archiveAction, new ArrayList<>()));
228235
// Don't request to archive an empty list.
229236
if (archivePVS.isEmpty()) {
230237
return result;
231238
}
232-
List<Map<String, String>> statuses = archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias());
239+
List<Map<String, String>> statuses =
240+
archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias());
233241
logger.log(Level.FINER, "Statuses {0}", statuses);
234-
statuses.forEach(archivePVStatusJsonMap -> {
235-
String archiveStatus = archivePVStatusJsonMap.get("status");
236-
String pvName = archivePVStatusJsonMap.get("pvName");
242+
statuses.forEach(
243+
archivePVStatusJsonMap -> {
244+
String archiveStatus = archivePVStatusJsonMap.get("status");
245+
String pvName = archivePVStatusJsonMap.get("pvName");
237246

238-
if (archiveStatus == null || pvName == null) {
239-
logger.log(Level.WARNING, "Missing status or pvName in archivePVStatusJsonMap: {0}", archivePVStatusJsonMap);
240-
return;
241-
}
247+
if (archiveStatus == null || pvName == null) {
248+
logger.log(
249+
Level.WARNING,
250+
"Missing status or pvName in archivePVStatusJsonMap: {0}",
251+
archivePVStatusJsonMap);
252+
return;
253+
}
242254

243-
ArchivePVOptions archivePVOptions = archivePVS.get(pvName);
244-
if (archivePVOptions == null) {
245-
logger.log(Level.WARNING, "archivePVS does not contain pvName: {0}", pvName);
246-
return;
247-
}
255+
ArchivePVOptions archivePVOptions = archivePVS.get(pvName);
256+
if (archivePVOptions == null) {
257+
logger.log(Level.WARNING, "archivePVS does not contain pvName: {0}", pvName);
258+
return;
259+
}
248260

249-
String pvStatus = archivePVOptions.getPvStatus();
250-
ArchiveAction action = pickArchiveAction(archiveStatus, pvStatus);
261+
String pvStatus = archivePVOptions.getPvStatus();
262+
ArchiveAction action = pickArchiveAction(archiveStatus, pvStatus);
251263

252-
List<ArchivePVOptions> archivePVOptionsList = result.get(action);
253-
archivePVOptionsList.add(archivePVOptions);
254-
});
264+
List<ArchivePVOptions> archivePVOptionsList = result.get(action);
265+
archivePVOptionsList.add(archivePVOptions);
266+
});
255267
return result;
256268
}
257-
269+
258270
private ArchivePVOptions createArchivePV(
259271
List<String> policyList, Channel channel, String archiveProperty, String pvStaus) {
260272
ArchivePVOptions newArchiverPV = new ArchivePVOptions();
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package org.phoebus.channelfinder.processors;
2+
3+
import static org.hamcrest.Matchers.containsInAnyOrder;
4+
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
5+
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
6+
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;
7+
8+
import java.util.List;
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.ExtendWith;
11+
import org.mockito.Mockito;
12+
import org.phoebus.channelfinder.CFResourceDescriptors;
13+
import org.phoebus.channelfinder.ChannelScroll;
14+
import org.phoebus.channelfinder.entity.Scroll;
15+
import org.springframework.beans.factory.annotation.Autowired;
16+
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
17+
import org.springframework.boot.test.mock.mockito.MockBean;
18+
import org.springframework.http.HttpHeaders;
19+
import org.springframework.test.context.TestPropertySource;
20+
import org.springframework.test.context.junit.jupiter.SpringExtension;
21+
import org.springframework.test.web.servlet.MockMvc;
22+
import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
23+
import org.springframework.util.Base64Utils;
24+
25+
@ExtendWith(SpringExtension.class)
26+
@WebMvcTest(ChannelProcessorManager.class)
27+
@TestPropertySource(
28+
value = "classpath:application_test.properties",
29+
properties = {"elasticsearch.create.indices = false"})
30+
class ChannelProcessorManagerIT {
31+
32+
protected static final String AUTHORIZATION =
33+
"Basic " + Base64Utils.encodeToString("admin:adminPass".getBytes());
34+
35+
@Autowired protected MockMvc mockMvc;
36+
@MockBean ChannelScroll channelScroll;
37+
38+
@Test
39+
void testProcessorCount() throws Exception {
40+
MockHttpServletRequestBuilder request =
41+
get("/" + CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI + "/count");
42+
mockMvc.perform(request).andExpect(status().isOk()).andExpect(content().string("2"));
43+
}
44+
45+
@Test
46+
void testProcessorsInfo() throws Exception {
47+
MockHttpServletRequestBuilder request =
48+
get("/" + CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI + "/processors");
49+
mockMvc
50+
.perform(request)
51+
.andExpect(status().isOk())
52+
.andExpect(
53+
jsonPath("$[*].name", containsInAnyOrder("AAChannelProcessor", "DummyProcessor")));
54+
}
55+
56+
@Test
57+
void testProcessorEnabled() throws Exception {
58+
MockHttpServletRequestBuilder request =
59+
put("/"
60+
+ CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI
61+
+ "/processor/AAChannelProcessor/enabled")
62+
.header(HttpHeaders.AUTHORIZATION, AUTHORIZATION)
63+
.contentType("application/json")
64+
.content("{\"enabled\": false}");
65+
mockMvc.perform(request).andExpect(status().isOk());
66+
}
67+
68+
@Test
69+
void testProcessAllChannels() throws Exception {
70+
Mockito.when(channelScroll.query(Mockito.any())).thenReturn(new Scroll("", List.of()));
71+
72+
MockHttpServletRequestBuilder request =
73+
put("/" + CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI + "/process/all")
74+
.header(HttpHeaders.AUTHORIZATION, AUTHORIZATION);
75+
mockMvc.perform(request).andExpect(status().isOk());
76+
}
77+
78+
@Test
79+
void testProcessQuery() throws Exception {
80+
Mockito.when(channelScroll.query(Mockito.any())).thenReturn(new Scroll("", List.of()));
81+
MockHttpServletRequestBuilder request =
82+
put("/" + CFResourceDescriptors.CHANNEL_PROCESSOR_RESOURCE_URI + "/process/query")
83+
.header(HttpHeaders.AUTHORIZATION, AUTHORIZATION);
84+
mockMvc.perform(request).andExpect(status().isOk());
85+
}
86+
}

0 commit comments

Comments
 (0)