diff --git a/C/services/dispatcher/include/pipeline_execution.h b/C/services/dispatcher/include/pipeline_execution.h index 473d36c..3509058 100644 --- a/C/services/dispatcher/include/pipeline_execution.h +++ b/C/services/dispatcher/include/pipeline_execution.h @@ -55,6 +55,7 @@ class PipelineExecutionContext { void rePlumbFilters(); bool loadPipeline(); PLUGIN_HANDLE loadFilterPlugin(const std::string& filterName); + void shutdownPlugin(FilterPlugin *plugin); static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET* readings); static void useFilteredData(OUTPUT_HANDLE *outHandle, READINGSET* readings); diff --git a/C/services/dispatcher/include/pipeline_manager.h b/C/services/dispatcher/include/pipeline_manager.h index ca320cb..fee889f 100644 --- a/C/services/dispatcher/include/pipeline_manager.h +++ b/C/services/dispatcher/include/pipeline_manager.h @@ -169,6 +169,7 @@ class ControlPipelineManager { return m_managementClient; } void setService(DispatcherService *service) { m_dispatcher = service; }; + DispatcherService *getService() { return m_dispatcher; }; void registerCategory(const std::string& category, FilterPlugin *plugin); void unregisterCategory(const std::string& category, FilterPlugin *plugin); diff --git a/C/services/dispatcher/pipeline_execution.cpp b/C/services/dispatcher/pipeline_execution.cpp index 4e6cb5c..906720b 100644 --- a/C/services/dispatcher/pipeline_execution.cpp +++ b/C/services/dispatcher/pipeline_execution.cpp @@ -8,6 +8,7 @@ * Author: Mark Riddoch * */ +#include #include #include #include @@ -45,7 +46,7 @@ PipelineExecutionContext::~PipelineExecutionContext() for (int i = 0; i < m_plugins.size(); i++) { - m_plugins[i]->shutdown(); + shutdownPlugin(m_plugins[i]); delete m_plugins[i]; } @@ -151,6 +152,13 @@ bool rval = true; plugin->init(updatedCfg, (OUTPUT_HANDLE *)this, filterReadingSetFn(useFilteredData)); } m_pipelineManager->registerCategory(m_filters[i], plugin); + + if (plugin->persistData()) + { + plugin->m_plugin_data = new PluginData(m_pipelineManager->getService()->getStorageClient()); + string pluginStoredData = plugin->m_plugin_data->loadStoredData(m_name + plugin->getName()); + plugin->startData(pluginStoredData); + } } if (!rval) @@ -350,6 +358,12 @@ void PipelineExecutionContext::addFilter(const string& filter, int order) auto it = m_plugins.begin(); it += (order - 1); m_plugins.insert(it, currentPlugin); + if (currentPlugin->persistData()) + { + currentPlugin->m_plugin_data = new PluginData(m_pipelineManager->getService()->getStorageClient()); + string pluginStoredData = currentPlugin->m_plugin_data->loadStoredData(m_name + currentPlugin->getName()); + currentPlugin->startData(pluginStoredData); + } } } } @@ -373,7 +387,7 @@ void PipelineExecutionContext::addFilter(const string& filter, int order) if (m_plugins.size() > 1) { FilterPlugin *previous = m_plugins[m_plugins.size() - 1]; - previous->shutdown(); + shutdownPlugin(previous); ConfigCategory prevConfig = m_management->getCategory(m_filters[m_plugins.size() - 1]); previous->init(prevConfig, currentPlugin, filterReadingSetFn(passToOnwardFilter)); } @@ -394,7 +408,7 @@ void PipelineExecutionContext::removeFilter(const string& filter) m_filters.erase(it); } // Unregister the filter plugin from pipeline and remove the filter plugin from m_plugins collection - m_plugins[index]->shutdown(); + shutdownPlugin(m_plugins[index]); m_pipelineManager->unregisterCategory(filter, m_plugins[index]); delete (m_plugins[index]); m_plugins.erase(m_plugins.begin()+index); @@ -439,7 +453,7 @@ void PipelineExecutionContext::rePlumbFilters() { FilterPlugin *plugin = *it; ConfigCategory updatedCfg = m_management->getCategory(m_filters[i]); - plugin->shutdown(); + shutdownPlugin(plugin); if ((it + 1) != m_plugins.end()) { plugin->init(updatedCfg, *(it + 1), filterReadingSetFn(passToOnwardFilter)); @@ -450,3 +464,23 @@ void PipelineExecutionContext::rePlumbFilters() } } } + +/** + * Shutdown the specified plugin and optional save any persisted data + * + * @param plugin The plugin to shutdown + */ +void PipelineExecutionContext::shutdownPlugin(FilterPlugin *plugin) +{ + + if (plugin->persistData()) + { + string data = plugin->shutdownSaveData(); + string key(m_name + plugin->getName()); + plugin->m_plugin_data->persistPluginData(key, data, "control"); + } + else + { + plugin->shutdown(); + } +}