@@ -21,7 +21,7 @@ ThreadPool::~ThreadPool()
2121 std::vector<std::thread> workers;
2222 {
2323 auto state (state_.lock ());
24- state-> quit = true ;
24+ quit = true ;
2525 std::swap (workers, state->workers );
2626 }
2727
@@ -36,7 +36,7 @@ ThreadPool::~ThreadPool()
3636void ThreadPool::enqueue (const work_t & t)
3737{
3838 auto state (state_.lock ());
39- if (state-> quit )
39+ if (quit)
4040 throw ThreadPoolShutDown (" cannot enqueue a work item while the thread pool is shutting down" );
4141 state->left .push (t);
4242 if (state->left .size () > state->workers .size () && state->workers .size () < maxThreads)
@@ -46,53 +46,84 @@ void ThreadPool::enqueue(const work_t & t)
4646
4747void ThreadPool::process ()
4848{
49+ /* Loop until there are no active work items *and* there either
50+ are no queued items or there is an exception. The
51+ post-condition is that no new items will become active. */
4952 while (true ) {
5053 auto state (state_.lock ());
51- if (state->exception )
52- std::rethrow_exception (state->exception );
53- if (state->left .empty () && !state->pending ) break ;
54+ if (!state->active ) {
55+ if (state->exception )
56+ std::rethrow_exception (state->exception );
57+ if (state->left .empty ())
58+ break ;
59+ }
5460 state.wait (done);
5561 }
5662}
5763
5864void ThreadPool::workerEntry ()
5965{
66+ interruptCheck = [&]() { return (bool ) quit; };
67+
6068 bool didWork = false ;
69+ std::exception_ptr exc;
6170
6271 while (true ) {
6372 work_t w;
6473 {
6574 auto state (state_.lock ());
75+
76+ if (didWork) {
77+ assert (state->active );
78+ state->active --;
79+
80+ if (exc) {
81+
82+ if (!state->exception ) {
83+ state->exception = exc;
84+ // Tell the other workers to quit.
85+ quit = true ;
86+ work.notify_all ();
87+ } else {
88+ /* Print the exception, since we can't
89+ propagate it. */
90+ try {
91+ std::rethrow_exception (exc);
92+ } catch (std::exception & e) {
93+ if (!dynamic_cast <Interrupted*>(&e) &&
94+ !dynamic_cast <ThreadPoolShutDown*>(&e))
95+ ignoreException ();
96+ } catch (...) {
97+ }
98+ }
99+ }
100+ }
101+
102+ /* Wait until a work item is available or another thread
103+ had an exception or we're asked to quit. */
66104 while (true ) {
67- if (state->quit || state->exception ) return ;
68- if (didWork) {
69- assert (state->pending );
70- state->pending --;
71- didWork = false ;
105+ if (quit) {
106+ if (!state->active )
107+ done.notify_one ();
108+ return ;
72109 }
73110 if (!state->left .empty ()) break ;
74- if (!state->pending )
75- done.notify_all ();
111+ if (!state->active ) {
112+ done.notify_one ();
113+ return ;
114+ }
76115 state.wait (work);
77116 }
78- w = state->left .front ();
117+
118+ w = std::move (state->left .front ());
79119 state->left .pop ();
80- state->pending ++;
120+ state->active ++;
81121 }
82122
83123 try {
84124 w ();
85- } catch (std::exception & e) {
86- auto state (state_.lock ());
87- if (state->exception ) {
88- if (!dynamic_cast <Interrupted*>(&e) &&
89- !dynamic_cast <ThreadPoolShutDown*>(&e))
90- printError (format (" error: %s" ) % e.what ());
91- } else {
92- state->exception = std::current_exception ();
93- work.notify_all ();
94- done.notify_all ();
95- }
125+ } catch (...) {
126+ exc = std::current_exception ();
96127 }
97128
98129 didWork = true ;
0 commit comments