Skip to content

Commit dd52749

Browse files
authored
Merge pull request #107 from jacomago/archiver-pause-resume
Add pause resume functionality to archiver functionality
2 parents 32b0194 + 8949ef2 commit dd52749

File tree

11 files changed

+704
-53
lines changed

11 files changed

+704
-53
lines changed

pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,22 @@
182182
</exclusion>
183183
</exclusions>
184184
</dependency>
185+
<dependency>
186+
<groupId>com.squareup.okhttp3</groupId>
187+
<artifactId>mockwebserver</artifactId>
188+
<version>4.11.0</version>
189+
<scope>test</scope>
190+
<exclusions>
191+
<exclusion>
192+
<groupId>junit</groupId>
193+
<artifactId>junit</artifactId>
194+
</exclusion>
195+
</exclusions>
196+
</dependency>
197+
<dependency>
198+
<groupId>io.netty</groupId> <!-- Need for running tests on mac -->
199+
<artifactId>netty-all</artifactId>
200+
</dependency>
185201
</dependencies>
186202
<build>
187203
<!-- read properties from the pom file and add them to the application.properties -->

src/main/java/org/phoebus/channelfinder/processors/AAChannelProcessor.java

Lines changed: 218 additions & 45 deletions
Large diffs are not rendered by default.

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class ChannelProcessorService {
2222
private TaskExecutor taskExecutor;
2323

2424
long getProcessorCount() {
25-
return channelProcessors.stream().count();
25+
return channelProcessors.size();
2626
}
2727

2828
List<String> getProcessorsNames() {

src/main/resources/application.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ server.ssl.key-alias=cf
1313

1414
security.require-ssl=true
1515

16-
logging.level.org.springframework.web=DEBUG
16+
logging.level.org.springframework.web=INFO
1717
spring.http.log-request-details=true
1818

1919
# Enable HTTP/2 support, if the current environment supports it

src/site/sphinx/config.rst

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,26 @@ Embedded LDAP Server
6161
When :ref:`conf-embedded_ldap.enabled` is **true**,
6262
An LDAP server is run by the channelfinder service process and is initially populated
6363
with entries read from the file referenced by :ref:`conf-embedded_ldap.urls`.
64+
65+
Archiver Appliance Configuration Processor
66+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
67+
To enable the archiver appliance configuration processor, set the property :ref:`aa.enabled` to **true**.
68+
69+
A list of archiver appliance URLs and aliases. ::
70+
71+
aa.urls={'default': 'http://archiver-01.example.com:17665', 'neutron-controls': 'http://archiver-02.example.com:17665'}
72+
73+
To set the choice of default archiver appliance, set the property :ref:`aa.default_alias` to the alias of the default archiver appliance.
74+
75+
To pass the PV as "pva://PVNAME" to the archiver appliance, set the property :ref:`aa.pva` to **true**.
76+
77+
The properties checked for setting a PV to be archived are ::
78+
79+
aa.archive_property_name=archive
80+
aa.archiver_property_name=archiver
81+
82+
83+
To set the auto pause behaviour, configure the parameter :ref:`aa.auto_pause`. Set to pvStatus to pause on pvStatus=Inactive,
84+
and resume on pvStatus=Active. Set to archive to pause on archive_property_name not existing. Set to both to pause on pvStatus=Inactive and archive_property_name::
85+
86+
aa.auto_pause=pvStatus,archive
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package org.phoebus.channelfinder.processors;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import okhttp3.mockwebserver.MockResponse;
6+
import okhttp3.mockwebserver.MockWebServer;
7+
import okhttp3.mockwebserver.RecordedRequest;
8+
import org.jetbrains.annotations.NotNull;
9+
import org.junit.jupiter.api.AfterEach;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.params.ParameterizedTest;
13+
import org.junit.jupiter.params.provider.Arguments;
14+
import org.junit.jupiter.params.provider.MethodSource;
15+
import org.phoebus.channelfinder.entity.Channel;
16+
import org.phoebus.channelfinder.entity.Property;
17+
import org.springframework.beans.factory.annotation.Autowired;
18+
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
19+
import org.springframework.test.context.TestPropertySource;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.stream.Stream;
26+
27+
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
29+
@WebMvcTest(AAChannelProcessor.class)
30+
@TestPropertySource(value = "classpath:application_test.properties")
31+
class AAChannelProcessorIT {
32+
33+
protected static Property archiveProperty = new Property("archive", "owner", "default");
34+
protected static Property activeProperty = new Property("pvStatus", "owner", "Active");
35+
protected static Property inactiveProperty = new Property("pvStatus", "owner", "Inactive");
36+
37+
@Autowired
38+
AAChannelProcessor aaChannelProcessor;
39+
40+
MockWebServer mockArchiverAppliance;
41+
ObjectMapper objectMapper;
42+
43+
@NotNull
44+
private static Stream<Arguments> processSource() {
45+
return Stream.of(
46+
Arguments.of(
47+
new Channel("PVArchivedActive", "owner", List.of(archiveProperty, activeProperty), List.of()),
48+
"Being archived",
49+
"",
50+
""),
51+
Arguments.of(
52+
new Channel("PVPausedActive", "owner", List.of(archiveProperty, activeProperty), List.of()),
53+
"Paused",
54+
"resumeArchivingPV",
55+
"[\"PVPausedActive\"]"),
56+
Arguments.of(
57+
new Channel("PVNoneActive", "owner", List.of(archiveProperty, activeProperty), List.of()),
58+
"Not being archived",
59+
"archivePV",
60+
"[{\"pv\":\"PVNoneActive\"}]"),
61+
Arguments.of(
62+
new Channel(
63+
"PVArchivedInactive", "owner", List.of(archiveProperty, inactiveProperty), List.of()),
64+
"Being archived",
65+
"pauseArchivingPV",
66+
"[\"PVArchivedInactive\"]"),
67+
Arguments.of(
68+
new Channel("PVPausedInactive", "owner", List.of(archiveProperty, inactiveProperty), List.of()),
69+
"Paused",
70+
"",
71+
""),
72+
Arguments.of(
73+
new Channel("PVNoneInactive", "owner", List.of(archiveProperty, inactiveProperty), List.of()),
74+
"Not being archived",
75+
"",
76+
""),
77+
Arguments.of(
78+
new Channel("PVArchivedNotag", "owner", List.of(), List.of()),
79+
"Being archived",
80+
"pauseArchivingPV",
81+
"[\"PVArchivedNotag\"]"));
82+
}
83+
84+
public static void paramableAAChannelProcessorTest(
85+
MockWebServer mockArchiverAppliance,
86+
ObjectMapper objectMapper,
87+
ChannelProcessor aaChannelProcessor,
88+
Channel channel,
89+
String archiveStatus,
90+
String archiverEndpoint,
91+
String submissionBody)
92+
throws JsonProcessingException, InterruptedException {
93+
// Request to policies
94+
Map<String, String> policyList = Map.of("policy", "description");
95+
mockArchiverAppliance.enqueue(new MockResponse()
96+
.setBody(objectMapper.writeValueAsString(policyList))
97+
.addHeader("Content-Type", "application/json"));
98+
99+
if (!archiveStatus.isEmpty()) {
100+
101+
// Request to archiver status
102+
List<Map<String, String>> archivePVStatuses =
103+
List.of(Map.of("pvName", channel.getName(), "status", archiveStatus));
104+
mockArchiverAppliance.enqueue(new MockResponse()
105+
.setBody(objectMapper.writeValueAsString(archivePVStatuses))
106+
.addHeader("Content-Type", "application/json"));
107+
}
108+
if (!archiverEndpoint.isEmpty()) {
109+
// Request to archiver to archive
110+
List<Map<String, String>> archiverResponse =
111+
List.of(Map.of("pvName", channel.getName(), "status", "Archive request submitted"));
112+
mockArchiverAppliance.enqueue(new MockResponse()
113+
.setBody(objectMapper.writeValueAsString(archiverResponse))
114+
.addHeader("Content-Type", "application/json"));
115+
}
116+
117+
aaChannelProcessor.process(List.of(channel));
118+
119+
int expectedRequests = 1;
120+
RecordedRequest requestPolicy = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS);
121+
assert requestPolicy != null;
122+
assertEquals("/mgmt/bpl/getPolicyList", requestPolicy.getPath());
123+
124+
if (!archiveStatus.isEmpty()) {
125+
expectedRequests += 1;
126+
RecordedRequest requestStatus = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS);
127+
assert requestStatus != null;
128+
assert requestStatus.getRequestUrl() != null;
129+
assertEquals("/mgmt/bpl/getPVStatus", requestStatus.getRequestUrl().encodedPath());
130+
}
131+
132+
if (!archiverEndpoint.isEmpty()) {
133+
expectedRequests += 1;
134+
RecordedRequest requestAction = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS);
135+
assert requestAction != null;
136+
assertEquals("/mgmt/bpl/" + archiverEndpoint, requestAction.getPath());
137+
assertEquals(submissionBody, requestAction.getBody().readUtf8());
138+
}
139+
140+
assertEquals(mockArchiverAppliance.getRequestCount(), expectedRequests);
141+
}
142+
143+
@BeforeEach
144+
void setUp() throws IOException {
145+
mockArchiverAppliance = new MockWebServer();
146+
mockArchiverAppliance.start(17665);
147+
148+
objectMapper = new ObjectMapper();
149+
}
150+
151+
@AfterEach
152+
void teardown() throws IOException {
153+
mockArchiverAppliance.shutdown();
154+
}
155+
156+
@Test
157+
void testProcessNoPVs() throws JsonProcessingException {
158+
aaChannelProcessor.process(List.of());
159+
160+
assertEquals(mockArchiverAppliance.getRequestCount(), 0);
161+
}
162+
163+
@ParameterizedTest
164+
@MethodSource("processSource")
165+
void testProcessNotArchivedActive(
166+
Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody)
167+
throws JsonProcessingException, InterruptedException {
168+
paramableAAChannelProcessorTest(
169+
mockArchiverAppliance,
170+
objectMapper,
171+
aaChannelProcessor,
172+
channel,
173+
archiveStatus,
174+
archiverEndpoint,
175+
submissionBody);
176+
}
177+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package org.phoebus.channelfinder.processors;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import okhttp3.mockwebserver.MockWebServer;
6+
import org.junit.jupiter.api.AfterEach;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.params.ParameterizedTest;
9+
import org.junit.jupiter.params.provider.Arguments;
10+
import org.junit.jupiter.params.provider.MethodSource;
11+
import org.phoebus.channelfinder.entity.Channel;
12+
import org.springframework.beans.factory.annotation.Autowired;
13+
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
14+
import org.springframework.test.context.TestPropertySource;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.stream.Stream;
19+
20+
import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.archiveProperty;
21+
import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.inactiveProperty;
22+
import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.paramableAAChannelProcessorTest;
23+
24+
@WebMvcTest(AAChannelProcessor.class)
25+
@TestPropertySource(locations = "classpath:application_test.properties", properties = "aa.auto_pause=none")
26+
class AAChannelProcessorNoPauseIT {
27+
28+
@Autowired
29+
AAChannelProcessor aaChannelProcessor;
30+
31+
MockWebServer mockArchiverAppliance;
32+
ObjectMapper objectMapper;
33+
34+
private static Stream<Arguments> processNoPauseSource() {
35+
36+
return Stream.of(
37+
Arguments.of(
38+
new Channel(
39+
"PVArchivedInactive", "owner", List.of(archiveProperty, inactiveProperty), List.of()),
40+
"Being archived",
41+
"",
42+
""),
43+
Arguments.of(
44+
new Channel("PVArchivedNotag", "owner", List.of(), List.of()),
45+
"",
46+
"",
47+
""));
48+
}
49+
50+
@BeforeEach
51+
void setUp() throws IOException {
52+
mockArchiverAppliance = new MockWebServer();
53+
mockArchiverAppliance.start(17665);
54+
55+
objectMapper = new ObjectMapper();
56+
}
57+
58+
@AfterEach
59+
void teardown() throws IOException {
60+
mockArchiverAppliance.shutdown();
61+
}
62+
63+
@ParameterizedTest
64+
@MethodSource("processNoPauseSource")
65+
void testProcessNotArchivedActive(
66+
Channel channel,
67+
String archiveStatus,
68+
String archiverEndpoint,
69+
String submissionBody)
70+
throws JsonProcessingException, InterruptedException {
71+
paramableAAChannelProcessorTest(
72+
mockArchiverAppliance,
73+
objectMapper,
74+
aaChannelProcessor,
75+
channel,
76+
archiveStatus,
77+
archiverEndpoint,
78+
submissionBody);
79+
}
80+
}

0 commit comments

Comments
 (0)