Skip to content
This repository was archived by the owner on Jan 8, 2019. It is now read-only.

Commit 2d54fe7

Browse files
committed
Adding type of stream processing disblaed notification
This notification is shown, when a stream has been disabled due to repeated processing timeouts during stream rule matching. This allows the user to check the configured stream rules, correct them and re- enable the stream.
1 parent 93f4fb8 commit 2d54fe7

File tree

5 files changed

+68
-10
lines changed

5 files changed

+68
-10
lines changed

app/controllers/api/SystemApiController.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
import com.google.common.collect.Lists;
2222
import com.google.common.collect.Maps;
2323
import com.google.gson.Gson;
24+
import com.google.gson.GsonBuilder;
2425
import com.google.inject.Inject;
2526
import controllers.AuthenticatedController;
2627
import lib.APIException;
2728
import lib.metrics.Meter;
2829
import models.*;
2930
import play.libs.F;
31+
import play.libs.Json;
3032
import play.mvc.Http;
3133
import play.mvc.Result;
3234

@@ -84,6 +86,7 @@ public Result notifications() {
8486
result.put("count", clusterService.allNotifications().size());*/
8587
List<Notification> notifications = clusterService.allNotifications();
8688

89+
8790
return ok(new Gson().toJson(notifications)).as("application/json");
8891
} catch (IOException e) {
8992
return internalServerError("io exception");

app/lib/notifications/InputFailedToStartNotification.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ public class InputFailedToStartNotification implements NotificationType {
1616
private final String DESCRIPTION;
1717

1818
public InputFailedToStartNotification(Notification notification) {
19-
DESCRIPTION = "Input " + (String)notification.getDetail("input_id") + " has failed to start on node " +
19+
DESCRIPTION = "Input " + notification.getDetail("input_id") + " has failed to start on node " +
2020
notification.getNodeId() + " for this reason: \"" +
21-
(String)notification.getDetail("reason") + "\". " +
21+
notification.getDetail("reason") + "\". " +
2222
"This means that you are unable to receive any messages from this input. This is mostly " +
2323
"an indication for a misconfiguration or an error. " + "" +
2424
"You can click <a href='" + routes.InputsController.index() + "'>here</a> to solve this";
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package lib.notifications;
2+
3+
import com.google.common.collect.Maps;
4+
import models.Stream;
5+
import models.SystemJob;
6+
7+
import java.util.Map;
8+
9+
/**
10+
* @author Dennis Oelkers <[email protected]>
11+
*/
12+
public class StreamProcessingDisabled implements NotificationType {
13+
private final String streamTitle;
14+
private final long faultCount;
15+
16+
public StreamProcessingDisabled(String streamTitle, long faultCount) {
17+
this.streamTitle = streamTitle;
18+
this.faultCount = faultCount;
19+
}
20+
21+
@Override
22+
public Map<SystemJob.Type, String> options() {
23+
return Maps.newHashMap();
24+
}
25+
26+
@Override
27+
public String getTitle() {
28+
return "Processing of a stream has been disabled due to excessive processing time.";
29+
}
30+
31+
@Override
32+
public String getDescription() {
33+
return "The processing of stream <em>" + streamTitle
34+
+ "</em> has taken too long for " + faultCount + " times. "
35+
+ "To protect the stability of message processing, this stream has been disabled. "
36+
+ "Please correct the stream rules and reenable the stream.";
37+
38+
}
39+
40+
@Override
41+
public boolean isCloseable() {
42+
return true;
43+
}
44+
}

app/models/ClusterService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,19 @@ public class ClusterService {
5050
private final SystemJob.Factory systemJobFactory;
5151
private final ServerNodes serverNodes;
5252
private final NodeService nodeService;
53+
private final StreamService streamService;
5354

5455
@Inject
5556
private ClusterService(ApiClient api,
5657
SystemJob.Factory systemJobFactory,
5758
ServerNodes serverNodes,
58-
NodeService nodeService) {
59+
NodeService nodeService,
60+
StreamService streamService) {
5961
this.api = api;
6062
this.systemJobFactory = systemJobFactory;
6163
this.serverNodes = serverNodes;
6264
this.nodeService = nodeService;
65+
this.streamService = streamService;
6366
}
6467

6568
public void triggerSystemJob(SystemJob.Type type, User user) throws IOException, APIException {
@@ -76,7 +79,7 @@ public List<Notification> allNotifications() throws IOException, APIException {
7679
List<Notification> notifications = Lists.newArrayList();
7780
for (NotificationSummaryResponse notification : r.notifications) {
7881
try {
79-
notifications.add(new Notification(notification));
82+
notifications.add(new Notification(notification, streamService, nodeService));
8083
} catch(IllegalArgumentException e) {
8184
play.Logger.warn("There is a notification type we can't handle: [" + notification.type + "]");
8285
continue;

app/models/Notification.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,21 @@
1818
*/
1919
package models;
2020

21-
import com.google.inject.Inject;
21+
import lib.APIException;
2222
import lib.notifications.*;
2323
import models.api.responses.system.NotificationSummaryResponse;
2424
import org.joda.time.DateTime;
2525

26+
import java.io.IOException;
2627
import java.util.Map;
2728

2829
/**
2930
* @author Lennart Koopmann <[email protected]>
3031
*/
3132
public class Notification {
3233

33-
@Inject
34-
private NodeService nodeService;
34+
private transient final NodeService nodeService;
35+
private transient final StreamService streamService;
3536

3637
public enum Type {
3738
DEFLECTOR_EXISTS_AS_INDEX,
@@ -41,7 +42,8 @@ public enum Type {
4142
NO_INPUT_RUNNING,
4243
INPUT_FAILED_TO_START,
4344
CHECK_SERVER_CLOCKS,
44-
OUTDATED_VERSION;
45+
OUTDATED_VERSION,
46+
STREAM_PROCESSING_DISABLED;
4547

4648
public static Type fromString(String name) {
4749
return valueOf(name.toUpperCase());
@@ -58,15 +60,17 @@ public enum Severity {
5860
private final String node_id;
5961
private final Map<String, Object> details;
6062

61-
public Notification(NotificationSummaryResponse x) {
63+
public Notification(NotificationSummaryResponse x, StreamService streamService, NodeService nodeService) {
64+
this.streamService = streamService;
65+
this.nodeService = nodeService;
6266
this.type = Type.valueOf(x.type.toUpperCase());
6367
this.timestamp = DateTime.parse(x.timestamp);
6468
this.severity = Severity.valueOf(x.severity.toUpperCase());
6569
this.node_id = x.node_id;
6670
this.details = x.details;
6771
}
6872

69-
public NotificationType get() {
73+
public NotificationType get() throws IOException, APIException {
7074
switch (type) {
7175
case DEFLECTOR_EXISTS_AS_INDEX:
7276
return new DeflectorExistsAsIndexNotification();
@@ -84,6 +88,10 @@ public NotificationType get() {
8488
return new CheckServerClocksNotification();
8589
case OUTDATED_VERSION:
8690
return new OutdatedVersionNotification(this);
91+
case STREAM_PROCESSING_DISABLED:
92+
Stream stream = streamService.get(details.get("stream_id").toString());
93+
long faultCount = Math.round((double)details.get("fault_count"));
94+
return new StreamProcessingDisabled(stream.getTitle(), faultCount);
8795
}
8896

8997
throw new RuntimeException("No notification registered for " + type);

0 commit comments

Comments
 (0)