Skip to content

Commit 8ea5fe6

Browse files
Merge pull request #168 from wttech/events
Events for distributed actions
2 parents 3c0367b + 796cad4 commit 8ea5fe6

File tree

17 files changed

+514
-47
lines changed

17 files changed

+514
-47
lines changed

core/src/main/java/dev/vml/es/acm/core/code/ExecutionHistory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,18 @@ private Stream<ExecutionSummary> toExecutionSummaries(Stream<Resource> entries)
120120
private Stream<Resource> executeSql(String sql) {
121121
return StreamUtils.asStream(resourceResolver.findResources(sql, Query.JCR_SQL2));
122122
}
123+
124+
public boolean clear() {
125+
try {
126+
Resource root = resourceResolver.getResource(ROOT);
127+
if (root != null) {
128+
resourceResolver.delete(root);
129+
resourceResolver.commit();
130+
return true;
131+
}
132+
return false;
133+
} catch (PersistenceException e) {
134+
throw new AcmException(String.format("Cannot clear execution history at root '%s'!", ROOT), e);
135+
}
136+
}
123137
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package dev.vml.es.acm.core.event;
2+
3+
import java.util.Calendar;
4+
5+
public interface Event {
6+
7+
String getName();
8+
9+
Calendar getTriggeredAt();
10+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package dev.vml.es.acm.core.event;
2+
3+
import dev.vml.es.acm.core.code.ExecutionHistory;
4+
import dev.vml.es.acm.core.code.ExecutionQueue;
5+
import dev.vml.es.acm.core.util.ResourceUtils;
6+
import org.apache.sling.api.resource.LoginException;
7+
import org.apache.sling.api.resource.ResourceResolver;
8+
import org.apache.sling.api.resource.ResourceResolverFactory;
9+
import org.osgi.service.component.annotations.Component;
10+
import org.osgi.service.component.annotations.Reference;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
@Component(
15+
service = {EventDispatcher.class, EventListener.class},
16+
immediate = true)
17+
public class EventDispatcher implements EventListener {
18+
19+
private final Logger LOG = LoggerFactory.getLogger(EventDispatcher.class);
20+
21+
@Reference
22+
private EventManager eventManager;
23+
24+
@Reference
25+
private ExecutionQueue executionQueue;
26+
27+
@Reference
28+
private ResourceResolverFactory resourceResolverFactory;
29+
30+
public void dispatch(EventType eventType) {
31+
eventManager.triggerEvent(eventType.name());
32+
}
33+
34+
@Override
35+
public void onEvent(Event event) {
36+
EventType eventType = EventType.of(event.getName()).orElse(null);
37+
if (eventType == null) {
38+
return;
39+
}
40+
switch (eventType) {
41+
case EXECUTION_QUEUE_RESET:
42+
executionQueue.reset();
43+
break;
44+
case HISTORY_CLEAR:
45+
doHistoryClear();
46+
break;
47+
default:
48+
// not handled
49+
break;
50+
}
51+
}
52+
53+
private void doHistoryClear() {
54+
try (ResourceResolver resolver = ResourceUtils.contentResolver(resourceResolverFactory, null)) {
55+
ExecutionHistory executionHistory = new ExecutionHistory(resolver);
56+
executionHistory.clear();
57+
} catch (LoginException e) {
58+
LOG.error("Cannot access repository while clearing history!", e);
59+
}
60+
}
61+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package dev.vml.es.acm.core.event;
2+
3+
public interface EventListener {
4+
5+
void onEvent(Event event);
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package dev.vml.es.acm.core.event;
2+
3+
public interface EventManager {
4+
5+
void triggerEvent(String name);
6+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package dev.vml.es.acm.core.event;
2+
3+
import java.util.Arrays;
4+
import java.util.Optional;
5+
6+
public enum EventType {
7+
EXECUTION_QUEUE_RESET,
8+
HISTORY_CLEAR;
9+
10+
public static Optional<EventType> of(String name) {
11+
return Arrays.stream(values())
12+
.filter(s -> s.name().equalsIgnoreCase(name))
13+
.findFirst();
14+
}
15+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package dev.vml.es.acm.core.event;
2+
3+
import dev.vml.es.acm.core.AcmConstants;
4+
import dev.vml.es.acm.core.repo.RepoResource;
5+
import java.util.Calendar;
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import org.apache.commons.lang3.StringUtils;
9+
import org.apache.commons.lang3.builder.ToStringBuilder;
10+
import org.apache.commons.lang3.builder.ToStringStyle;
11+
import org.apache.jackrabbit.JcrConstants;
12+
import org.apache.sling.api.resource.Resource;
13+
import org.apache.sling.api.resource.ResourceResolver;
14+
15+
public class ResourceEvent implements Event {
16+
17+
public static final String ROOT = AcmConstants.VAR_ROOT + "/event";
18+
19+
public static final String TRIGGERED_AT_PROP = "triggeredAt";
20+
21+
private final String name;
22+
23+
private final Calendar triggeredAt;
24+
25+
public ResourceEvent(Resource resource) {
26+
this.name = StringUtils.substringAfter(resource.getPath(), ROOT + "/");
27+
this.triggeredAt = resource.getValueMap().get(TRIGGERED_AT_PROP, Calendar.class);
28+
}
29+
30+
public static ResourceEvent create(String name, ResourceResolver resolver) {
31+
RepoResource result = RepoResource.of(resolver, String.format("%s/%s", ROOT, name));
32+
Map<String, Object> properties = new HashMap<>();
33+
properties.put(JcrConstants.JCR_PRIMARYTYPE, JcrConstants.NT_UNSTRUCTURED);
34+
properties.put(TRIGGERED_AT_PROP, Calendar.getInstance());
35+
result.parent().ensureRegularFolder();
36+
result.save(properties);
37+
return new ResourceEvent(result.require());
38+
}
39+
40+
public String getName() {
41+
return name;
42+
}
43+
44+
public Calendar getTriggeredAt() {
45+
return triggeredAt;
46+
}
47+
48+
public String getPath() {
49+
return ROOT + "/" + name;
50+
}
51+
52+
@Override
53+
public String toString() {
54+
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
55+
.append("name", name)
56+
.append("triggeredAt", triggeredAt)
57+
.toString();
58+
}
59+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package dev.vml.es.acm.core.event;
2+
3+
import com.day.cq.replication.ReplicationActionType;
4+
import com.day.cq.replication.ReplicationException;
5+
import com.day.cq.replication.Replicator;
6+
import dev.vml.es.acm.core.AcmException;
7+
import dev.vml.es.acm.core.util.ResourceUtils;
8+
import java.util.*;
9+
import java.util.concurrent.CopyOnWriteArrayList;
10+
import java.util.stream.Collectors;
11+
import javax.jcr.Session;
12+
import org.apache.commons.collections.CollectionUtils;
13+
import org.apache.sling.api.resource.LoginException;
14+
import org.apache.sling.api.resource.ResourceResolver;
15+
import org.apache.sling.api.resource.ResourceResolverFactory;
16+
import org.apache.sling.api.resource.observation.ResourceChange;
17+
import org.apache.sling.api.resource.observation.ResourceChangeListener;
18+
import org.jetbrains.annotations.NotNull;
19+
import org.osgi.service.component.annotations.Component;
20+
import org.osgi.service.component.annotations.Reference;
21+
import org.osgi.service.component.annotations.ReferenceCardinality;
22+
import org.osgi.service.component.annotations.ReferencePolicy;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
@Component(
27+
service = {EventManager.class, ResourceChangeListener.class},
28+
immediate = true,
29+
property = {
30+
ResourceChangeListener.PATHS + "=glob:" + ResourceEvent.ROOT + "/**/*",
31+
ResourceChangeListener.CHANGES + "=ADDED",
32+
ResourceChangeListener.CHANGES + "=CHANGED",
33+
})
34+
public class ResourceEventManager implements EventManager, ResourceChangeListener {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(ResourceEventManager.class);
37+
38+
@Reference
39+
private Replicator replicator;
40+
41+
@Reference
42+
private ResourceResolverFactory resourceResolverFactory;
43+
44+
@Reference(
45+
cardinality = ReferenceCardinality.MULTIPLE,
46+
policy = ReferencePolicy.DYNAMIC,
47+
service = EventListener.class)
48+
private final Collection<EventListener> listeners = new CopyOnWriteArrayList<>();
49+
50+
@Override
51+
public void triggerEvent(String name) {
52+
try (ResourceResolver resourceResolver = ResourceUtils.contentResolver(resourceResolverFactory, null)) {
53+
LOG.debug("Triggering event: {}", name);
54+
ResourceEvent event = ResourceEvent.create(name, resourceResolver);
55+
replicator.replicate(
56+
resourceResolver.adaptTo(Session.class), ReplicationActionType.ACTIVATE, event.getPath());
57+
LOG.debug("Triggered event: {}", event);
58+
} catch (LoginException e) {
59+
throw new AcmException(String.format("Cannot access repository while triggering event '%s'!", name), e);
60+
} catch (ReplicationException e) {
61+
throw new AcmException(String.format("Cannot replicate event '%s'!", name), e);
62+
}
63+
}
64+
65+
@Override
66+
public void onChange(@NotNull List<ResourceChange> changes) {
67+
if (CollectionUtils.isEmpty(changes) || listeners.isEmpty()) {
68+
return;
69+
}
70+
try (ResourceResolver resourceResolver = ResourceUtils.contentResolver(resourceResolverFactory, null)) {
71+
Collection<Event> events = changes.stream()
72+
.map(c -> resourceResolver.getResource(c.getPath()))
73+
.filter(Objects::nonNull)
74+
.map(ResourceEvent::new)
75+
.collect(Collectors.toList());
76+
77+
events.forEach(event -> {
78+
LOG.debug("Dispatching event to listeners ({}): {}", listeners.size(), event);
79+
listeners.forEach(listener -> {
80+
try {
81+
listener.onEvent(event);
82+
} catch (Exception e) {
83+
LOG.error(
84+
"Event listener '{}' cannot handle event properly!",
85+
listener.getClass().getName(),
86+
e);
87+
}
88+
});
89+
LOG.debug("Dispatched event to listeners ({}): {}", listeners.size(), event);
90+
});
91+
} catch (LoginException e) {
92+
throw new AcmException("Cannot access repository while processing events!", e);
93+
}
94+
}
95+
}

core/src/main/java/dev/vml/es/acm/core/script/AutomaticScriptScheduler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ public class AutomaticScriptScheduler implements ResourceChangeListener {
6666

6767
@AttributeDefinition(
6868
name = "Health Check Retry Count On Boot",
69-
description =
70-
"Maximum number of retries when checking instance health on boot script execution")
69+
description = "Maximum number of retries when checking instance health on boot script execution")
7170
long healthCheckRetryCountBoot() default 90; // * 10 seconds = 15 minutes
7271

7372
@AttributeDefinition(
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package dev.vml.es.acm.core.servlet;
2+
3+
import static dev.vml.es.acm.core.util.ServletResult.*;
4+
import static dev.vml.es.acm.core.util.ServletUtils.respondJson;
5+
6+
import dev.vml.es.acm.core.event.EventDispatcher;
7+
import dev.vml.es.acm.core.event.EventType;
8+
import java.io.IOException;
9+
import javax.servlet.Servlet;
10+
import org.apache.sling.api.SlingHttpServletRequest;
11+
import org.apache.sling.api.SlingHttpServletResponse;
12+
import org.apache.sling.api.servlets.ServletResolverConstants;
13+
import org.apache.sling.api.servlets.SlingAllMethodsServlet;
14+
import org.osgi.service.component.annotations.Component;
15+
import org.osgi.service.component.annotations.Reference;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
@Component(
20+
service = {Servlet.class},
21+
property = {
22+
ServletResolverConstants.SLING_SERVLET_RESOURCE_TYPES + "=" + EventServlet.RT,
23+
ServletResolverConstants.SLING_SERVLET_METHODS + "=POST",
24+
ServletResolverConstants.SLING_SERVLET_EXTENSIONS + "=json",
25+
})
26+
public class EventServlet extends SlingAllMethodsServlet {
27+
28+
public static final String RT = "acm/api/event";
29+
30+
public static final String NAME_PARAM = "name";
31+
32+
private static final Logger LOG = LoggerFactory.getLogger(EventServlet.class);
33+
34+
@Reference
35+
private EventDispatcher dispatcher;
36+
37+
@Override
38+
protected void doPost(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException {
39+
String name = request.getParameter(NAME_PARAM);
40+
EventType event = EventType.of(name).orElse(null);
41+
if (event == null) {
42+
respondJson(response, badRequest(String.format("Event '%s' is not supported!", name)));
43+
return;
44+
}
45+
46+
try {
47+
dispatcher.dispatch(event);
48+
respondJson(response, ok(String.format("Event '%s' dispatched successfully!", name)));
49+
} catch (Exception e) {
50+
LOG.error("Event '{}' cannot be dispatched!", name, e);
51+
respondJson(
52+
response,
53+
badRequest(String.format("Event '%s' cannot be dispatched! %s", name, e.getMessage())
54+
.trim()));
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)