diff --git a/C/services/common/include/notification_api.h b/C/services/common/include/notification_api.h index 508fa5f..90c8e21 100644 --- a/C/services/common/include/notification_api.h +++ b/C/services/common/include/notification_api.h @@ -65,6 +65,7 @@ class NotificationApi void start(); void startServer(); void wait(); + bool serverDown(); void stop(); void stopServer(); unsigned short getListenerPort(); diff --git a/C/services/common/include/notification_manager.h b/C/services/common/include/notification_manager.h old mode 100644 new mode 100755 index 9bcfcde..4cb1948 --- a/C/services/common/include/notification_manager.h +++ b/C/services/common/include/notification_manager.h @@ -50,6 +50,11 @@ class EvaluationType EVAL_TYPE getType() const { return m_type; }; time_t getInterval() const { return m_interval; }; + inline bool operator==(const EvaluationType& x) const + { + return ( x.m_type==m_type && + x.m_interval==m_interval ); + } private: EVAL_TYPE m_type; @@ -75,6 +80,13 @@ class NotificationDetail getType() const { return m_value.getType(); }; const time_t getInterval() const { return m_value.getInterval(); }; + inline bool operator==(const NotificationDetail& x) const + { + return ( x.m_asset==m_asset && + x.m_rule==m_rule && + x.m_value==m_value ); + } + private: std::string m_asset; std::string m_rule; @@ -130,7 +142,18 @@ class NotificationRule : public NotificationElement // Add an asset name void addAsset(NotificationDetail& info) { + for (const auto & a : m_assets) + { + if (a==info) + { + Logger::getLogger()->info("NotificationRule::addAsset(): Asset %s already present in rule %s, not adding again, m_assets.size()=%d", + info.getAssetName().c_str(), info.getRuleName().c_str(), m_assets.size()); + return; + } + } m_assets.push_back(info); + Logger::getLogger()->info("NotificationRule::addAsset(): Added asset %s to rule %s, m_assets.size()=%d", + info.getAssetName().c_str(), info.getRuleName().c_str(), m_assets.size()); }; std::string toJSON(); bool isTimeBased() { return m_timeBased != 0; }; diff --git a/C/services/common/include/notification_service.h b/C/services/common/include/notification_service.h old mode 100644 new mode 100755 index 25e2638..6ed4ab6 --- a/C/services/common/include/notification_service.h +++ b/C/services/common/include/notification_service.h @@ -35,7 +35,8 @@ class NotificationService : public ServiceAuthHandler const std::string& getName() { return m_name; }; bool start(std::string& coreAddress, unsigned short corePort); - void stop(); + bool connectToStorage(std::map>* m); + void stop(bool unregisterSubscriptions=true); void shutdown(); void restart(); bool isRunning() { return !m_shutdown; }; @@ -58,6 +59,9 @@ class NotificationService : public ServiceAuthHandler void setDryRun() { m_dryRun = true; }; bool sendToDispatcher(const string& path, const string& payload); + bool getStorageServiceRestartPendingFlag() { return m_storageServiceRestartPending; } + bool setStorageServiceRestartPendingFlag() { m_storageServiceRestartPending = true; } + bool resetStorageServiceRestartPendingFlag() { m_storageServiceRestartPending = false; } private: Logger* m_logger; @@ -74,5 +78,6 @@ class NotificationService : public ServiceAuthHandler const std::string m_token; bool m_dryRun; bool m_restartRequest; + bool m_storageServiceRestartPending; }; #endif diff --git a/C/services/common/include/notification_subscription.h b/C/services/common/include/notification_subscription.h old mode 100644 new mode 100755 index b1cb050..e4d4521 --- a/C/services/common/include/notification_subscription.h +++ b/C/services/common/include/notification_subscription.h @@ -84,7 +84,8 @@ class NotificationSubscription void lockSubscriptions() { m_subscriptionMutex.lock(); }; void unlockSubscriptions() { m_subscriptionMutex.unlock(); }; void removeSubscription(const string& assetName, - const string& ruleName); + const string& ruleName, bool storageServiceRestartPending=false); + void removeAllSubscriptions(bool storageServiceRestartPending); private: EvaluationType getEvalType(const Value& value); diff --git a/C/services/common/notification_api.cpp b/C/services/common/notification_api.cpp index bd3fabc..c6eb750 100644 --- a/C/services/common/notification_api.cpp +++ b/C/services/common/notification_api.cpp @@ -244,6 +244,7 @@ void NotificationApi::start() { * Start method for HTTP server */ void NotificationApi::startServer() { + Logger::getLogger()->info("NotificationApi::startServer()"); m_server->start(); } @@ -251,6 +252,7 @@ void NotificationApi::startServer() { * Stop method for HTTP server */ void NotificationApi::stopServer() { + Logger::getLogger()->info("NotificationApi::stopServer()"); m_server->stop(); } @@ -269,6 +271,14 @@ void NotificationApi::wait() { m_thread->join(); } +/** + * Check whether HTTP server has shutdown + */ +bool NotificationApi::serverDown() { + return m_thread->joinable(); +} + + /** * Initialise the API entry points for the common data resource and * the readings resource. @@ -359,6 +369,7 @@ void NotificationApi::respond(shared_ptr response, void NotificationApi::processCallback(shared_ptr response, shared_ptr request) { + PRINT_FUNC; try { // URL decode assetName diff --git a/C/services/common/notification_manager.cpp b/C/services/common/notification_manager.cpp old mode 100644 new mode 100755 index c6ae409..ed90eef --- a/C/services/common/notification_manager.cpp +++ b/C/services/common/notification_manager.cpp @@ -322,6 +322,7 @@ NotificationManager::NotificationManager(const std::string& serviceName, */ NotificationManager::~NotificationManager() { + PRINT_FUNC; lock_guard guard(m_instancesMutex); // Mark is instance as zombie for (auto it = m_instances.begin(); diff --git a/C/services/common/notification_queue.cpp b/C/services/common/notification_queue.cpp old mode 100644 new mode 100755 index dab45cf..a0cde1d --- a/C/services/common/notification_queue.cpp +++ b/C/services/common/notification_queue.cpp @@ -244,6 +244,7 @@ bool NotificationQueue::addElement(NotificationQueueElement* element) { if (!m_running) { + PRINT_FUNC; // Don't add new elements if queue is being stopped delete element; return true; @@ -251,6 +252,7 @@ bool NotificationQueue::addElement(NotificationQueueElement* element) lock_guard loadLock(m_qMutex); + PRINT_FUNC; m_queue.push(element); #ifdef QUEUE_DEBUG_DATA @@ -439,6 +441,9 @@ bool NotificationQueue::feedDataBuffer(const std::string& ruleName, const std::string& assetName, ReadingSet* assetData) { + Logger::getLogger()->info("%s:%d: assetName=%s, assetData->getCount()=%d", + __FUNCTION__, __LINE__, assetName.c_str(), assetData->getCount()); + vector readings = assetData->getAllReadings(); vector newReadings; // Create a ReadingSet deep copy @@ -577,6 +582,7 @@ bool NotificationQueue::processDataBuffer(map& results, if (readingsData.size() == 0) { + PRINT_FUNC; return false; } @@ -595,6 +601,7 @@ bool NotificationQueue::processDataBuffer(map& results, } #endif + PRINT_FUNC; // Process all reading data in the buffer return this->processAllReadings(info, readingsData, results); } @@ -608,7 +615,7 @@ bool NotificationQueue::processDataBuffer(map& results, void NotificationQueue::evalRule(map& results, NotificationRule* rule) { - + PRINT_FUNC; // Output data string for MIN/MAX/AVG/ALL DATA map JSONOutput; // Points in time data for all SingleItem assets data @@ -648,6 +655,8 @@ void NotificationQueue::evalRule(map& results, } } + PRINT_FUNC; + // No SingleItem evaluations found if (!singleItem.size()) { @@ -655,11 +664,13 @@ void NotificationQueue::evalRule(map& results, addReadyData(JSONOutput, evalJSON); evalJSON += " }"; + PRINT_FUNC; // Call plugin_eval, plugin_reason and plugin_deliver deliverNotifications(rule, evalJSON); } else { + PRINT_FUNC; // Deliver SingleItem data + ready data deliverData(rule, singleItem, JSONOutput); } @@ -684,7 +695,7 @@ void NotificationQueue::evalRule(map& results, /** * Process all data buffers for a given assetName * - * The assetName might belong to differen rules: + * The assetName might belong to different rules: * * (1) Get all rules for the given asset name * (2) For each rule process data for all assets belonging to the rule @@ -698,6 +709,7 @@ void NotificationQueue::evalRule(map& results, */ void NotificationQueue::processAllDataBuffers(const string& assetName) { + PRINT_FUNC; // Get the subscriptions instance NotificationSubscription* subscriptions = NotificationSubscription::getInstance(); if (!subscriptions) @@ -712,11 +724,13 @@ void NotificationQueue::processAllDataBuffers(const string& assetName) // Get NotificationManager instance NotificationManager* manager = NotificationManager::getInstance(); + PRINT_FUNC; // Iterate trough subscriptions for (auto it = registeredItems.begin(); it != registeredItems.end(); ++it) { + PRINT_FUNC; lock_guard guard(manager->m_instancesMutex); // Per asset notification map @@ -727,7 +741,7 @@ void NotificationQueue::processAllDataBuffers(const string& assetName) // Get instance pointer NotificationInstance* instance = manager->getNotificationInstance(notificationName); - // Check wether the instance exists and it is enabled + // Check whether the instance exists and it is enabled if (!instance || !instance->getRule() || !instance->isEnabled()) @@ -750,14 +764,18 @@ void NotificationQueue::processAllDataBuffers(const string& assetName) // Get ruleName for the assetName string ruleName = instance->getRule()->getName(); - // Get all assests belonging to current rule + PRINT_FUNC; + + // Get all assets belonging to current rule vector& assets = instance->getRule()->getAssets(); + Logger::getLogger()->info("%s:%d: assets.size()=%d", __FUNCTION__, __LINE__, assets.size()); - // Iterate trough assets + // Iterate through assets for (auto itr = assets.begin(); itr != assets.end(); ++itr) { + PRINT_FUNC; // Process data buffer and fill results this->processDataBuffer(results, ruleName, @@ -765,11 +783,15 @@ void NotificationQueue::processAllDataBuffers(const string& assetName) *itr); } + Logger::getLogger()->info("%s:%d: results.size()=%d, assets.size()=%d, instance->getRule()->evaluateAny()=%s", + __FUNCTION__, __LINE__, results.size(), assets.size(), instance->getRule()->evaluateAny()?"true":"false"); + // Eval rule? We have all assets data or at least one, given the // rule multiple evaluation value set to MultipleEvaluation::M_ANY if (results.size() == assets.size() || (results.size() > 0 && instance->getRule()->evaluateAny())) { + PRINT_FUNC; // Notification data ready: eval data and sent notification this->sendNotification(results, *it); } @@ -795,6 +817,7 @@ bool NotificationQueue::processAllReadings(NotificationDetail& info, vector& readingsData, map& results) { + PRINT_FUNC; bool evalRule = false; string assetName = info.getAssetName(); string ruleName = info.getRuleName(); @@ -823,10 +846,15 @@ bool NotificationQueue::processAllReadings(NotificationDetail& info, } #endif + PRINT_FUNC; + Logger::getLogger()->info("%s:%d: info.getType()=%d, info.getAssetName=%s, info.getRuleName=%s", + __FUNCTION__, __LINE__, info.getType(), info.getAssetName().c_str(), info.getRuleName().c_str()); + switch(info.getType()) { case EvaluationType::SingleItem: case EvaluationType::Interval: + PRINT_FUNC; results[assetName].type = info.getType(); // Add all Reading data this->setSingleItemData(readingsData, results); @@ -844,6 +872,7 @@ bool NotificationQueue::processAllReadings(NotificationDetail& info, { // Process ALL buffers map output; + PRINT_FUNC; this->processAllBuffers(readingsData, info.getType(), info.getInterval(), @@ -929,8 +958,10 @@ bool NotificationQueue::processAllReadings(NotificationDetail& info, void NotificationQueue::sendNotification(map& results, SubscriptionElement& subscription) { + PRINT_FUNC; if (subscription.getInstance()) - { + { + PRINT_FUNC; this->evalRule(results, subscription.getRule()); } } @@ -988,6 +1019,7 @@ void NotificationQueue::processAllBuffers(vector& rea if (((*item)->getTime() - first_time) > timeInterval) { + PRINT_FUNC; // Exit from buffers loop evalRule = true; break; @@ -997,6 +1029,7 @@ void NotificationQueue::processAllBuffers(vector& rea // Return notification data if (buffersDone && evalRule) { + PRINT_FUNC; // Aggregate data in the buffers and set values in result map aggregateData(readingsData, buffersDone, type, result); @@ -1279,6 +1312,7 @@ static void deliverNotificationsExtra( DeliveryPlugin* plugin = delivery.second->getPlugin(); + PRINT_FUNC; sendNotification(delivery.second, plugin, rule, reason); } @@ -1301,6 +1335,7 @@ static void deliverNotificationsExtra( static void deliverNotifications(NotificationRule* rule, const string& data) { + PRINT_FUNC; // Eval notification data via rule "plugin_eval" bool evalRule = rule->getPlugin()->eval(data); @@ -1330,6 +1365,7 @@ static void deliverNotifications(NotificationRule* rule, DeliveryPlugin* plugin = instance->getDeliveryPlugin(); NotificationDelivery* delivery = instance->getDelivery(); + PRINT_FUNC; sendNotification(delivery, plugin, rule, reason); } @@ -1503,22 +1539,25 @@ void NotificationQueue::aggregateData(vector& reading void NotificationQueue::setSingleItemData(vector& readingsData, map& results) { + PRINT_FUNC; for (auto item = readingsData.begin(); item != readingsData.end(); ++item) { const std::vector& readings = (*item)->getData()->getAllReadings(); + Logger::getLogger()->info("NotificationQueue::setSingleItemData: readings vector length=%d", readings.size()); for (auto r = readings.begin(); r != readings.end(); ++r) { + // Logger::getLogger()->info("NotificationQueue::setSingleItemData: (*r)->getAssetName()=%s", (*r)->getAssetName().c_str()); results[(*r)->getAssetName()].rData.push_back(*r); } } } /** - * Build notification JOSN data for time aggregated data + * Build notification JSON data for time aggregated data * * @param readyData Input map with ready time aggregated data * @param output The output string to pass to plugin_eval @@ -1638,6 +1677,7 @@ static void deliverData(NotificationRule* rule, // If all assets are available call plugin_eval, plugin_reason and plugin_deliver if (assets.size() == values.size()) { + PRINT_FUNC; deliverNotifications(rule, output); } } diff --git a/C/services/common/notification_service.cpp b/C/services/common/notification_service.cpp old mode 100644 new mode 100755 index 121aac6..f0e8fe4 --- a/C/services/common/notification_service.cpp +++ b/C/services/common/notification_service.cpp @@ -40,7 +40,8 @@ NotificationService::NotificationService(const string& myName, m_shutdown(false), m_token(token), m_dryRun(false), - m_restartRequest(false) + m_restartRequest(false), + m_storageServiceRestartPending(false) { // Set name m_name = myName; @@ -79,6 +80,43 @@ NotificationService::~NotificationService() delete m_logger; } +bool NotificationService::connectToStorage(std::map>* m) +{ + // Get Storage service + ServiceRecord storageInfo("Fledge Storage"); + if (!m_mgtClient->getService(storageInfo)) + { + m_logger->fatal("Unable to find Fledge storage " + "connection info for service '" + m_name + "'"); + + this->cleanupResources(); + + if (m_restartRequest) + { + PRINT_FUNC; + // Request the Fledge core to restart the service + m_mgtClient->restartService(); + } + else if (!getStorageServiceRestartPendingFlag()) + { + PRINT_FUNC; + // Unregister from Fledge + m_mgtClient->unregisterService(); + } + + return false; + } + m_logger->info("Connect to storage on %s:%d", + storageInfo.getAddress().c_str(), + storageInfo.getPort()); + + // Setup StorageClient + m_storage = new StorageClient(storageInfo.getAddress(), storageInfo.getPort(), m); + + return true; +} + + /** * Start the notification service * by connecting to Fledge core service. @@ -94,26 +132,44 @@ bool NotificationService::start(string& coreAddress, unsigned short managementPort = (unsigned short)0; // Instantiate ManagementApi class + PRINT_FUNC; m_managementApi = new ManagementApi(SERVICE_NAME, managementPort); + PRINT_FUNC; m_managementApi->registerService(this); + PRINT_FUNC; m_managementApi->start(); + PRINT_FUNC; // Allow time for the listeners to start before we register while(m_managementApi->getListenerPort() == 0) { + PRINT_FUNC; sleep(1); } - // Enable http API methods - m_api->initResources(); + // Enable http API methods + PRINT_FUNC; + m_api->initResources(); - // Start the NotificationApi on service port + // Start the NotificationApi on service port + PRINT_FUNC; m_api->start(); + PRINT_FUNC; + sleep(1); // this delay is required only when new storage service instance has been re-started, but seems no way to differentiate that case presently + PRINT_FUNC; // Allow time for the listeners to start before we continue - while(m_api->getListenerPort() == 0) + try { - sleep(1); + while(m_api->getListenerPort() == 0) + { + PRINT_FUNC; + sleep(1); + } + } + catch(std::exception const& ex) + { + m_logger->fatal("Error during m_api->getListenerPort(): error=%s", ex.what()); } // Set Notification callback url prefix @@ -200,6 +256,7 @@ bool NotificationService::start(string& coreAddress, // Register 'm_name' category name to Fledge Core // for configuration changes update this->registerCategory(m_name); + this->registerCategory("storage_service_restart"); // Get 'm_name' category name to Fledge Core @@ -220,38 +277,11 @@ bool NotificationService::start(string& coreAddress, m_logger->info("Starting Notification service '" + m_name + "' ..."); - // Get Storage service - ServiceRecord storageInfo("Fledge Storage"); - if (!m_mgtClient->getService(storageInfo)) - { - m_logger->fatal("Unable to find Fledge storage " - "connection info for service '" + m_name + "'"); - - this->cleanupResources(); - - if (m_restartRequest) - { - // Request the Fledge core to restart the service - m_mgtClient->restartService(); - } - else - { - // Unregister from Fledge - m_mgtClient->unregisterService(); - } - + bool rv = connectToStorage(NULL); + m_logger->info("connectToStorage() 1 returned %s", rv?"true":"false"); + if (rv == false) return false; - } - m_logger->info("Connect to storage on %s:%d", - storageInfo.getAddress().c_str(), - storageInfo.getPort()); - - // Setup StorageClient - StorageClient storageClient(storageInfo.getAddress(), - storageInfo.getPort()); - m_storage = &storageClient; - - + // Setup NotificationManager class NotificationManager instances(m_name, m_mgtClient, this); // Get all notification instances under Notifications @@ -266,16 +296,16 @@ bool NotificationService::start(string& coreAddress, // note we do not get here if m_dryRun is true this->createSecurityCategories(m_mgtClient, m_dryRun); - // We have notitication instances loaded + // We have notification instances loaded // (1.1) Start the NotificationQueue // (1.2) Start the DeliveryQueue - NotificationQueue queue(m_name); - DeliveryQueue dQueue(m_name, m_delivery_threads); + NotificationQueue *queue = new NotificationQueue(m_name); + DeliveryQueue *dQueue = new DeliveryQueue(m_name, m_delivery_threads); // (2) Register notification interest, per assetName: // by call Storage layer Notification API. - NotificationSubscription subscriptions(m_name, storageClient); - subscriptions.registerSubscriptions(); + NotificationSubscription *subscriptions = new NotificationSubscription(m_name, *m_storage); + subscriptions->registerSubscriptions(); // Notification data will be now received via NotificationApi server // and added into the queue for processing. @@ -283,8 +313,111 @@ bool NotificationService::start(string& coreAddress, // .... wait until shutdown ... // Wait for all the API threads to complete - m_api->wait(); + // m_api->wait(); + while (1) + { + if(m_api->serverDown()) + { + m_logger->info("m_api->serverDown() returned TRUE"); + m_api->wait(); // let the HTTP server thread join + m_logger->info("m_api->wait() returned here"); + + m_logger->info("HTTP server down, thread joined, getStorageServiceRestartPendingFlag()=%s", + getStorageServiceRestartPendingFlag()?"true":"false"); + + if (getStorageServiceRestartPendingFlag()) + { + PRINT_FUNC; +#if 1 + // NotificationSubscription* subscriptions = NotificationSubscription::getInstance(); + subscriptions->removeAllSubscriptions(getStorageServiceRestartPendingFlag()); + delete subscriptions; +#endif + PRINT_FUNC; + +#if 0 + // Flush all data in the queues + queue->stop(); delete queue; + dQueue->stop(); delete dQueue; +#endif + + PRINT_FUNC; + m_api->start(); + + PRINT_FUNC; + sleep(1); // this delay is required only when new storage service instance has been started + // but seems no way to differentiate that case + PRINT_FUNC; + + // Allow time for the listeners to start before we continue + try + { + while(m_api->getListenerPort() == 0) + { + PRINT_FUNC; + sleep(1); + } + } + catch(std::exception const& ex) + { + m_logger->fatal("Error during m_api->getListenerPort(): error=%s", ex.what()); + } + PRINT_FUNC; + + m_logger->info("%s:%d: BEFORE: m_storage->getUrlBase().str().c_str()=%s", __FUNCTION__, __LINE__, m_storage->getUrlBase().str().c_str()); + + m_logger->info("calling connectToStorage(): prev m_storage seqMap has %d entries", m_storage->getSeqNumMap()?0:m_storage->getSeqNumMap()->size()); + StorageClient* prev_storage_client = m_storage; + bool rv = connectToStorage(m_storage->getSeqNumMap()); + m_logger->info("connectToStorage() 2 returned %s", rv?"true":"false"); + + delete prev_storage_client; + // previous storage service is now gone, so can't/needn't unregister subscriptions done to it + + m_logger->info("%s:%d: AFTER: m_storage->getUrlBase().str().c_str()=%s", __FUNCTION__, __LINE__, m_storage->getUrlBase().str().c_str()); + + if (rv == false) + return false; + + // Set Notification callback url prefix + m_api->setCallBackURL(); + + m_mgtClient->addAuditEntry("NTFST", + "INFORMATION", + "{\"name\": \"" + m_name + "\"}"); + + PRINT_FUNC; + +#if 0 + + // We have notification instances loaded + // (1.1) Start the NotificationQueue + // (1.2) Start the DeliveryQueue + queue = new NotificationQueue(m_name); + dQueue = new DeliveryQueue(m_name, m_delivery_threads); + + PRINT_FUNC; +#endif + +#if 1 + // (2) Register notification interest, per assetName: + // by calling Storage layer Notification API. + NotificationSubscription *subscriptions = new NotificationSubscription(m_name, *m_storage); + subscriptions->registerSubscriptions(); +#endif + PRINT_FUNC; + resetStorageServiceRestartPendingFlag(); + } + else + { + PRINT_FUNC; + break; + } + } + sleep(1); + } + // Shutdown is starting ... // NOTE: // - Notification API listener is already down. @@ -297,8 +430,8 @@ bool NotificationService::start(string& coreAddress, m_managementApi->stop(); // Flush all data in the queues - queue.stop(); - dQueue.stop(); + queue->stop(); delete queue; + dQueue->stop(); delete dQueue; m_logger->info("Notification service '" + m_name + "' shutdown completed."); @@ -313,18 +446,22 @@ bool NotificationService::start(string& coreAddress, * Unregister notification subscriptions and * stop NotificationAPi listener */ -void NotificationService::stop() +void NotificationService::stop(bool unregisterSubscriptions) { m_logger->info("Stopping Notification service '" + m_name + "' ..."); - // Unregister notifications to storage service - NotificationSubscription* subscriptions = NotificationSubscription::getInstance(); - if (subscriptions) + if (unregisterSubscriptions) { - subscriptions->unregisterSubscriptions(); + // Unregister notifications to storage service + NotificationSubscription* subscriptions = NotificationSubscription::getInstance(); + if (subscriptions) + { + subscriptions->unregisterSubscriptions(); + } } // Stop the NotificationApi + m_logger->info("1. Calling m_api->stop()"); m_api->stop(); } @@ -449,6 +586,8 @@ void NotificationService::configChange(const string& categoryName, NotificationManager* notifications = NotificationManager::getInstance(); NotificationInstance* instance = NULL; + m_logger->info("NotificationService::configChange(): categoryName=%s, category=%s", categoryName.c_str(), category.c_str()); + if (categoryName == m_name) { ConfigCategory config(categoryName, category); @@ -467,6 +606,17 @@ void NotificationService::configChange(const string& categoryName, return; } + // Update the Security category + if (categoryName.compare("storage_service_restart") == 0) + { + m_logger->info("Notification service received storage_service_restart in configChange"); + m_logger->info("2. Calling m_api->stop()"); + m_api->stop(); + setStorageServiceRestartPendingFlag(); + + return; + } + std::size_t found; std::size_t foundRule = categoryName.find("rule"); std::size_t foundDelivery = categoryName.find(CATEGORY_DELIVERY_PREFIX); diff --git a/C/services/common/notification_subscription.cpp b/C/services/common/notification_subscription.cpp old mode 100644 new mode 100755 index 47544b5..8591436 --- a/C/services/common/notification_subscription.cpp +++ b/C/services/common/notification_subscription.cpp @@ -136,6 +136,7 @@ void NotificationSubscription::registerSubscriptions() } // Create a new subscription + m_logger->info("NotificationSubscription::registerSubscriptions(): calling createSubscription()"); bool ret = this->createSubscription(instance); } // Unlock instances @@ -176,8 +177,11 @@ bool NotificationSubscription::addSubscription(const std::string& assetName, * We can have different Subscriptions for each asset: * add new one into the vector */ + m_logger->info("NotificationSubscription::addSubscription(): adding SubscriptionElement to m_subscriptions[%s]", assetName.c_str()); m_subscriptions[assetName].push_back(element); - + m_logger->info("NotificationSubscription::addSubscription(): AFTER: m_subscriptions.size()=%d, m_subscriptions[%s] has %d elements", + m_subscriptions.size(), assetName.c_str(), m_subscriptions[assetName].size()); + // Register once per asset Notification interest to Storage server if (m_subscriptions[assetName].size() == 1) { @@ -358,6 +362,7 @@ bool NotificationSubscription::createSubscription(NotificationInstance* instance type); // Add assetInfo to its rule + Logger::getLogger()->info("Calling theRule->addAsset(assetInfo): rulename=%s, assetname=%s", theRule->getName().c_str(), asset.c_str()); theRule->addAsset(assetInfo); // Create subscription object @@ -380,17 +385,21 @@ bool NotificationSubscription::createSubscription(NotificationInstance* instance * @param ruleName The associated ruleName */ void NotificationSubscription::removeSubscription(const string& assetName, - const string& ruleName) + const string& ruleName, bool storageServiceRestartPending) { + PRINT_FUNC; // Get all instances NotificationManager* manager = NotificationManager::getInstance(); - + PRINT_FUNC; // Get subscriptions for assetName this->lockSubscriptions(); + PRINT_FUNC; map>& allSubscriptions = this->getAllSubscriptions(); + Logger::getLogger()->info("NotificationSubscription::removeSubscription: BEFORE: Number of subscriptions=%d", allSubscriptions.size()); auto it = allSubscriptions.find(assetName); bool ret = it != allSubscriptions.end(); + PRINT_FUNC; // For the found assetName subscriptions // 1- Unsubscribe notification interest for assetNamme @@ -399,12 +408,15 @@ void NotificationSubscription::removeSubscription(const string& assetName, // 4- Remove Subscription if (ret) { + PRINT_FUNC; vector& elems = (*it).second; if (elems.size() == 1) { + PRINT_FUNC; // 1- We have only one subscription for current asset // call unregister interest - this->unregisterSubscription(assetName); + if (!storageServiceRestartPending) + this->unregisterSubscription(assetName); } // Get Notification queue instance @@ -455,11 +467,66 @@ void NotificationSubscription::removeSubscription(const string& assetName, } } + Logger::getLogger()->info("NotificationSubscription::removeSubscription: elems.size()=%d", elems.size()); + // 4- Remove subscription if array is empty if (!elems.size()) { + PRINT_FUNC; allSubscriptions.erase(it); } } + Logger::getLogger()->info("NotificationSubscription::removeSubscription: AFTER: Number of subscriptions=%d", allSubscriptions.size()); this->unlockSubscriptions(); + PRINT_FUNC; +} + + +/** + * Remove a given subscription + * + * @param assetName The register assetName for notifications + * @param ruleName The associated ruleName + */ +void NotificationSubscription::removeAllSubscriptions(bool storageServiceRestartPending) +{ + // Get all NotificationSubscriptions + m_subscriptionMutex.lock(); + PRINT_FUNC; + // std::map> + auto & subscriptions = this->getAllSubscriptions(); + PRINT_FUNC; + Logger::getLogger()->info("%s:%d: subscriptions.size()=%d", __FUNCTION__, __LINE__, subscriptions.size()); + PRINT_FUNC; + m_subscriptionMutex.unlock(); + + int n=0; + for (auto & it : subscriptions) + { + PRINT_FUNC; + std::string assetName = it.first; + PRINT_FUNC; + auto & seVector = it.second; + PRINT_FUNC; + Logger::getLogger()->info("%s:%d: assetName=%s, SubscriptionElement vec size=%d", + __FUNCTION__, __LINE__, assetName.c_str(), seVector.size()); + PRINT_FUNC; + for(auto & se : seVector) + { + PRINT_FUNC; + if (!se.getRule()) + break; + Logger::getLogger()->info("SubscriptionElement: assetName=%s, ruleName=%s", + se.getAssetName().c_str(), se.getRule()->getName().c_str()); + removeSubscription(se.getAssetName(), se.getRule()->getName(), storageServiceRestartPending); + } + PRINT_FUNC; + n++; + Logger::getLogger()->info("%s:%d: n=%d, subscriptions.size()=%d", + __FUNCTION__, __LINE__, n, subscriptions.size()); + if (n >= subscriptions.size()) + break; + } + PRINT_FUNC; } +