Skip to content

Commit 6021be9

Browse files
committed
use Semaphore instead of Condition for thread pool synchronization
1 parent ccba54e commit 6021be9

File tree

1 file changed

+21
-61
lines changed

1 file changed

+21
-61
lines changed

src/hxcoro/thread/FixedThreadPool.hx

Lines changed: 21 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ package hxcoro.thread;
44
#error "This class is not available on this target"
55
#end
66

7+
import haxe.coro.dispatchers.IDispatchObject;
78
import haxe.ds.Vector;
9+
import hxcoro.concurrent.AtomicState;
10+
import hxcoro.concurrent.BackOff;
811
import sys.thread.Condition;
912
import sys.thread.Semaphore;
10-
import sys.thread.Tls;
1113
import sys.thread.Thread;
12-
import hxcoro.concurrent.AtomicState;
13-
import hxcoro.concurrent.BackOff;
14-
import haxe.coro.dispatchers.IDispatchObject;
14+
import sys.thread.Tls;
1515

1616
typedef DispatchQueue = WorkStealingQueue<IDispatchObject>;
1717

@@ -57,9 +57,8 @@ class FixedThreadPool implements IThreadPool {
5757
function get_isShutDown():Bool return shutdownState.load() != Active;
5858

5959
final shutdownState:AtomicState<ShutdownState>;
60-
final cond:Condition;
60+
final semaphore:Semaphore;
6161
final pool:Array<Worker>;
62-
final activity:WorkerActivity;
6362
final queueTls:Tls<DispatchQueue>;
6463

6564
/**
@@ -70,12 +69,11 @@ class FixedThreadPool implements IThreadPool {
7069
throw new ThreadPoolException('FixedThreadPool needs threadsCount to be at least 1.');
7170
this.threadsCount = threadsCount;
7271
shutdownState = new AtomicState(Active);
73-
cond = new Condition();
72+
semaphore = new Semaphore(0);
7473
queueTls = new Tls();
7574
final queues = Vector.fromArrayCopy([for (_ in 0...threadsCount + 1) new WorkStealingQueue()]);
7675
queueTls.value = queues[0];
77-
activity = new WorkerActivity(threadsCount);
78-
pool = [for(i in 0...threadsCount) new Worker(cond, queueTls, queues, i + 1, activity)];
76+
pool = [for(i in 0...threadsCount) new Worker(semaphore, queueTls, queues, i + 1)];
7977
for (worker in pool) {
8078
worker.start();
8179
}
@@ -92,12 +90,7 @@ class FixedThreadPool implements IThreadPool {
9290
throw new ThreadPoolException('Task to run must not be null.');
9391
}
9492
queueTls.value.add(obj);
95-
if (cond.tryAcquire()) {
96-
// If no one holds onto the condition, notify someone.
97-
activity.hadEvent = true;
98-
cond.signal();
99-
cond.release();
100-
}
93+
semaphore.release();
10194
}
10295

10396
/**
@@ -112,10 +105,8 @@ class FixedThreadPool implements IThreadPool {
112105

113106
for (worker in pool) {
114107
worker.shutDown(shutdownSemaphore);
108+
semaphore.release();
115109
}
116-
cond.acquire();
117-
cond.broadcast();
118-
cond.release();
119110
if (block) {
120111
final ownQueue = queueTls.value;
121112
for (worker in pool) {
@@ -140,7 +131,6 @@ class FixedThreadPool implements IThreadPool {
140131
}
141132
Sys.println('\ttotal worker loops: $totalLoops');
142133
Sys.println('\ttotal worker dispatches: $totalDispatches');
143-
Sys.println('\tworkers (active/available/total): ${activity.activeWorkers}/${activity.availableWorkers}/${pool.length}');
144134
Sys.print('\tqueue 0: ');
145135
queueTls.value.dump();
146136
for (worker in pool) {
@@ -194,17 +184,15 @@ private class Worker {
194184
public var numLooped(default, null):Int;
195185

196186
var shutdownSemaphore:Null<Semaphore>;
197-
final cond:Condition;
187+
final semaphore:Semaphore;
198188
final queues:Vector<DispatchQueue>;
199189
final ownQueueIndex:Int;
200-
final activity:WorkerActivity;
201190
final queueTls:Tls<DispatchQueue>;
202191

203-
public function new(cond:Condition, queueTls:Tls<DispatchQueue>, queues:Vector<DispatchQueue>, ownQueueIndex:Int, activity:WorkerActivity) {
204-
this.cond = cond;
192+
public function new(semaphore:Semaphore, queueTls:Tls<DispatchQueue>, queues:Vector<DispatchQueue>, ownQueueIndex:Int) {
193+
this.semaphore = semaphore;
205194
this.queues = queues;
206195
this.ownQueueIndex = ownQueueIndex;
207-
this.activity = activity;
208196
this.queueTls = queueTls;
209197
numDispatched = 0;
210198
numLooped = 0;
@@ -260,46 +248,18 @@ private class Worker {
260248
inShutdown = false;
261249
}
262250

263-
// If we did nothing, wait for the condition variable.
264-
if (cond.tryAcquire()) {
265-
// An event could have come in between the queue checking and the acquire. In this
266-
// case, the condition might have been signalled to, but we would still go to sleep.
267-
if (activity.hadEvent) {
268-
activity.hadEvent = false;
269-
cond.release();
251+
if (shutdownSemaphore != null) {
252+
if (inShutdown) {
253+
semaphore.release();
254+
break;
255+
} else {
256+
inShutdown = true;
270257
continue;
271258
}
272-
if (shutdownSemaphore != null) {
273-
if (inShutdown) {
274-
--activity.activeWorkers;
275-
--activity.availableWorkers;
276-
cond.broadcast();
277-
cond.release();
278-
break;
279-
} else {
280-
inShutdown = true;
281-
cond.broadcast();
282-
cond.release();
283-
continue;
284-
}
285-
}
286-
if (activity.activeWorkers == 1) {
287-
// Always keep one thread alive until we can find a better solution to the
288-
// run/loop synchronization problem.
289-
cond.release();
290-
BackOff.backOff();
291-
continue;
292-
}
293-
// These modifications are fine because we hold onto the cond mutex.
294-
--activity.activeWorkers;
295-
state = Waiting;
296-
cond.wait();
297-
state = CheckingQueues;
298-
++activity.activeWorkers;
299-
cond.release();
300-
} else {
301-
BackOff.backOff();
302259
}
260+
state = Waiting;
261+
semaphore.acquire();
262+
state = CheckingQueues;
303263
}
304264
}
305265

0 commit comments

Comments
 (0)