@@ -1137,8 +1137,8 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
1137
1137
}
1138
1138
1139
1139
// This needs to be a store release so that we also publish the contents of
1140
- // the new Job we are adding to the atomic job queue. Pairs with load
1141
- // acquire in drainOne.
1140
+ // the new Job we are adding to the atomic job queue. Pairs with consume
1141
+ // in drainOne.
1142
1142
if (_status ().compare_exchange_weak (oldState, newState,
1143
1143
/* success */ std::memory_order_release,
1144
1144
/* failure */ std::memory_order_relaxed)) {
@@ -1180,13 +1180,13 @@ Job * DefaultActorImpl::drainOne() {
1180
1180
SWIFT_TASK_DEBUG_LOG (" Draining one job from default actor %p" , this );
1181
1181
1182
1182
// Pairs with the store release in DefaultActorImpl::enqueue
1183
- auto oldState = _status ().load (std::memory_order_acquire);
1183
+ auto oldState = _status ().load (SWIFT_MEMORY_ORDER_CONSUME);
1184
+ _swift_tsan_consume (this );
1184
1185
1185
1186
auto jobToPreprocessFrom = oldState.getFirstJob ();
1186
1187
Job *firstJob = preprocessQueue (jobToPreprocessFrom);
1187
1188
traceJobQueue (this , firstJob);
1188
1189
1189
- _swift_tsan_release (this );
1190
1190
while (true ) {
1191
1191
assert (oldState.isAnyRunning ());
1192
1192
@@ -1200,8 +1200,8 @@ Job * DefaultActorImpl::drainOne() {
1200
1200
// Dequeue the first job and set up a new head
1201
1201
newState = newState.withFirstJob (getNextJobInQueue (firstJob));
1202
1202
if (_status ().compare_exchange_weak (oldState, newState,
1203
- /* success */ std::memory_order_release ,
1204
- /* failure */ std::memory_order_acquire )) {
1203
+ /* success */ std::memory_order_relaxed ,
1204
+ /* failure */ std::memory_order_relaxed )) {
1205
1205
SWIFT_TASK_DEBUG_LOG (" Drained first job %p from actor %p" , firstJob, this );
1206
1206
traceActorStateTransition (this , oldState, newState);
1207
1207
concurrency::trace::actor_dequeue (this , firstJob);
@@ -1393,8 +1393,6 @@ retry:;
1393
1393
SWIFT_TASK_DEBUG_LOG (" Thread attempting to jump onto %p, as drainer = %d" , this , asDrainer);
1394
1394
#endif
1395
1395
1396
- // Note: This doesn't have to be a load acquire because the jobQueue is part
1397
- // of the same atomic.
1398
1396
auto oldState = _status ().load (std::memory_order_relaxed);
1399
1397
while (true ) {
1400
1398
@@ -1446,9 +1444,13 @@ retry:;
1446
1444
assert (!oldState.getFirstJob ());
1447
1445
}
1448
1446
1447
+ // Taking the drain lock clears the max priority escalated bit because we've
1448
+ // already represented the current max priority of the actor on the thread.
1449
1449
auto newState = oldState.withRunning ();
1450
+
1451
+ // This needs an acquire since we are taking a lock
1450
1452
if (_status ().compare_exchange_weak (oldState, newState,
1451
- std::memory_order_relaxed ,
1453
+ std::memory_order_acquire ,
1452
1454
std::memory_order_relaxed)) {
1453
1455
_swift_tsan_acquire (this );
1454
1456
traceActorStateTransition (this , oldState, newState);
@@ -1527,9 +1529,11 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
1527
1529
newState = newState.resetPriority ();
1528
1530
}
1529
1531
1532
+ // This needs to be a release since we are unlocking a lock
1530
1533
if (_status ().compare_exchange_weak (oldState, newState,
1531
- /* success */ std::memory_order_relaxed ,
1534
+ /* success */ std::memory_order_release ,
1532
1535
/* failure */ std::memory_order_relaxed)) {
1536
+ _swift_tsan_release (this );
1533
1537
traceActorStateTransition (this , oldState, newState);
1534
1538
1535
1539
if (newState.isScheduled ()) {
0 commit comments