Skip to content

Commit 82e6b15

Browse files
committed
Execution queue improvements
1 parent 6c7b0e5 commit 82e6b15

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

src/Dataflow/Engine/Scheduler/ExecutionStrategy.cc

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ ModuleFilter ExecutionContext::addAdditionalFilter(ModuleFilter filter) const
5454
}
5555

5656
ExecutionQueueManager::ExecutionQueueManager() :
57-
contexts_(2),
57+
contexts_(10),
5858
executionMutex_("executionQueue"),
59-
somethingToExecute_("executionQueue")
59+
somethingToExecute_("executionQueue"),
60+
contextCount_(0)
6061
{
6162
}
6263

@@ -70,8 +71,6 @@ void ExecutionQueueManager::initExecutor(ExecutionStrategyFactoryHandle factory)
7071
{
7172
if (!currentExecutor_ && factory)
7273
currentExecutor_ = factory->createDefault();
73-
if (!executionLaunchThread_)
74-
start();
7574
}
7675

7776
void ExecutionQueueManager::start()
@@ -85,10 +84,13 @@ void ExecutionQueueManager::enqueueContext(ExecutionContextHandle context)
8584
{
8685
Guard g(executionMutex_.get());
8786
contextReady = contexts_.push(context);
87+
if (contextReady)
88+
contextCount_.fetch_add(1);
8889
}
8990
if (contextReady)
9091
{
91-
contextCount_.fetch_add(1);
92+
if (!executionLaunchThread_)
93+
start();
9294
//std::cout << "ctx queued" << std::endl;
9395
somethingToExecute_.conditionBroadcast();
9496
}
@@ -105,10 +107,14 @@ void ExecutionQueueManager::executeTopContext()
105107
UniqueLock lock(executionMutex_.get());
106108
while (0 == contextCount_)
107109
{
110+
//std::cout << "waiting on launch thread lock " << boost::this_thread::get_id() << std::endl;
108111
somethingToExecute_.wait(lock);
109112
}
110-
contexts_.consume_one([&](ExecutionContextHandle ctx) { if (currentExecutor_) currentExecutor_->execute(*ctx); });
111-
contextCount_.fetch_sub(1);
113+
//std::cout << "consuming on launch thread " << boost::this_thread::get_id() << std::endl;
114+
if (contexts_.consume_one([&](ExecutionContextHandle ctx) { if (currentExecutor_) currentExecutor_->execute(*ctx); }))
115+
contextCount_.fetch_sub(1);
116+
//std::cout << "sleeping on launch thread " << boost::this_thread::get_id() << std::endl;
117+
//boost::this_thread::sleep(boost::posix_time::milliseconds(500));
112118
}
113119
}
114120

0 commit comments

Comments
 (0)