@@ -4,14 +4,14 @@ package hxcoro.thread;
44#error " This class is not available on this target"
55#end
66
7+ import haxe .coro .dispatchers .IDispatchObject ;
78import haxe .ds .Vector ;
9+ import hxcoro .concurrent .AtomicState ;
10+ import hxcoro .concurrent .BackOff ;
811import sys .thread .Condition ;
912import sys .thread .Semaphore ;
10- import sys .thread .Tls ;
1113import sys .thread .Thread ;
12- import hxcoro .concurrent .AtomicState ;
13- import hxcoro .concurrent .BackOff ;
14- import haxe .coro .dispatchers .IDispatchObject ;
14+ import sys .thread .Tls ;
1515
1616typedef DispatchQueue = WorkStealingQueue <IDispatchObject >;
1717
@@ -57,9 +57,8 @@ class FixedThreadPool implements IThreadPool {
5757 function get_isShutDown (): Bool return shutdownState .load () != Active ;
5858
5959 final shutdownState : AtomicState <ShutdownState >;
60- final cond : Condition ;
60+ final semaphore : Semaphore ;
6161 final pool : Array <Worker >;
62- final activity : WorkerActivity ;
6362 final queueTls : Tls <DispatchQueue >;
6463
6564 /**
@@ -70,12 +69,11 @@ class FixedThreadPool implements IThreadPool {
7069 throw new ThreadPoolException (' FixedThreadPool needs threadsCount to be at least 1.' );
7170 this .threadsCount = threadsCount ;
7271 shutdownState = new AtomicState (Active );
73- cond = new Condition ( );
72+ semaphore = new Semaphore ( 0 );
7473 queueTls = new Tls ();
7574 final queues = Vector .fromArrayCopy ([for (_ in 0 ... threadsCount + 1 ) new WorkStealingQueue ()]);
7675 queueTls .value = queues [0 ];
77- activity = new WorkerActivity (threadsCount );
78- pool = [for (i in 0 ... threadsCount ) new Worker (cond , queueTls , queues , i + 1 , activity )];
76+ pool = [for (i in 0 ... threadsCount ) new Worker (semaphore , queueTls , queues , i + 1 )];
7977 for (worker in pool ) {
8078 worker .start ();
8179 }
@@ -92,12 +90,7 @@ class FixedThreadPool implements IThreadPool {
9290 throw new ThreadPoolException (' Task to run must not be null.' );
9391 }
9492 queueTls .value .add (obj );
95- if (cond .tryAcquire ()) {
96- // If no one holds onto the condition, notify someone.
97- activity .hadEvent = true ;
98- cond .signal ();
99- cond .release ();
100- }
93+ semaphore .release ();
10194 }
10295
10396 /**
@@ -112,10 +105,8 @@ class FixedThreadPool implements IThreadPool {
112105
113106 for (worker in pool ) {
114107 worker .shutDown (shutdownSemaphore );
108+ semaphore .release ();
115109 }
116- cond .acquire ();
117- cond .broadcast ();
118- cond .release ();
119110 if (block ) {
120111 final ownQueue = queueTls .value ;
121112 for (worker in pool ) {
@@ -140,7 +131,6 @@ class FixedThreadPool implements IThreadPool {
140131 }
141132 Sys .println (' \t total worker loops: $totalLoops ' );
142133 Sys .println (' \t total worker dispatches: $totalDispatches ' );
143- Sys .println (' \t workers (active/available/total): ${activity .activeWorkers }/ ${activity .availableWorkers }/ ${pool .length }' );
144134 Sys .print (' \t queue 0: ' );
145135 queueTls .value .dump ();
146136 for (worker in pool ) {
@@ -194,17 +184,15 @@ private class Worker {
194184 public var numLooped (default , null ): Int ;
195185
196186 var shutdownSemaphore : Null <Semaphore >;
197- final cond : Condition ;
187+ final semaphore : Semaphore ;
198188 final queues : Vector <DispatchQueue >;
199189 final ownQueueIndex : Int ;
200- final activity : WorkerActivity ;
201190 final queueTls : Tls <DispatchQueue >;
202191
203- public function new (cond : Condition , queueTls : Tls <DispatchQueue >, queues : Vector <DispatchQueue >, ownQueueIndex : Int , activity : WorkerActivity ) {
204- this .cond = cond ;
192+ public function new (semaphore : Semaphore , queueTls : Tls <DispatchQueue >, queues : Vector <DispatchQueue >, ownQueueIndex : Int ) {
193+ this .semaphore = semaphore ;
205194 this .queues = queues ;
206195 this .ownQueueIndex = ownQueueIndex ;
207- this .activity = activity ;
208196 this .queueTls = queueTls ;
209197 numDispatched = 0 ;
210198 numLooped = 0 ;
@@ -260,46 +248,18 @@ private class Worker {
260248 inShutdown = false ;
261249 }
262250
263- // If we did nothing, wait for the condition variable.
264- if (cond .tryAcquire ()) {
265- // An event could have come in between the queue checking and the acquire. In this
266- // case, the condition might have been signalled to, but we would still go to sleep.
267- if (activity .hadEvent ) {
268- activity .hadEvent = false ;
269- cond .release ();
251+ if (shutdownSemaphore != null ) {
252+ if (inShutdown ) {
253+ semaphore .release ();
254+ break ;
255+ } else {
256+ inShutdown = true ;
270257 continue ;
271258 }
272- if (shutdownSemaphore != null ) {
273- if (inShutdown ) {
274- -- activity .activeWorkers ;
275- -- activity .availableWorkers ;
276- cond .broadcast ();
277- cond .release ();
278- break ;
279- } else {
280- inShutdown = true ;
281- cond .broadcast ();
282- cond .release ();
283- continue ;
284- }
285- }
286- if (activity .activeWorkers == 1 ) {
287- // Always keep one thread alive until we can find a better solution to the
288- // run/loop synchronization problem.
289- cond .release ();
290- BackOff .backOff ();
291- continue ;
292- }
293- // These modifications are fine because we hold onto the cond mutex.
294- -- activity .activeWorkers ;
295- state = Waiting ;
296- cond .wait ();
297- state = CheckingQueues ;
298- ++ activity .activeWorkers ;
299- cond .release ();
300- } else {
301- BackOff .backOff ();
302259 }
260+ state = Waiting ;
261+ semaphore .acquire ();
262+ state = CheckingQueues ;
303263 }
304264 }
305265
0 commit comments