Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions C/services/notification/data_availability_rule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ static const char *default_config = QUOTE({
"default" : "",
"displayName" : "Asset Code",
"order" : "3"
},
"alerts" : {
"description" : "Notify when alerts are raised",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, We should be clear here!

Add alerts to the source of data for notifications
or
Notify for the alerts raised by the Fledge instance

"type" : "boolean",
"default" : "false",
"displayName" : "Alerts",
"order" : "4"
}
});

Expand Down Expand Up @@ -147,6 +154,12 @@ string DataAvailabilityRule::triggers()
ret += "{ \"audit\" : \"" + audit + "\" }";
comma = ",";
}
if (m_alert)
{
ret += comma;
ret += "{ \"alert\" : \"alert\" }";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the hardcoded string value "alert", same as key?

In future, We better use doc.AddMember to avoid fragile and cumbersome string building!

comma = ",";
}
ret += " ] }";
return ret;
}
Expand Down Expand Up @@ -339,5 +352,14 @@ void DataAvailabilityRule::configure(const ConfigCategory &config)
DatapointValue value (m_assetCodeList[i]);
handle->addTrigger(m_assetCodeList[i], new RuleTrigger(m_assetCodeList[i], new Datapoint(m_assetCodeList[i], value)));
}

string alerts = config.getValue("alerts");
m_alert = alerts[0] == 't' ? true : false;
if (m_alert)
{
DatapointValue dpv("alert");
handle->addTrigger("alert", new RuleTrigger("alert", new Datapoint("alert", dpv)));
}


}
1 change: 1 addition & 0 deletions C/services/notification/include/data_availability_rule.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class DataAvailabilityRule : public RulePlugin
private:
std::vector<std::string> m_assetCodeList;
std::vector<std::string> m_auditCodeList;
bool m_alert;
};

#endif
7 changes: 7 additions & 0 deletions C/services/notification/include/notification_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using HttpServer = SimpleWeb::Server<SimpleWeb::HTTP>;
#define RECEIVE_AUDIT_NOTIFICATION "^/notification/reading/audit/([A-Za-z][a-zA-Z0-9_%\\-\\.]*)$"
#define RECEIVE_STATS_NOTIFICATION "^/notification/reading/stat/([A-Za-z0-9][a-zA-Z0-9_%\\-\\.]*)$"
#define RECEIVE_STATS_RATE_NOTIFICATION "^/notification/reading/rate/([A-Za-z0-9][a-zA-Z0-9_%\\-\\.]*)$"
#define RECEIVE_ALERT_NOTIFICATION "^/notification/reading/alert$"
#define GET_NOTIFICATION_INSTANCES "^/notification$"
#define GET_NOTIFICATION_DELIVERY "^/notification/delivery$"
#define GET_NOTIFICATION_RULES "^/notification/rules$"
Expand Down Expand Up @@ -81,6 +82,8 @@ class NotificationApi
shared_ptr<HttpServer::Request> request);
void processStatsRateCallback(shared_ptr<HttpServer::Response> response,
shared_ptr<HttpServer::Request> request);
void processAlertCallback(shared_ptr<HttpServer::Response> response,
shared_ptr<HttpServer::Request> request);
void getNotificationObject(NOTIFICATION_OBJECT object,
shared_ptr<HttpServer::Response> response,
shared_ptr<HttpServer::Request> request);
Expand All @@ -97,6 +100,8 @@ class NotificationApi
getStatsCallbackURL() const { return m_statsCallbackURL; };
const std::string&
getStatsRateCallbackURL() const { return m_statsRateCallbackURL; };
const std::string&
getAlertCallbackURL() const { return m_alertCallbackURL; };
void setCallBackURL();
bool removeNotification(const std::string& notificationName);
// Add asset name and data to the Readings process queue
Expand All @@ -108,6 +113,7 @@ class NotificationApi
const string& payload);
bool queueStatsRateNotification(const string& auditCode,
const string& payload);
bool queueAlertNotification(const string& payload);

void defaultResource(shared_ptr<HttpServer::Response> response,
shared_ptr<HttpServer::Request> request);
Expand All @@ -133,6 +139,7 @@ class NotificationApi
std::string m_auditCallbackURL;
std::string m_statsCallbackURL;
std::string m_statsRateCallbackURL;
std::string m_alertCallbackURL;
Logger* m_logger;
};

Expand Down
17 changes: 17 additions & 0 deletions C/services/notification/include/notification_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ class StatsRateSubscriptionElement : public SubscriptionElement
std::string m_stat;
};

/**
* The SubscriptionElement class handles the notification registration to
* storage server based on alerts and its notification name.
*/
class AlertSubscriptionElement : public SubscriptionElement
{
public:
AlertSubscriptionElement(const std::string& notificationName,
NotificationInstance* notification);

~AlertSubscriptionElement();

bool registerSubscription(StorageClient& storage) const;
bool unregister(StorageClient& storage) const;
string getKey() const { return string("alert::alert"); };
};

/**
* The NotificationSubscription class handles all notification registrations to
* storage server.
Expand Down
101 changes: 101 additions & 0 deletions C/services/notification/notification_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ void notificationStatsReceiveWrapper(shared_ptr<HttpServer::Response> response,
api->processStatsCallback(response, request);
}

/**
* Wrapper function for the notification POST callback API call used for alert events.
*
* POST /notification/reading/alert
*
* @param response The response stream to send the response on
* @param request The HTTP request
*/
void notificationAlertReceiveWrapper(shared_ptr<HttpServer::Response> response,
shared_ptr<HttpServer::Request> request)
{
Logger::getLogger()->debug("Alert callback received");
NotificationApi* api = NotificationApi::getInstance();
api->processAlertCallback(response, request);
}

/**
* Wrapper function for the notification POST callback API call used for audit events.
*
Expand Down Expand Up @@ -327,6 +343,7 @@ void NotificationApi::initResources()
m_server->resource[RECEIVE_AUDIT_NOTIFICATION]["POST"] = notificationAuditReceiveWrapper;
m_server->resource[RECEIVE_STATS_NOTIFICATION]["POST"] = notificationStatsReceiveWrapper;
m_server->resource[RECEIVE_STATS_RATE_NOTIFICATION]["POST"] = notificationStatsRateReceiveWrapper;
m_server->resource[RECEIVE_ALERT_NOTIFICATION]["POST"] = notificationAlertReceiveWrapper;
m_server->resource[GET_NOTIFICATION_INSTANCES]["GET"] = notificationGetInstances;
m_server->resource[GET_NOTIFICATION_RULES]["GET"] = notificationGetRules;
m_server->resource[GET_NOTIFICATION_DELIVERY]["GET"] = notificationGetDelivery;
Expand Down Expand Up @@ -567,6 +584,47 @@ void NotificationApi::processStatsRateCallback(shared_ptr<HttpServer::Response>
}
}

/**
* Add data provided in the alert payload of callback API call
* into the notification queue.
*
* This is called by the storage service when new data arrives
* for an asset in which we have registered an interest.
*
* @param response The response stream to send the response on
* @param request The HTTP request
*/
void NotificationApi::processAlertCallback(shared_ptr<HttpServer::Response> response,
shared_ptr<HttpServer::Request> request)
{
try
{
// URL decode statistic
string payload = request->content.string();
string responsePayload;
// Add data to the queue
if (queueAlertNotification(payload))
{
responsePayload = "{ \"response\" : \"processed\", \"";
responsePayload += "alert";
responsePayload += "\" : \"data queued\" }";

this->respond(response, responsePayload);
}
else
{
responsePayload = "{ \"error\": \"error_message\" }";
this->respond(response,
SimpleWeb::StatusCode::client_error_bad_request,
responsePayload);
}
}
catch (exception ex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching exception by value? consider catching by reference with is safer option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Jira is to do a specific task, not to make general changes, Therefore making unrelated changes is not part of this work. If a bug was spotted then it should be fixed, tis does not fall into that category.

{
this->internalError(response, ex);
}
}

/**
* Add readings data of asset name into the process queue
*
Expand Down Expand Up @@ -739,6 +797,48 @@ bool NotificationApi::queueStatsRateNotification(const string& statistic,
return queue->addElement(item);
}

/**
* Add alert data of asset name into the process queue
*
* @param payload The data for the audit code
* @return false error, true on success
*/
bool NotificationApi::queueAlertNotification(const string& payload)
{
Logger::getLogger()->debug("Recieved alert notification: %s", payload.c_str());

Reading *reading = new Reading("alert", payload);
vector<Reading *> readingVec;
readingVec.push_back(reading);
ReadingSet* readings = NULL;
try
{
readings = new ReadingSet(&readingVec);
}
catch (exception* ex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor of ReadingSet thows exception of type ReadingSetException. Please catch by ReadingSetException* to align with ReadingSet class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Jira is to do a specific task, not to make general changes, Therefore making unrelated changes is not part of this work. If a bug was spotted then it should be fixed, tis does not fall into that category.

{
m_logger->error("Exception '" + string(ex->what()) + \
"' while parsing readings for alaert" + \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alert typo

" with payload " + payload);
delete ex;
return false;
}
catch (...)
{
std::exception_ptr p = std::current_exception();
string name = (p ? p.__cxa_exception_type()->name() : "null");
m_logger->error("Exception '" + name + \
"' while parsing readings for alert");
return false;
}

NotificationQueue* queue = NotificationQueue::getInstance();
NotificationQueueElement* item = new NotificationQueueElement("alert", "alert", readings);

// Add element to the queue
return queue->addElement(item);
}

/**
* Return JSON string of a notification object
*
Expand Down Expand Up @@ -870,6 +970,7 @@ void NotificationApi::setCallBackURL()
m_auditCallbackURL = "http://127.0.0.1:" + to_string(apiPort) + "/notification/reading/audit/";
m_statsCallbackURL = "http://127.0.0.1:" + to_string(apiPort) + "/notification/reading/stat/";
m_statsRateCallbackURL = "http://127.0.0.1:" + to_string(apiPort) + "/notification/reading/rate/";
m_alertCallbackURL = "http://127.0.0.1:" + to_string(apiPort) + "/notification/reading/alert";
}

/**
Expand Down
77 changes: 75 additions & 2 deletions C/services/notification/notification_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,54 @@ bool StatsRateSubscriptionElement::unregister(StorageClient& storage) const
return storage.unregisterTableNotification("statistics_history", "key", keyValues, "insert", callBackURL + urlEncode(m_stat));
}

/**
* Constructor for alert subscription elements
*/
AlertSubscriptionElement::AlertSubscriptionElement(const std::string& notificationName,
NotificationInstance* notification) :
SubscriptionElement(notificationName, notification)
{
}

/**
* AlertSubscriptionElement class destructor
*/
AlertSubscriptionElement::~AlertSubscriptionElement()
{
}

/**
* Register the subscription with the storage engine
*
* @param storage The storage engine client
* @return bool True if unregistered
*/
bool AlertSubscriptionElement::registerSubscription(StorageClient& storage) const
{
NotificationApi *api = NotificationApi::getInstance();
string callBackURL = api->getAlertCallbackURL();
vector<std::string> keyValues;
Logger::getLogger()->fatal("Adding alert subscription for %s", callBackURL.c_str());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why fatal?

if (!storage.registerTableNotification("alerts", "", keyValues, "insert", callBackURL))
Logger::getLogger()->error("Failed to register insert handler for alert subscription");
return storage.registerTableNotification("alerts", "", keyValues, "update", callBackURL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider adding a debug message to log if registerSubscription was successful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Jira is to do a specific task, not to make general changes, Therefore making unrelated changes is not part of this work. If a bug was spotted then it should be fixed, tis does not fall into that category.

}

/**
* Unregister the subscription with the storage engine
*
* @param storage The storage engine client
* @return bool True if unregistered
*/
bool AlertSubscriptionElement::unregister(StorageClient& storage) const
{
NotificationApi *api = NotificationApi::getInstance();
string callBackURL = api->getStatsRateCallbackURL();
vector<std::string> keyValues;
storage.unregisterTableNotification("alerts", "", keyValues, "update", callBackURL);
return storage.unregisterTableNotification("alerts", "", keyValues, "insert", callBackURL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider adding a debug message to log if unregister was successful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Jira is to do a specific task, not to make general changes, Therefore making unrelated changes is not part of this work. If a bug was spotted then it should be fixed, tis does not fall into that category.

}

/**
* Constructor for the NotificationSubscription class
*/
Expand Down Expand Up @@ -359,15 +407,14 @@ bool NotificationSubscription::addSubscription(SubscriptionElement *element)

string key = element->getKey();
m_subscriptions[key].push_back(element);
if (m_subscriptions[key].size() == 1)
if (m_subscriptions[key].size() <= 1)
{
if (element->registerSubscription(m_storage))
m_logger->info("Register for %s notification from the storage layer", key.c_str());
else
m_logger->error("Failed to register for %s notification from the storage layer", key.c_str());
}


m_logger->info("Subscription for '" + key + \
"' has # " + to_string(m_subscriptions[key].size()) + " rules");

Expand Down Expand Up @@ -615,6 +662,32 @@ bool NotificationSubscription::createSubscription(NotificationInstance* instance
lock_guard<mutex> guard(m_subscriptionMutex);
ret = this->addSubscription(subscription);

}
else if (itr->HasMember("alert"))
{
// Get optional evaluation type and time period for asset:
// (All :30, Minimum: 10, Maximum: 10, Average: 10)
// If time based rule is set then
// set EvaluationType::Interval for data buffer operation
EvaluationType type = theRule->isTimeBased() ?
EvaluationType(EvaluationType::Interval, timeBasedInterval) :
this->getEvalType(*itr);

// Create NotificationDetail object
NotificationDetail alertInfo("alert",
"alert",
ruleName,
type);

// Add assetInfo to its rule
theRule->addAsset(alertInfo);

AlertSubscriptionElement *subscription = new AlertSubscriptionElement(
instance->getName(),
instance);
lock_guard<mutex> guard(m_subscriptionMutex);
ret = this->addSubscription(subscription);

}
else
{
Expand Down