-
Notifications
You must be signed in to change notification settings - Fork 0
FOGL-9963 Add Alerts as a source for notifications #104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
d42fb17
986a170
92cf16f
2f039fb
3ffa5a6
56dadba
75f0ace
f1b6e05
f984d05
70306ac
a1b5e15
833f636
7889c20
c2b192d
e95d88c
ac4e8a9
0a495ba
682ff5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,13 @@ static const char *default_config = QUOTE({ | |
"default" : "", | ||
"displayName" : "Asset Code", | ||
"order" : "3" | ||
}, | ||
"alerts" : { | ||
"description" : "Notify when alerts are raised", | ||
"type" : "boolean", | ||
"default" : "false", | ||
"displayName" : "Alerts", | ||
"order" : "4" | ||
} | ||
}); | ||
|
||
|
@@ -147,6 +154,12 @@ string DataAvailabilityRule::triggers() | |
ret += "{ \"audit\" : \"" + audit + "\" }"; | ||
comma = ","; | ||
} | ||
if (m_alert) | ||
{ | ||
ret += comma; | ||
ret += "{ \"alert\" : \"alert\" }"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why the hardcoded string value "alert", same as key? In future, We better use |
||
comma = ","; | ||
} | ||
ret += " ] }"; | ||
return ret; | ||
} | ||
|
@@ -339,5 +352,14 @@ void DataAvailabilityRule::configure(const ConfigCategory &config) | |
DatapointValue value (m_assetCodeList[i]); | ||
handle->addTrigger(m_assetCodeList[i], new RuleTrigger(m_assetCodeList[i], new Datapoint(m_assetCodeList[i], value))); | ||
} | ||
|
||
string alerts = config.getValue("alerts"); | ||
m_alert = alerts[0] == 't' ? true : false; | ||
if (m_alert) | ||
{ | ||
DatapointValue dpv("alert"); | ||
handle->addTrigger("alert", new RuleTrigger("alert", new Datapoint("alert", dpv))); | ||
} | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,22 @@ void notificationStatsReceiveWrapper(shared_ptr<HttpServer::Response> response, | |
api->processStatsCallback(response, request); | ||
} | ||
|
||
/** | ||
* Wrapper function for the notification POST callback API call used for alert events. | ||
* | ||
* POST /notification/reading/alert | ||
* | ||
* @param response The response stream to send the response on | ||
* @param request The HTTP request | ||
*/ | ||
void notificationAlertReceiveWrapper(shared_ptr<HttpServer::Response> response, | ||
shared_ptr<HttpServer::Request> request) | ||
{ | ||
Logger::getLogger()->debug("Alert callback received"); | ||
NotificationApi* api = NotificationApi::getInstance(); | ||
api->processAlertCallback(response, request); | ||
} | ||
|
||
/** | ||
* Wrapper function for the notification POST callback API call used for audit events. | ||
* | ||
|
@@ -327,6 +343,7 @@ void NotificationApi::initResources() | |
m_server->resource[RECEIVE_AUDIT_NOTIFICATION]["POST"] = notificationAuditReceiveWrapper; | ||
m_server->resource[RECEIVE_STATS_NOTIFICATION]["POST"] = notificationStatsReceiveWrapper; | ||
m_server->resource[RECEIVE_STATS_RATE_NOTIFICATION]["POST"] = notificationStatsRateReceiveWrapper; | ||
m_server->resource[RECEIVE_ALERT_NOTIFICATION]["POST"] = notificationAlertReceiveWrapper; | ||
m_server->resource[GET_NOTIFICATION_INSTANCES]["GET"] = notificationGetInstances; | ||
m_server->resource[GET_NOTIFICATION_RULES]["GET"] = notificationGetRules; | ||
m_server->resource[GET_NOTIFICATION_DELIVERY]["GET"] = notificationGetDelivery; | ||
|
@@ -567,6 +584,47 @@ void NotificationApi::processStatsRateCallback(shared_ptr<HttpServer::Response> | |
} | ||
} | ||
|
||
/** | ||
* Add data provided in the alert payload of callback API call | ||
* into the notification queue. | ||
* | ||
* This is called by the storage service when new data arrives | ||
* for an asset in which we have registered an interest. | ||
* | ||
* @param response The response stream to send the response on | ||
* @param request The HTTP request | ||
*/ | ||
void NotificationApi::processAlertCallback(shared_ptr<HttpServer::Response> response, | ||
shared_ptr<HttpServer::Request> request) | ||
{ | ||
try | ||
{ | ||
// URL decode statistic | ||
string payload = request->content.string(); | ||
string responsePayload; | ||
// Add data to the queue | ||
if (queueAlertNotification(payload)) | ||
{ | ||
responsePayload = "{ \"response\" : \"processed\", \""; | ||
responsePayload += "alert"; | ||
responsePayload += "\" : \"data queued\" }"; | ||
|
||
this->respond(response, responsePayload); | ||
} | ||
else | ||
{ | ||
responsePayload = "{ \"error\": \"error_message\" }"; | ||
this->respond(response, | ||
SimpleWeb::StatusCode::client_error_bad_request, | ||
responsePayload); | ||
} | ||
} | ||
catch (exception ex) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Catching exception by value? consider catching by reference with is safer option. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Jira is to do a specific task, not to make general changes, Therefore making unrelated changes is not part of this work. If a bug was spotted then it should be fixed, tis does not fall into that category. |
||
{ | ||
this->internalError(response, ex); | ||
} | ||
} | ||
|
||
/** | ||
* Add readings data of asset name into the process queue | ||
* | ||
|
@@ -739,6 +797,48 @@ bool NotificationApi::queueStatsRateNotification(const string& statistic, | |
return queue->addElement(item); | ||
} | ||
|
||
/** | ||
* Add alert data of asset name into the process queue | ||
* | ||
* @param payload The data for the audit code | ||
* @return false error, true on success | ||
*/ | ||
bool NotificationApi::queueAlertNotification(const string& payload) | ||
{ | ||
Logger::getLogger()->debug("Recieved alert notification: %s", payload.c_str()); | ||
|
||
Reading *reading = new Reading("alert", payload); | ||
vector<Reading *> readingVec; | ||
readingVec.push_back(reading); | ||
ReadingSet* readings = NULL; | ||
try | ||
{ | ||
readings = new ReadingSet(&readingVec); | ||
} | ||
catch (exception* ex) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Constructor of ReadingSet thows exception of type ReadingSetException. Please catch by ReadingSetException* to align with ReadingSet class There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Jira is to do a specific task, not to make general changes, Therefore making unrelated changes is not part of this work. If a bug was spotted then it should be fixed, tis does not fall into that category. |
||
{ | ||
m_logger->error("Exception '" + string(ex->what()) + \ | ||
"' while parsing readings for alaert" + \ | ||
|
||
" with payload " + payload); | ||
delete ex; | ||
return false; | ||
} | ||
catch (...) | ||
{ | ||
std::exception_ptr p = std::current_exception(); | ||
string name = (p ? p.__cxa_exception_type()->name() : "null"); | ||
m_logger->error("Exception '" + name + \ | ||
"' while parsing readings for alert"); | ||
return false; | ||
} | ||
|
||
NotificationQueue* queue = NotificationQueue::getInstance(); | ||
NotificationQueueElement* item = new NotificationQueueElement("alert", "alert", readings); | ||
|
||
// Add element to the queue | ||
return queue->addElement(item); | ||
} | ||
|
||
/** | ||
* Return JSON string of a notification object | ||
* | ||
|
@@ -870,6 +970,7 @@ void NotificationApi::setCallBackURL() | |
m_auditCallbackURL = "http://127.0.0.1:" + to_string(apiPort) + "/notification/reading/audit/"; | ||
m_statsCallbackURL = "http://127.0.0.1:" + to_string(apiPort) + "/notification/reading/stat/"; | ||
m_statsRateCallbackURL = "http://127.0.0.1:" + to_string(apiPort) + "/notification/reading/rate/"; | ||
m_alertCallbackURL = "http://127.0.0.1:" + to_string(apiPort) + "/notification/reading/alert"; | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -238,6 +238,54 @@ bool StatsRateSubscriptionElement::unregister(StorageClient& storage) const | |
return storage.unregisterTableNotification("statistics_history", "key", keyValues, "insert", callBackURL + urlEncode(m_stat)); | ||
} | ||
|
||
/** | ||
* Constructor for alert subscription elements | ||
*/ | ||
AlertSubscriptionElement::AlertSubscriptionElement(const std::string& notificationName, | ||
NotificationInstance* notification) : | ||
SubscriptionElement(notificationName, notification) | ||
{ | ||
} | ||
|
||
/** | ||
* AlertSubscriptionElement class destructor | ||
*/ | ||
AlertSubscriptionElement::~AlertSubscriptionElement() | ||
{ | ||
} | ||
|
||
/** | ||
* Register the subscription with the storage engine | ||
* | ||
* @param storage The storage engine client | ||
* @return bool True if unregistered | ||
*/ | ||
bool AlertSubscriptionElement::registerSubscription(StorageClient& storage) const | ||
{ | ||
NotificationApi *api = NotificationApi::getInstance(); | ||
string callBackURL = api->getAlertCallbackURL(); | ||
vector<std::string> keyValues; | ||
Logger::getLogger()->fatal("Adding alert subscription for %s", callBackURL.c_str()); | ||
|
||
if (!storage.registerTableNotification("alerts", "", keyValues, "insert", callBackURL)) | ||
Logger::getLogger()->error("Failed to register insert handler for alert subscription"); | ||
return storage.registerTableNotification("alerts", "", keyValues, "update", callBackURL); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider adding a debug message to log if registerSubscription was successful. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Jira is to do a specific task, not to make general changes, Therefore making unrelated changes is not part of this work. If a bug was spotted then it should be fixed, tis does not fall into that category. |
||
} | ||
|
||
/** | ||
* Unregister the subscription with the storage engine | ||
* | ||
* @param storage The storage engine client | ||
* @return bool True if unregistered | ||
*/ | ||
bool AlertSubscriptionElement::unregister(StorageClient& storage) const | ||
{ | ||
NotificationApi *api = NotificationApi::getInstance(); | ||
string callBackURL = api->getStatsRateCallbackURL(); | ||
vector<std::string> keyValues; | ||
storage.unregisterTableNotification("alerts", "", keyValues, "update", callBackURL); | ||
return storage.unregisterTableNotification("alerts", "", keyValues, "insert", callBackURL); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider adding a debug message to log if unregister was successful. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Jira is to do a specific task, not to make general changes, Therefore making unrelated changes is not part of this work. If a bug was spotted then it should be fixed, tis does not fall into that category. |
||
} | ||
|
||
/** | ||
* Constructor for the NotificationSubscription class | ||
*/ | ||
|
@@ -359,13 +407,17 @@ bool NotificationSubscription::addSubscription(SubscriptionElement *element) | |
|
||
string key = element->getKey(); | ||
m_subscriptions[key].push_back(element); | ||
if (m_subscriptions[key].size() == 1) | ||
if (m_subscriptions[key].size() <= 1) | ||
{ | ||
if (element->registerSubscription(m_storage)) | ||
m_logger->info("Register for %s notification from the storage layer", key.c_str()); | ||
else | ||
m_logger->error("Failed to register for %s notification from the storage layer", key.c_str()); | ||
} | ||
else | ||
|
||
{ | ||
m_logger->error("Subscription not added, too few keys"); | ||
} | ||
|
||
|
||
m_logger->info("Subscription for '" + key + \ | ||
|
@@ -615,6 +667,32 @@ bool NotificationSubscription::createSubscription(NotificationInstance* instance | |
lock_guard<mutex> guard(m_subscriptionMutex); | ||
ret = this->addSubscription(subscription); | ||
|
||
} | ||
else if (itr->HasMember("alert")) | ||
{ | ||
// Get optional evaluation type and time period for asset: | ||
// (All :30, Minimum: 10, Maximum: 10, Average: 10) | ||
// If time based rule is set then | ||
// set EvaluationType::Interval for data buffer operation | ||
EvaluationType type = theRule->isTimeBased() ? | ||
EvaluationType(EvaluationType::Interval, timeBasedInterval) : | ||
this->getEvalType(*itr); | ||
|
||
// Create NotificationDetail object | ||
NotificationDetail alertInfo("alert", | ||
"alert", | ||
ruleName, | ||
type); | ||
|
||
// Add assetInfo to its rule | ||
theRule->addAsset(alertInfo); | ||
|
||
AlertSubscriptionElement *subscription = new AlertSubscriptionElement( | ||
instance->getName(), | ||
instance); | ||
lock_guard<mutex> guard(m_subscriptionMutex); | ||
ret = this->addSubscription(subscription); | ||
|
||
} | ||
else | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably, We should be clear here!
Add alerts to the source of data for notifications
or
Notify for the alerts raised by the Fledge instance