Skip to content

Commit de7db43

Browse files
committed
Make processor enabled runtime
1 parent 1c7dcfb commit de7db43

File tree

6 files changed

+79
-36
lines changed

6 files changed

+79
-36
lines changed

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ public interface ChannelProcessor {
88

99
boolean enabled();
1010

11-
String processorInfo();
11+
void setEnabled(boolean enabled);
12+
13+
ChannelProcessorInfo processorInfo();
1214

1315
long process(List<Channel> channels) throws JsonProcessingException;
1416
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.phoebus.channelfinder.processors;
2+
3+
import java.util.Map;
4+
5+
public record ChannelProcessorInfo(String name, boolean enabled, Map<String, String> properties) {}

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorManager.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.util.LinkedMultiValueMap;
2626
import org.springframework.util.MultiValueMap;
2727
import org.springframework.web.bind.annotation.GetMapping;
28+
import org.springframework.web.bind.annotation.PathVariable;
2829
import org.springframework.web.bind.annotation.PutMapping;
2930
import org.springframework.web.bind.annotation.RequestMapping;
3031
import org.springframework.web.bind.annotation.RequestParam;
@@ -75,10 +76,13 @@ public long processorCount() {
7576
responseCode = "200",
7677
description = "List of processor-info",
7778
content =
78-
@Content(array = @ArraySchema(schema = @Schema(implementation = String.class))))
79+
@Content(
80+
array =
81+
@ArraySchema(
82+
schema = @Schema(implementation = ChannelProcessorInfo.class))))
7983
})
80-
@GetMapping("/info")
81-
public List<String> processorInfo() {
84+
@GetMapping("/processors")
85+
public List<ChannelProcessorInfo> processorInfo() {
8286
return channelProcessorService.getProcessorsInfo();
8387
}
8488

@@ -151,4 +155,17 @@ public long processChannels(
151155
public void processChannels(List<Channel> channels) {
152156
channelProcessorService.sendToProcessors(channels);
153157
}
158+
159+
@Operation(summary = "Set if the processor is enabled or not")
160+
@PutMapping(
161+
value = "/processor/{processorName}/enabled",
162+
produces = {"application/json"},
163+
consumes = {"application/json"})
164+
public void setProcessorEnabled(
165+
@PathVariable("processorName") String processorName,
166+
@Parameter(description = "Value of enabled to set, default value: true")
167+
@RequestParam(required = false, name = "enabled", defaultValue = "true")
168+
Boolean enabled) {
169+
channelProcessorService.setProcessorEnabled(processorName, enabled);
170+
}
154171
}

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5+
import java.util.Objects;
6+
import java.util.Optional;
57
import java.util.Spliterator;
68
import java.util.logging.Level;
79
import java.util.logging.Logger;
@@ -28,10 +30,16 @@ long getProcessorCount() {
2830
return channelProcessors.size();
2931
}
3032

31-
List<String> getProcessorsInfo() {
32-
return channelProcessors.stream()
33-
.map(ChannelProcessor::processorInfo)
34-
.collect(Collectors.toList());
33+
List<ChannelProcessorInfo> getProcessorsInfo() {
34+
return channelProcessors.stream().map(ChannelProcessor::processorInfo).toList();
35+
}
36+
37+
void setProcessorEnabled(String name, boolean enabled) {
38+
Optional<ChannelProcessor> processor =
39+
channelProcessors.stream()
40+
.filter(p -> Objects.equals(p.processorInfo().name(), name))
41+
.findFirst();
42+
processor.ifPresent(channelProcessor -> channelProcessor.setEnabled(enabled));
3543
}
3644

3745
/**

src/main/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessor.java

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.phoebus.channelfinder.entity.Channel;
1616
import org.phoebus.channelfinder.entity.Property;
1717
import org.phoebus.channelfinder.processors.ChannelProcessor;
18+
import org.phoebus.channelfinder.processors.ChannelProcessorInfo;
1819
import org.springframework.beans.factory.annotation.Autowired;
1920
import org.springframework.beans.factory.annotation.Value;
2021
import org.springframework.context.annotation.Configuration;
@@ -64,8 +65,15 @@ public boolean enabled() {
6465
}
6566

6667
@Override
67-
public String processorInfo() {
68-
Map<String, String> processorProperties =
68+
public void setEnabled(boolean enabled) {
69+
this.aaEnabled = enabled;
70+
}
71+
72+
@Override
73+
public ChannelProcessorInfo processorInfo() {
74+
return new ChannelProcessorInfo(
75+
"AAChannelProcessor",
76+
aaEnabled,
6977
Map.of(
7078
"archiveProperty",
7179
archivePropertyName,
@@ -74,8 +82,7 @@ public String processorInfo() {
7482
"Archivers",
7583
aaURLs.keySet().toString(),
7684
"AutoPauseOn",
77-
autoPauseOptions.toString());
78-
return "AAChannelProcessor: ProcessProperties " + processorProperties;
85+
autoPauseOptions.toString()));
7986
}
8087

8188
/**
@@ -199,7 +206,7 @@ private void addChannelChange(
199206
aaArchivePVS.get(archiverAlias).add(newArchiverPV);
200207
}
201208
}
202-
209+
203210
private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) {
204211
if (archiveStatus.equals("Being archived") && (pvStatus.equals(PV_STATUS_INACTIVE))) {
205212
return ArchiveAction.PAUSE;
@@ -209,52 +216,57 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) {
209216
&& !archiveStatus.equals("Paused")
210217
&& pvStatus.equals(PV_STATUS_ACTIVE)) { // If archive status anything else
211218
return ArchiveAction.ARCHIVE;
212-
213219
}
214220

215221
return ArchiveAction.NONE;
216222
}
217223

218224
private Map<ArchiveAction, List<ArchivePVOptions>> getArchiveActions(
219-
Map<String, ArchivePVOptions> archivePVS, ArchiverInfo archiverInfo) {
225+
Map<String, ArchivePVOptions> archivePVS, ArchiverInfo archiverInfo) {
220226
if (archiverInfo == null) {
221227
return Map.of();
222228
}
223229

224230
logger.log(Level.INFO, () -> String.format("Get archiver status in archiver %s", archiverInfo));
225231

226232
Map<ArchiveAction, List<ArchivePVOptions>> result = new EnumMap<>(ArchiveAction.class);
227-
Arrays.stream(ArchiveAction.values()).forEach(archiveAction -> result.put(archiveAction, new ArrayList<>()));
233+
Arrays.stream(ArchiveAction.values())
234+
.forEach(archiveAction -> result.put(archiveAction, new ArrayList<>()));
228235
// Don't request to archive an empty list.
229236
if (archivePVS.isEmpty()) {
230237
return result;
231238
}
232-
List<Map<String, String>> statuses = archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias());
239+
List<Map<String, String>> statuses =
240+
archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias());
233241
logger.log(Level.FINER, "Statuses {0}", statuses);
234-
statuses.forEach(archivePVStatusJsonMap -> {
235-
String archiveStatus = archivePVStatusJsonMap.get("status");
236-
String pvName = archivePVStatusJsonMap.get("pvName");
242+
statuses.forEach(
243+
archivePVStatusJsonMap -> {
244+
String archiveStatus = archivePVStatusJsonMap.get("status");
245+
String pvName = archivePVStatusJsonMap.get("pvName");
237246

238-
if (archiveStatus == null || pvName == null) {
239-
logger.log(Level.WARNING, "Missing status or pvName in archivePVStatusJsonMap: {0}", archivePVStatusJsonMap);
240-
return;
241-
}
247+
if (archiveStatus == null || pvName == null) {
248+
logger.log(
249+
Level.WARNING,
250+
"Missing status or pvName in archivePVStatusJsonMap: {0}",
251+
archivePVStatusJsonMap);
252+
return;
253+
}
242254

243-
ArchivePVOptions archivePVOptions = archivePVS.get(pvName);
244-
if (archivePVOptions == null) {
245-
logger.log(Level.WARNING, "archivePVS does not contain pvName: {0}", pvName);
246-
return;
247-
}
255+
ArchivePVOptions archivePVOptions = archivePVS.get(pvName);
256+
if (archivePVOptions == null) {
257+
logger.log(Level.WARNING, "archivePVS does not contain pvName: {0}", pvName);
258+
return;
259+
}
248260

249-
String pvStatus = archivePVOptions.getPvStatus();
250-
ArchiveAction action = pickArchiveAction(archiveStatus, pvStatus);
261+
String pvStatus = archivePVOptions.getPvStatus();
262+
ArchiveAction action = pickArchiveAction(archiveStatus, pvStatus);
251263

252-
List<ArchivePVOptions> archivePVOptionsList = result.get(action);
253-
archivePVOptionsList.add(archivePVOptions);
254-
});
264+
List<ArchivePVOptions> archivePVOptionsList = result.get(action);
265+
archivePVOptionsList.add(archivePVOptions);
266+
});
255267
return result;
256268
}
257-
269+
258270
private ArchivePVOptions createArchivePV(
259271
List<String> policyList, Channel channel, String archiveProperty, String pvStaus) {
260272
ArchivePVOptions newArchiverPV = new ArchivePVOptions();

src/test/resources/application_test_multi.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO
8888
################ Archiver Appliance Configuration Processor #################
8989
aa.urls={'post': 'http://localhost:17664', 'query': 'http://localhost:17665'}
9090
aa.default_alias=post, query
91-
aa.enabled=true
9291
aa.pva=false
9392
aa.archive_property_name=archive
9493
aa.archiver_property_name=archiver

0 commit comments

Comments
 (0)