Skip to content

Commit 6d2f9c0

Browse files
authored
Refactor task callback handling for cancellation/continuation (#111)
* step 1 * step 2
1 parent b21dddd commit 6d2f9c0

File tree

4 files changed

+109
-155
lines changed

4 files changed

+109
-155
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package hxcoro.concurrent;
2+
3+
import hxcoro.concurrent.AtomicState;
4+
import hxcoro.concurrent.BackOff;
5+
6+
enum abstract CallbacksState(Int) to Int {
7+
final Ready;
8+
final Modifying;
9+
final Finished;
10+
}
11+
12+
abstract class ThreadSafeCallbacks<Element, HandleIn : HandleOut, HandleOut> {
13+
var handles:Null<Array<HandleIn>>;
14+
final execute:HandleIn -> Void;
15+
final state:AtomicState<CallbacksState>;
16+
17+
function new(execute:HandleIn -> Void) {
18+
handles = null;
19+
this.execute = execute;
20+
state = new AtomicState(Ready);
21+
}
22+
23+
// single threaded
24+
25+
public function run() {
26+
while (true) {
27+
switch (state.compareExchange(Ready, Finished)) {
28+
case Ready:
29+
break;
30+
case Modifying:
31+
BackOff.backOff();
32+
case Finished:
33+
// already done
34+
return;
35+
}
36+
}
37+
final handles = handles;
38+
if (handles == null) {
39+
return;
40+
}
41+
this.handles = null;
42+
for (handle in handles) {
43+
// TODO: should we catch errors from the callbacks here?
44+
execute(handle);
45+
}
46+
}
47+
48+
// thread-safe
49+
50+
abstract function createHandle(element:Element):HandleIn;
51+
52+
public function add(element:Element):Null<HandleOut> {
53+
final handle = createHandle(element);
54+
while (true) {
55+
switch (state.compareExchange(Ready, Modifying)) {
56+
case Ready:
57+
break;
58+
case Modifying:
59+
BackOff.backOff();
60+
case Finished:
61+
execute(handle);
62+
return null;
63+
}
64+
}
65+
handles ??= [];
66+
handles.push(handle);
67+
state.store(Ready);
68+
return handle;
69+
}
70+
71+
public function remove(handle:HandleIn) {
72+
while (true) {
73+
switch (state.compareExchange(Ready, Modifying)) {
74+
case Ready:
75+
break;
76+
case Modifying:
77+
BackOff.backOff();
78+
case Finished:
79+
// already cleared, nothing to do
80+
return;
81+
}
82+
}
83+
if (handles != null) {
84+
handles.remove(handle);
85+
}
86+
state.store(Ready);
87+
}
88+
}

src/hxcoro/task/AbstractTask.hx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package hxcoro.task;
22

3-
import hxcoro.concurrent.AtomicObject;
4-
import hxcoro.concurrent.AtomicState;
5-
import hxcoro.concurrent.AtomicInt;
3+
import haxe.Exception;
64
import haxe.coro.cancellation.ICancellationCallback;
75
import haxe.coro.cancellation.ICancellationHandle;
86
import haxe.coro.cancellation.ICancellationToken;
97
import haxe.exceptions.CancellationException;
10-
import haxe.Exception;
8+
import hxcoro.concurrent.AtomicInt;
9+
import hxcoro.concurrent.AtomicObject;
10+
import hxcoro.concurrent.AtomicState;
1111

1212
@:using(AbstractTask.TaskStateTools)
1313
enum abstract TaskState(Int) to Int {
@@ -155,7 +155,7 @@ abstract class AbstractTask implements ICancellationToken {
155155
}
156156

157157
public function onCancellationRequested(callback:ICancellationCallback):ICancellationHandle {
158-
return cancellationManager.addCallback(callback);
158+
return cancellationManager.add(callback);
159159
}
160160

161161
/**

src/hxcoro/task/CoroBaseTask.hx

Lines changed: 8 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import haxe.coro.context.Key;
1818
import haxe.coro.context.IElement;
1919
import haxe.coro.dispatchers.Dispatcher;
2020
import haxe.coro.cancellation.CancellationToken;
21+
import hxcoro.concurrent.ThreadSafeCallbacks;
2122

2223
private class CoroTaskWith<T> implements ICoroNodeWith {
2324
public var context(get, null):Context;
@@ -51,64 +52,13 @@ private class CoroTaskWith<T> implements ICoroNodeWith {
5152
return task.without(...keys);
5253
}
5354
}
54-
enum abstract AggregatorState(Int) to Int {
55-
final Ready;
56-
final Modifying;
57-
final Finished;
58-
}
59-
60-
class ThreadSafeAggregator<T> {
61-
var array:Null<Array<T>>;
62-
final func:T -> Void;
63-
final state:AtomicState<AggregatorState>;
64-
65-
public function new(func:T -> Void) {
66-
array = null;
67-
this.func = func;
68-
state = new AtomicState(Ready);
69-
}
70-
71-
// single threaded
72-
73-
public function run() {
74-
while (true) {
75-
switch (state.compareExchange(Ready, Finished)) {
76-
case Ready:
77-
break;
78-
case Modifying:
79-
BackOff.backOff();
80-
case Finished:
81-
// already done
82-
return;
83-
}
84-
}
85-
final array = array;
86-
if (array == null) {
87-
return;
88-
}
89-
this.array = null;
90-
for (element in array) {
91-
func(element);
92-
}
55+
class TaskContinuationManager extends ThreadSafeCallbacks<IContinuation<Any>, IContinuation<Any>, IContinuation<Any>> {
56+
public function new(task:CoroBaseTask<Any>) {
57+
super(handle -> handle.resume(task.get(), task.getError()));
9358
}
9459

95-
// thread-safe
96-
97-
public function add(element:T) {
98-
while (true) {
99-
switch (state.compareExchange(Ready, Modifying)) {
100-
case Ready:
101-
break;
102-
case Modifying:
103-
BackOff.backOff();
104-
case Finished:
105-
func(element);
106-
return;
107-
}
108-
}
109-
array ??= [];
110-
array.push(element);
111-
state.store(Ready);
60+
function createHandle(element:IContinuation<Any>) {
61+
return element;
11262
}
11363
}
11464

@@ -124,7 +74,7 @@ abstract class CoroBaseTask<T> extends AbstractTask implements ICoroNode impleme
12474
public var context(get, null):Context;
12575

12676
final nodeStrategy:INodeStrategy;
127-
final awaitingContinuations:ThreadSafeAggregator<IContinuation<T>>;
77+
final awaitingContinuations:TaskContinuationManager;
12878
final awaitingChildContinuation:AtomicObject<Null<IContinuation<Any>>>;
12979
var result:Null<T>;
13080

@@ -135,9 +85,7 @@ abstract class CoroBaseTask<T> extends AbstractTask implements ICoroNode impleme
13585
final parent = context.get(CoroBaseTask);
13686
this.context = context.clone().with(this).set(CancellationToken, this);
13787
this.nodeStrategy = nodeStrategy;
138-
awaitingContinuations = new ThreadSafeAggregator<IContinuation<T>>(cont ->
139-
cont.resume(result, error.load())
140-
);
88+
awaitingContinuations = new TaskContinuationManager(this);
14189
awaitingChildContinuation = new AtomicObject(null);
14290
super(parent, initialState);
14391
}
Lines changed: 8 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,16 @@
11
package hxcoro.task;
22

3-
import hxcoro.concurrent.AtomicState;
4-
import hxcoro.concurrent.BackOff;
53
import haxe.coro.cancellation.ICancellationCallback;
64
import haxe.coro.cancellation.ICancellationHandle;
7-
8-
private class NoOpCancellationHandle implements ICancellationHandle {
9-
public function new() {}
10-
11-
public function close() {}
12-
}
5+
import hxcoro.concurrent.AtomicState;
6+
import hxcoro.concurrent.ThreadSafeCallbacks;
137

148
private enum abstract HandleState(Int) to Int {
159
final Open;
1610
final Closed;
1711
}
1812

1913
class CancellationHandle implements ICancellationHandle {
20-
21-
static public final noOpCancellationHandle = new NoOpCancellationHandle();
22-
2314
final callback:ICancellationCallback;
2415
final manager:TaskCancellationManager;
2516
var closed:AtomicState<HandleState>;
@@ -30,12 +21,12 @@ class CancellationHandle implements ICancellationHandle {
3021
closed = new AtomicState(Open);
3122
}
3223

33-
public function run() {
24+
public function run(task:AbstractTask) {
3425
if (closed.compareExchange(Open, Closed) != Open) {
3526
return;
3627
}
3728

38-
final error = manager.task.getError();
29+
final error = task.getError();
3930
callback.onCancellation(error.orCancellationException());
4031
}
4132

@@ -46,85 +37,12 @@ class CancellationHandle implements ICancellationHandle {
4637
}
4738
}
4839

49-
private enum abstract ManagerState(Int) to Int {
50-
final Ready;
51-
final Modifying;
52-
final Finished;
53-
}
54-
55-
class TaskCancellationManager {
56-
public final task:AbstractTask;
57-
final state:AtomicState<ManagerState>;
58-
var handles:Null<Array<CancellationHandle>>;
59-
40+
class TaskCancellationManager extends ThreadSafeCallbacks<ICancellationCallback, CancellationHandle, ICancellationHandle> {
6041
public function new(task:AbstractTask) {
61-
this.task = task;
62-
handles = null;
63-
state = new AtomicState(Ready);
64-
}
65-
66-
// single-threaded
67-
68-
public function run() {
69-
while (true) {
70-
switch (state.compareExchange(Ready, Finished)) {
71-
case Ready:
72-
break;
73-
case Modifying:
74-
BackOff.backOff();
75-
case Finished:
76-
// already done
77-
return;
78-
}
79-
}
80-
final handles = handles;
81-
if (handles == null) {
82-
return;
83-
}
84-
this.handles = null;
85-
for (handle in handles) {
86-
// TODO: should we catch errors from the callbacks here?
87-
handle.run();
88-
}
89-
42+
super(handle -> handle.run(task));
9043
}
9144

92-
// thread-safe
93-
94-
public function addCallback(callback:ICancellationCallback):ICancellationHandle {
95-
final handle = new CancellationHandle(callback, this);
96-
while (true) {
97-
switch (state.compareExchange(Ready, Modifying)) {
98-
case Ready:
99-
break;
100-
case Modifying:
101-
BackOff.backOff();
102-
case Finished:
103-
handle.run();
104-
return CancellationHandle.noOpCancellationHandle;
105-
}
106-
}
107-
handles ??= [];
108-
handles.push(handle);
109-
state.store(Ready);
110-
return handle;
111-
}
112-
113-
public function remove(handle:CancellationHandle) {
114-
while (true) {
115-
switch (state.compareExchange(Ready, Modifying)) {
116-
case Ready:
117-
break;
118-
case Modifying:
119-
BackOff.backOff();
120-
case Finished:
121-
// already cleared, nothing to do
122-
return;
123-
}
124-
}
125-
if (handles != null) {
126-
handles.remove(handle);
127-
}
128-
state.store(Ready);
45+
function createHandle(element:ICancellationCallback) {
46+
return new CancellationHandle(element, this);
12947
}
13048
}

0 commit comments

Comments
 (0)