@@ -975,7 +975,6 @@ class DefaultActorImplHeader : public HeapObject {
975975class DefaultActorImplFooter {
976976protected:
977977#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
978- using SimpleQueue = swift::SimpleQueue<Job *, JobQueueTraits>;
979978 using PriorityQueue = swift::PriorityQueue<Job *, JobQueueTraits>;
980979
981980 // When enqueued, jobs are atomically added to a linked list with the head
@@ -1080,9 +1079,13 @@ class DefaultActorImpl
10801079 // / new priority
10811080 void enqueueStealer (Job *job, JobPriority priority);
10821081
1083- // / Dequeues one job from `prioritizedJobs `.
1082+ // / Dequeues one job from `prioritisedJobs `.
10841083 // / The calling thread must be holding the actor lock while calling this
10851084 Job *drainOne ();
1085+
1086+ // / Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`.
1087+ // / Called with actor lock held on current thread.
1088+ void processIncomingQueue ();
10861089#endif
10871090
10881091 // / Check if the actor is actually a distributed *remote* actor.
@@ -1119,14 +1122,10 @@ class DefaultActorImpl
11191122 // / when actor gets a priority override and we schedule a stealer.
11201123 void scheduleActorProcessJob (JobPriority priority);
11211124
1122- // / Atomically takes a list of jobs from ActiveActorStatus, reversing them in
1123- // / the process. Returns jobs of mixed priorities in FIFO order.
1124- SimpleQueue collectJobs ();
1125-
1126- // / Check for new jobs in the incoming queue and move them to the
1127- // / processing queue.
1125+ // / Processes claimed incoming jobs into `prioritizedJobs`.
1126+ // / Incoming jobs are of mixed priorities and in LIFO order.
11281127 // / Called with actor lock held on current thread.
1129- void processJobs ( );
1128+ void handleUnprioritizedJobs (Job *head );
11301129#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
11311130
11321131 void deallocateUnconditional ();
@@ -1372,13 +1371,12 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
13721371
13731372}
13741373
1375- DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs () {
1374+ void DefaultActorImpl::processIncomingQueue () {
13761375 // Pairs with the store release in DefaultActorImpl::enqueue
13771376 bool distributedActorIsRemote = swift_distributed_actor_is_remote (this );
13781377 auto oldState = _status ().load (SWIFT_MEMORY_ORDER_CONSUME);
13791378 _swift_tsan_consume (this );
13801379
1381- SimpleQueue result;
13821380 // We must ensure that any jobs not seen by collectJobs() don't have any
13831381 // dangling references to the jobs that have been collected. For that we must
13841382 // atomically set head pointer to NULL. If it fails because more jobs have
@@ -1387,7 +1385,7 @@ DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() {
13871385 // If there aren't any new jobs in the incoming queue, we can return
13881386 // immediately without updating the status.
13891387 if (!oldState.getFirstUnprioritisedJob ()) {
1390- return result ;
1388+ return ;
13911389 }
13921390 assert (oldState.isAnyRunning ());
13931391
@@ -1405,28 +1403,26 @@ DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() {
14051403 }
14061404 }
14071405
1408- // Collect jobs, reversing them in the process
1409- auto job = oldState.getFirstUnprioritisedJob ();
1410- while (job) {
1411- auto next = getNextJob (job);
1412- result.prepend (job);
1413- job = next;
1414- }
1415-
1416- return result;
1406+ handleUnprioritizedJobs (oldState.getFirstUnprioritisedJob ());
14171407}
14181408
14191409// Called with actor lock held on current thread
1420- void DefaultActorImpl::processJobs () {
1421- SimpleQueue jobs = collectJobs ();
1422- prioritizedJobs.enqueueContentsOf (jobs.head );
1410+ void DefaultActorImpl::handleUnprioritizedJobs (Job *head) {
1411+ // Reverse jobs from LIFO to FIFO order
1412+ Job *reversed = nullptr ;
1413+ while (head) {
1414+ auto next = getNextJob (head);
1415+ setNextJob (head, reversed);
1416+ reversed = head;
1417+ head = next;
1418+ }
1419+ prioritizedJobs.enqueueContentsOf (reversed);
14231420}
14241421
14251422// Called with actor lock held on current thread
14261423Job *DefaultActorImpl::drainOne () {
14271424 SWIFT_TASK_DEBUG_LOG (" Draining one job from default actor %p" , this );
14281425
1429- processJobs ();
14301426 traceJobQueue (this , prioritizedJobs.peek ());
14311427 auto firstJob = prioritizedJobs.dequeue ();
14321428 if (!firstJob) {
@@ -1490,39 +1486,40 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
14901486 TaskExecutorRef::undefined ());
14911487
14921488 while (true ) {
1493- if (shouldYieldThread ()) {
1494- currentActor->unlock (true );
1495- break ;
1496- }
1497-
14981489 Job *job = currentActor->drainOne ();
14991490 if (job == NULL ) {
15001491 // No work left to do, try unlocking the actor. This may fail if there is
15011492 // work concurrently enqueued in which case, we'd try again in the loop
1502- if (!currentActor->unlock (false )) {
1503- continue ;
1493+ if (currentActor->unlock (false )) {
1494+ break ;
1495+ }
1496+ } else {
1497+ if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1498+ auto taskExecutor = task->getPreferredTaskExecutor ();
1499+ trackingInfo.setTaskExecutor (taskExecutor);
15041500 }
1505- break ;
1506- }
15071501
1508- if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1509- auto taskExecutor = task->getPreferredTaskExecutor ();
1510- trackingInfo.setTaskExecutor (taskExecutor);
1502+ // This thread is now going to follow the task on this actor. It may hop off
1503+ // the actor
1504+ runJobInEstablishedExecutorContext (job);
1505+
1506+ // We could have come back from the job on a generic executor and not as
1507+ // part of a default actor. If so, there is no more work left for us to do
1508+ // here.
1509+ auto currentExecutor = trackingInfo.getActiveExecutor ();
1510+ if (!currentExecutor.isDefaultActor ()) {
1511+ currentActor = nullptr ;
1512+ break ;
1513+ }
1514+ currentActor = asImpl (currentExecutor.getDefaultActor ());
15111515 }
15121516
1513- // This thread is now going to follow the task on this actor. It may hop off
1514- // the actor
1515- runJobInEstablishedExecutorContext (job);
1516-
1517- // We could have come back from the job on a generic executor and not as
1518- // part of a default actor. If so, there is no more work left for us to do
1519- // here.
1520- auto currentExecutor = trackingInfo.getActiveExecutor ();
1521- if (!currentExecutor.isDefaultActor ()) {
1522- currentActor = nullptr ;
1517+ if (shouldYieldThread ()) {
1518+ currentActor->unlock (true );
15231519 break ;
15241520 }
1525- currentActor = asImpl (currentExecutor.getDefaultActor ());
1521+
1522+ currentActor->processIncomingQueue ();
15261523 }
15271524
15281525 // Leave the tracking info.
@@ -1668,6 +1665,17 @@ retry:;
16681665 auto newState = oldState.withRunning ();
16691666 newState = newState.withoutEscalatedPriority ();
16701667
1668+ // Claim incoming jobs when obtaining lock as a drainer, to save one
1669+ // round of atomic load and compare-exchange.
1670+ // This is not useful when obtaining lock for assuming thread during actor
1671+ // switching, because arbitrary use code can run between locking and
1672+ // draining the next job. So we still need to call processIncomingQueue() to
1673+ // check for higher priority jobs that could have been scheduled in the
1674+ // meantime. And processing is more efficient when done in larger batches.
1675+ if (asDrainer) {
1676+ newState = newState.withFirstUnprioritisedJob (nullptr );
1677+ }
1678+
16711679 // This needs an acquire since we are taking a lock
16721680 if (_status ().compare_exchange_weak (oldState, newState,
16731681 std::memory_order_acquire,
@@ -1677,6 +1685,9 @@ retry:;
16771685 assert (prioritizedJobs.empty ());
16781686 }
16791687 traceActorStateTransition (this , oldState, newState, distributedActorIsRemote);
1688+ if (asDrainer) {
1689+ handleUnprioritizedJobs (oldState.getFirstUnprioritisedJob ());
1690+ }
16801691 return true ;
16811692 }
16821693 }
0 commit comments