@@ -87,13 +87,15 @@ public String processorInfo() {
8787 * If the pvStatus auto pause is set, then the pv will be auto pause resumed as well.
8888 *
8989 * @param channels List of channels
90+ * @return Return number of channels processed
9091 * @throws JsonProcessingException If processing archiver responses fail.
9192 */
9293 @ Override
93- public void process (List <Channel > channels ) throws JsonProcessingException {
94+ public long process (List <Channel > channels ) throws JsonProcessingException {
9495 if (channels .isEmpty ()) {
95- return ;
96+ return 0 ;
9697 }
98+
9799 Map <String , List <ArchivePV >> aaArchivePVS = new HashMap <>(); // AA identifier, ArchivePV
98100 for (String alias : aaURLs .keySet ()) {
99101 aaArchivePVS .put (alias , new ArrayList <>());
@@ -106,36 +108,49 @@ public void process(List<Channel> channels) throws JsonProcessingException {
106108 .filter (xmlProperty -> archivePropertyName .equalsIgnoreCase (xmlProperty .getName ()))
107109 .findFirst ();
108110 if (archiveProperty .isPresent ()) {
109- String pvStatus = channel .getProperties ().stream ()
110- .filter (xmlProperty -> PV_STATUS_PROPERTY_NAME .equalsIgnoreCase (xmlProperty .getName ()))
111- .findFirst ()
112- .map (Property ::getValue )
113- .orElse (PV_STATUS_INACTIVE );
114- String archiverAlias = channel .getProperties ().stream ()
115- .filter (xmlProperty -> archiverPropertyName .equalsIgnoreCase (xmlProperty .getName ()))
116- .findFirst ()
117- .map (Property ::getValue )
118- .orElse (defaultArchiver );
119- ArchivePV newArchiverPV = createArchivePV (
120- policyLists .get (archiverAlias ),
121- channel ,
122- archiveProperty .get ().getValue (),
123- autoPauseOptions .contains (PV_STATUS_PROPERTY_NAME ) ? pvStatus : PV_STATUS_ACTIVE );
124- aaArchivePVS .get (archiverAlias ).add (newArchiverPV );
111+ try {
112+ addChannelChange (channel , aaArchivePVS , policyLists , archiveProperty );
113+ } catch (Exception e ) {
114+ logger .log (Level .WARNING , String .format ("Failed to process %s" , channel ), e );
115+ }
125116 } else if (autoPauseOptions .contains (archivePropertyName )) {
126117 aaURLs .keySet ().forEach (archiverAlias -> aaArchivePVS
127118 .get (archiverAlias )
128119 .add (createArchivePV (List .of (), channel , "" , PV_STATUS_INACTIVE )));
129120 }
130121 });
131-
122+ long count = 0 ;
132123 for (Map .Entry <String , List <ArchivePV >> e : aaArchivePVS .entrySet ()) {
133124 String archiverURL = aaURLs .get (e .getKey ());
134125 Map <String , ArchivePV > archivePVSList =
135126 e .getValue ().stream ().collect (Collectors .toMap (archivePV -> archivePV .pv , archivePV -> archivePV ));
136127 Map <ArchiveAction , List <ArchivePV >> archiveActionArchivePVMap =
137128 getArchiveActions (archivePVSList , archiverURL );
138- configureAA (archiveActionArchivePVMap , archiverURL );
129+ count += configureAA (archiveActionArchivePVMap , archiverURL );
130+ }
131+ long finalCount = count ;
132+ logger .log (Level .INFO , () -> String .format ("Configured %s channels." , finalCount ));
133+ return finalCount ;
134+ }
135+
136+ private void addChannelChange (Channel channel , Map <String , List <ArchivePV >> aaArchivePVS , Map <String , List <String >> policyLists , Optional <Property > archiveProperty ) {
137+ String pvStatus = channel .getProperties ().stream ()
138+ .filter (xmlProperty -> PV_STATUS_PROPERTY_NAME .equalsIgnoreCase (xmlProperty .getName ()))
139+ .findFirst ()
140+ .map (Property ::getValue )
141+ .orElse (PV_STATUS_INACTIVE );
142+ String archiverAlias = channel .getProperties ().stream ()
143+ .filter (xmlProperty -> archiverPropertyName .equalsIgnoreCase (xmlProperty .getName ()))
144+ .findFirst ()
145+ .map (Property ::getValue )
146+ .orElse (defaultArchiver );
147+ if (aaArchivePVS .containsKey (archiverAlias ) && archiveProperty .isPresent ()) {
148+ ArchivePV newArchiverPV = createArchivePV (
149+ policyLists .get (archiverAlias ),
150+ channel ,
151+ archiveProperty .get ().getValue (),
152+ autoPauseOptions .contains (PV_STATUS_PROPERTY_NAME ) ? pvStatus : PV_STATUS_ACTIVE );
153+ aaArchivePVS .get (archiverAlias ).add (newArchiverPV );
139154 }
140155 }
141156
@@ -217,13 +232,13 @@ private ArchivePV createArchivePV(
217232 return newArchiverPV ;
218233 }
219234
220- private void configureAA (Map <ArchiveAction , List <ArchivePV >> archivePVS , String aaURL )
235+ private long configureAA (Map <ArchiveAction , List <ArchivePV >> archivePVS , String aaURL )
221236 throws JsonProcessingException {
222237 logger .log (Level .INFO , () -> String .format ("Configure PVs %s in %s" , archivePVS .toString (), aaURL ));
223-
238+ long count = 0 ;
224239 // Don't request to archive an empty list.
225240 if (archivePVS .isEmpty ()) {
226- return ;
241+ return count ;
227242 }
228243 if (!archivePVS .get (ArchiveAction .ARCHIVE ).isEmpty ()) {
229244 logger .log (
@@ -234,6 +249,7 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
234249 objectMapper .writeValueAsString (archivePVS .get (ArchiveAction .ARCHIVE )),
235250 ArchiveAction .ARCHIVE .endpoint ,
236251 aaURL );
252+ count += archivePVS .get (ArchiveAction .ARCHIVE ).size ();
237253 }
238254 if (!archivePVS .get (ArchiveAction .PAUSE ).isEmpty ()) {
239255 logger .log (
@@ -246,6 +262,7 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
246262 .collect (Collectors .toList ())),
247263 ArchiveAction .PAUSE .endpoint ,
248264 aaURL );
265+ count += archivePVS .get (ArchiveAction .PAUSE ).size ();
249266 }
250267 if (!archivePVS .get (ArchiveAction .RESUME ).isEmpty ()) {
251268 logger .log (
@@ -258,20 +275,26 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
258275 .collect (Collectors .toList ())),
259276 ArchiveAction .RESUME .endpoint ,
260277 aaURL );
278+ count += archivePVS .get (ArchiveAction .RESUME ).size ();
261279 }
280+ return count ;
262281 }
263282
264283 private void submitAction (String values , String endpoint , String aaURL ) {
284+ try {
285+ String response = client .post ()
286+ .uri (URI .create (aaURL + MGMT_RESOURCE + endpoint ))
287+ .contentType (MediaType .APPLICATION_JSON )
288+ .bodyValue (values )
289+ .retrieve ()
290+ .bodyToMono (String .class )
291+ .timeout (Duration .of (10 , ChronoUnit .SECONDS ))
292+ .block ();
293+ logger .log (Level .FINE , () -> response );
265294
266- String response = client .post ()
267- .uri (URI .create (aaURL + MGMT_RESOURCE + endpoint ))
268- .contentType (MediaType .APPLICATION_JSON )
269- .bodyValue (values )
270- .retrieve ()
271- .bodyToMono (String .class )
272- .timeout (Duration .of (10 , ChronoUnit .SECONDS ))
273- .block ();
274- logger .log (Level .FINE , () -> response );
295+ } catch (Exception e ) {
296+ logger .log (Level .WARNING , String .format ("Failed to submit %s to %s on %s" , values , endpoint , aaURL ), e );
297+ }
275298 }
276299
277300 private Map <String , List <String >> getAAsPolicies (Map <String , String > aaURLs ) {
0 commit comments