Skip to content

Commit 0f112fd

Browse files
committed
Fix construction of new class
1 parent a58af34 commit 0f112fd

File tree

3 files changed

+76
-50
lines changed

3 files changed

+76
-50
lines changed

src/Dataflow/Engine/Controller/NetworkEditorController.cc

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ NetworkEditorController::NetworkEditorController(ModuleFactoryHandle mf, ModuleS
6464
reexFactory_(reex),
6565
executorFactory_(executorFactory),
6666
serializationManager_(nesm),
67-
executionManager_(currentExecutor_), //TODO: pass in factory instead
6867
signalSwitch_(true)
6968
{
7069
dynamicPortManager_.reset(new DynamicPortManager(connectionAdded_, connectionRemoved_, this));
@@ -77,7 +76,7 @@ NetworkEditorController::NetworkEditorController(ModuleFactoryHandle mf, ModuleS
7776
}
7877

7978
NetworkEditorController::NetworkEditorController(SCIRun::Dataflow::Networks::NetworkHandle network, ExecutionStrategyFactoryHandle executorFactory, NetworkEditorSerializationManager* nesm)
80-
: theNetwork_(network), executorFactory_(executorFactory), serializationManager_(nesm), executionManager_(currentExecutor_) //TODO: pass in factory instead
79+
: theNetwork_(network), executorFactory_(executorFactory), serializationManager_(nesm)
8180
{
8281
}
8382

@@ -352,10 +351,7 @@ void NetworkEditorController::executeModule(const ModuleHandle& module, const Ex
352351

353352
void NetworkEditorController::initExecutor()
354353
{
355-
if (!currentExecutor_)
356-
{
357-
currentExecutor_ = executorFactory_->createDefault();
358-
}
354+
executionManager_.initExecutor(executorFactory_);
359355
}
360356

361357
ExecutionContextHandle NetworkEditorController::createExecutionContext(const ExecutableLookup* lookup, ModuleFilter filter)
@@ -372,46 +368,6 @@ void NetworkEditorController::executeGeneric(const ExecutableLookup* lookup, Mod
372368
executionManager_.enqueueContext(context);
373369
}
374370

375-
ExecutionQueueManager::ExecutionQueueManager(ExecutionStrategyHandle& currentExecutor) : contexts_(2), currentExecutor_(currentExecutor),
376-
executionLaunchThread_([this]() {executeTopContext(); } ),
377-
executionMutex_("executionQueue"),
378-
somethingToExecute_("executionQueue")
379-
{
380-
}
381-
382-
void ExecutionQueueManager::enqueueContext(ExecutionContextHandle context)
383-
{
384-
bool contextReady = false;
385-
{
386-
Guard g(executionMutex_.get());
387-
contextReady = contexts_.push(context);
388-
}
389-
if (contextReady)
390-
{
391-
contextCount_.fetch_add(1);
392-
//std::cout << "ctx queued" << std::endl;
393-
somethingToExecute_.conditionBroadcast();
394-
}
395-
else
396-
{
397-
//std::cout << "ctx queue is full" << std::endl;
398-
}
399-
}
400-
401-
void ExecutionQueueManager::executeTopContext()
402-
{
403-
while (true)
404-
{
405-
UniqueLock lock(executionMutex_.get());
406-
while (0 == contextCount_)
407-
{
408-
somethingToExecute_.wait(lock);
409-
}
410-
contexts_.consume_one([&](ExecutionContextHandle ctx) { currentExecutor_->execute(*ctx); });
411-
contextCount_.fetch_sub(1);
412-
}
413-
}
414-
415371
NetworkHandle NetworkEditorController::getNetwork() const
416372
{
417373
return theNetwork_;
@@ -430,7 +386,7 @@ NetworkGlobalSettings& NetworkEditorController::getSettings()
430386

431387
void NetworkEditorController::setExecutorType(int type)
432388
{
433-
currentExecutor_ = executorFactory_->create((ExecutionStrategy::Type)type);
389+
executionManager_.setExecutionStrategy(executorFactory_->create((ExecutionStrategy::Type)type));
434390
}
435391

436392
const ModuleDescriptionMap& NetworkEditorController::getAllAvailableModuleDescriptions() const

src/Dataflow/Engine/Scheduler/ExecutionStrategy.cc

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
using namespace SCIRun::Dataflow::Engine;
3232
using namespace SCIRun::Dataflow::Networks;
33+
using namespace SCIRun::Core::Thread;
3334

3435
ExecutionBounds ExecutionContext::executionBounds_;
3536

@@ -51,3 +52,68 @@ ModuleFilter ExecutionContext::addAdditionalFilter(ModuleFilter filter) const
5152

5253
return boost::bind(filter, _1) && boost::bind(additionalFilter, _1);
5354
}
55+
56+
ExecutionQueueManager::ExecutionQueueManager() :
57+
contexts_(2),
58+
executionMutex_("executionQueue"),
59+
somethingToExecute_("executionQueue")
60+
{
61+
}
62+
63+
void ExecutionQueueManager::setExecutionStrategy(ExecutionStrategyHandle exec)
64+
{
65+
Guard g(executionMutex_.get());
66+
currentExecutor_ = exec;
67+
}
68+
69+
void ExecutionQueueManager::initExecutor(ExecutionStrategyFactoryHandle factory)
70+
{
71+
if (!currentExecutor_ && factory)
72+
currentExecutor_ = factory->createDefault();
73+
if (!executionLaunchThread_)
74+
start();
75+
}
76+
77+
void ExecutionQueueManager::start()
78+
{
79+
executionLaunchThread_.reset(new boost::thread([this]() { executeTopContext(); }));
80+
}
81+
82+
void ExecutionQueueManager::enqueueContext(ExecutionContextHandle context)
83+
{
84+
bool contextReady = false;
85+
{
86+
Guard g(executionMutex_.get());
87+
contextReady = contexts_.push(context);
88+
}
89+
if (contextReady)
90+
{
91+
contextCount_.fetch_add(1);
92+
//std::cout << "ctx queued" << std::endl;
93+
somethingToExecute_.conditionBroadcast();
94+
}
95+
else
96+
{
97+
//std::cout << "ctx queue is full" << std::endl;
98+
}
99+
}
100+
101+
void ExecutionQueueManager::executeTopContext()
102+
{
103+
while (true)
104+
{
105+
UniqueLock lock(executionMutex_.get());
106+
while (0 == contextCount_)
107+
{
108+
somethingToExecute_.wait(lock);
109+
}
110+
contexts_.consume_one([&](ExecutionContextHandle ctx) { if (currentExecutor_) currentExecutor_->execute(*ctx); });
111+
contextCount_.fetch_sub(1);
112+
}
113+
}
114+
115+
void ExecutionQueueManager::stop()
116+
{
117+
executionLaunchThread_->interrupt();
118+
executionLaunchThread_.reset();
119+
}

src/Dataflow/Engine/Scheduler/ExecutionStrategy.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,19 @@ namespace Engine {
7272
class SCISHARE ExecutionQueueManager
7373
{
7474
public:
75-
explicit ExecutionQueueManager(ExecutionStrategyHandle& currentExecutor);
75+
ExecutionQueueManager();
76+
void initExecutor(ExecutionStrategyFactoryHandle factory);
77+
void setExecutionStrategy(ExecutionStrategyHandle exec);
7678
void enqueueContext(ExecutionContextHandle context);
79+
void start();
80+
void stop();
7781
private:
7882
typedef DynamicExecutor::WorkQueue<ExecutionContextHandle>::Impl ExecutionContextQueue;
7983
ExecutionContextQueue contexts_;
8084

81-
ExecutionStrategyHandle& currentExecutor_;
85+
ExecutionStrategyHandle currentExecutor_;
8286

83-
boost::thread executionLaunchThread_;
87+
boost::shared_ptr<boost::thread> executionLaunchThread_;
8488
Core::Thread::Mutex executionMutex_;
8589
Core::Thread::ConditionVariable somethingToExecute_;
8690
boost::atomic<int> contextCount_; // need certain member function on spsc_queue, need to check boost version...

0 commit comments

Comments
 (0)