46
46
import java .util .concurrent .ScheduledExecutorService ;
47
47
import java .util .concurrent .TimeUnit ;
48
48
import java .util .concurrent .atomic .AtomicBoolean ;
49
+ import java .util .concurrent .locks .Lock ;
50
+ import java .util .concurrent .locks .ReentrantLock ;
49
51
import java .util .function .Supplier ;
50
52
51
53
import com .oracle .graal .python .PythonLanguage ;
55
57
import com .oracle .truffle .api .CompilerDirectives ;
56
58
import com .oracle .truffle .api .Truffle ;
57
59
import com .oracle .truffle .api .TruffleLanguage ;
60
+ import com .oracle .truffle .api .CompilerDirectives .TruffleBoundary ;
58
61
import com .oracle .truffle .api .frame .VirtualFrame ;
59
62
import com .oracle .truffle .api .nodes .Node .Child ;
60
63
import com .oracle .truffle .api .nodes .RootNode ;
@@ -90,9 +93,9 @@ default int frameIndex() {
90
93
}
91
94
92
95
private final ScheduledExecutorService executorService = Executors .newScheduledThreadPool (2 );
93
- private ConcurrentLinkedQueue <AsyncAction > scheduledActions = new ConcurrentLinkedQueue <>();
94
- private AtomicBoolean hasScheduledAction = new AtomicBoolean (false );
95
- private AtomicBoolean executingActions = new AtomicBoolean ( false );
96
+ private final ConcurrentLinkedQueue <AsyncAction > scheduledActions = new ConcurrentLinkedQueue <>();
97
+ private final AtomicBoolean hasScheduledAction = new AtomicBoolean (false );
98
+ private final Lock executingScheduledActions = new ReentrantLock ( );
96
99
private static final int ASYNC_ACTION_DELAY = 15 ; // chosen by a fair D20 dice roll
97
100
98
101
private class AsyncRunnable implements Runnable {
@@ -105,8 +108,15 @@ public AsyncRunnable(Supplier<AsyncAction> actionSupplier) {
105
108
public void run () {
106
109
AsyncAction asyncAction = actionSupplier .get ();
107
110
if (asyncAction != null ) {
108
- scheduledActions .add (asyncAction );
109
- hasScheduledAction .set (true );
111
+ // If there's thread executing scheduled actions right now,
112
+ // we wait until adding the next work item
113
+ executingScheduledActions .lock ();
114
+ try {
115
+ scheduledActions .add (asyncAction );
116
+ hasScheduledAction .set (true );
117
+ } finally {
118
+ executingScheduledActions .unlock ();
119
+ }
110
120
}
111
121
}
112
122
}
@@ -144,10 +154,23 @@ void registerAction(Supplier<AsyncAction> actionSupplier) {
144
154
}
145
155
146
156
void triggerAsyncActions () {
147
- if (executingActions .compareAndSet (false , true )) {
148
- if (hasScheduledAction .compareAndSet (true , false )) {
149
- CompilerDirectives .transferToInterpreter ();
150
- // TODO: (tfel) - for now all async actions are slow path
157
+ if (hasScheduledAction .compareAndSet (true , false )) {
158
+ processAsyncActions ();
159
+ }
160
+ }
161
+
162
+ @ TruffleBoundary
163
+ private void processAsyncActions () {
164
+ // We'll likely be able to get the lock and start working through the async actions. But
165
+ // there could be a race between the atomic switch of the hasScheduledAction flag, an
166
+ // AsyncRunnable thread and the acquisition of the lock, so a second thread may end up
167
+ // clearing the flag again. In that case, both would get here, but only will get the lock
168
+ // and the other will skip over this and return. In any case, by the time a thread has
169
+ // this lock and is handling async actions, nothing new will be pushed to the
170
+ // scheduledActions queue, so we won't have a race between finishing the while loop and
171
+ // returning from this method.
172
+ if (executingScheduledActions .tryLock ()) {
173
+ try {
151
174
ConcurrentLinkedQueue <AsyncAction > actions = scheduledActions ;
152
175
AsyncAction action ;
153
176
while ((action = actions .poll ()) != null ) {
@@ -169,8 +192,9 @@ void triggerAsyncActions() {
169
192
}
170
193
}
171
194
}
195
+ } finally {
196
+ executingScheduledActions .unlock ();
172
197
}
173
- executingActions .set (false );
174
198
}
175
199
}
176
200
}
0 commit comments