Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions C/services/dispatcher/include/pipeline_execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class PipelineExecutionContext {
void rePlumbFilters();
bool loadPipeline();
PLUGIN_HANDLE loadFilterPlugin(const std::string& filterName);
void shutdownPlugin(FilterPlugin *plugin);

static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET* readings);
static void useFilteredData(OUTPUT_HANDLE *outHandle, READINGSET* readings);
Expand Down
1 change: 1 addition & 0 deletions C/services/dispatcher/include/pipeline_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class ControlPipelineManager {
return m_managementClient;
}
void setService(DispatcherService *service) { m_dispatcher = service; };
DispatcherService *getService() { return m_dispatcher; };

void registerCategory(const std::string& category, FilterPlugin *plugin);
void unregisterCategory(const std::string& category, FilterPlugin *plugin);
Expand Down
42 changes: 38 additions & 4 deletions C/services/dispatcher/pipeline_execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Author: Mark Riddoch
*
*/
#include <dispatcher_service.h>
#include <pipeline_execution.h>
#include <pipeline_manager.h>
#include <plugin_manager.h>
Expand Down Expand Up @@ -45,7 +46,7 @@ PipelineExecutionContext::~PipelineExecutionContext()

for (int i = 0; i < m_plugins.size(); i++)
{
m_plugins[i]->shutdown();
shutdownPlugin(m_plugins[i]);
delete m_plugins[i];
}

Expand Down Expand Up @@ -151,6 +152,13 @@ bool rval = true;
plugin->init(updatedCfg, (OUTPUT_HANDLE *)this, filterReadingSetFn(useFilteredData));
}
m_pipelineManager->registerCategory(m_filters[i], plugin);

if (plugin->persistData())
{
plugin->m_plugin_data = new PluginData(m_pipelineManager->getService()->getStorageClient());
string pluginStoredData = plugin->m_plugin_data->loadStoredData(m_name + plugin->getName());
plugin->startData(pluginStoredData);
}
}

if (!rval)
Expand Down Expand Up @@ -350,6 +358,12 @@ void PipelineExecutionContext::addFilter(const string& filter, int order)
auto it = m_plugins.begin();
it += (order - 1);
m_plugins.insert(it, currentPlugin);
if (currentPlugin->persistData())
{
currentPlugin->m_plugin_data = new PluginData(m_pipelineManager->getService()->getStorageClient());
string pluginStoredData = currentPlugin->m_plugin_data->loadStoredData(m_name + currentPlugin->getName());
currentPlugin->startData(pluginStoredData);
}
}
}
}
Expand All @@ -373,7 +387,7 @@ void PipelineExecutionContext::addFilter(const string& filter, int order)
if (m_plugins.size() > 1)
{
FilterPlugin *previous = m_plugins[m_plugins.size() - 1];
previous->shutdown();
shutdownPlugin(previous);
ConfigCategory prevConfig = m_management->getCategory(m_filters[m_plugins.size() - 1]);
previous->init(prevConfig, currentPlugin, filterReadingSetFn(passToOnwardFilter));
}
Expand All @@ -394,7 +408,7 @@ void PipelineExecutionContext::removeFilter(const string& filter)
m_filters.erase(it);
}
// Unregister the filter plugin from pipeline and remove the filter plugin from m_plugins collection
m_plugins[index]->shutdown();
shutdownPlugin(m_plugins[index]);
m_pipelineManager->unregisterCategory(filter, m_plugins[index]);
delete (m_plugins[index]);
m_plugins.erase(m_plugins.begin()+index);
Expand Down Expand Up @@ -439,7 +453,7 @@ void PipelineExecutionContext::rePlumbFilters()
{
FilterPlugin *plugin = *it;
ConfigCategory updatedCfg = m_management->getCategory(m_filters[i]);
plugin->shutdown();
shutdownPlugin(plugin);
if ((it + 1) != m_plugins.end())
{
plugin->init(updatedCfg, *(it + 1), filterReadingSetFn(passToOnwardFilter));
Expand All @@ -450,3 +464,23 @@ void PipelineExecutionContext::rePlumbFilters()
}
}
}

/**
* Shutdown the specified plugin and optional save any persisted data
*
* @param plugin The plugin to shutdown
*/
void PipelineExecutionContext::shutdownPlugin(FilterPlugin *plugin)
{

if (plugin->persistData())
{
string data = plugin->shutdownSaveData();
string key(m_name + plugin->getName());
plugin->m_plugin_data->persistPluginData(key, data);
}
else
{
plugin->shutdown();
}
}