@@ -981,6 +981,35 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
981981 }
982982};
983983
984+ #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
985+
986+ // / Given that a job is enqueued normally on a default actor, get/set
987+ // / the next job in the actor's queue.
988+ static JobRef getNextJobInQueue (Job *job) {
989+ return *reinterpret_cast <JobRef *>(job->SchedulerPrivate );
990+ }
991+ static void setNextJobInQueue (Job *job, JobRef next) {
992+ *reinterpret_cast <JobRef *>(job->SchedulerPrivate ) = next;
993+ }
994+
995+ namespace {
996+
997+ struct JobQueueTraits {
998+ static Job *getNext (Job *job) {
999+ return getNextJobInQueue (job).getAsPreprocessedJob ();
1000+ }
1001+ static void setNext (Job *job, Job *next) {
1002+ setNextJobInQueue (job, JobRef::getPreprocessed (next));
1003+ }
1004+ static int compare (Job *lhs, Job *rhs) {
1005+ return descendingPriorityOrder (lhs->getPriority (), rhs->getPriority ());
1006+ }
1007+ };
1008+
1009+ } // end anonymous namespace
1010+
1011+ #endif
1012+
9841013#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
9851014#define ACTIVE_ACTOR_STATUS_SIZE (4 * (sizeof (uintptr_t )))
9861015#else
@@ -1052,10 +1081,13 @@ class DefaultActorImpl : public HeapObject {
10521081 // enforce alignment. This is space that is available for us to use in
10531082 // the future
10541083 alignas (sizeof (ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
1084+
1085+ using ListMerger = swift::ListMerger<Job *, JobQueueTraits>;
1086+ ListMerger::LastInsertionPoint lastInsertionPoint =
1087+ ListMerger::LastInsertionPoint ();
10551088#endif
10561089 // TODO (rokhinip): Make this a flagset
10571090 bool isDistributedRemoteActor;
1058-
10591091public:
10601092 // / Properly construct an actor, except for the heap header.
10611093 void initialize (bool isDistributedRemote = false ) {
@@ -1128,6 +1160,10 @@ class DefaultActorImpl : public HeapObject {
11281160 // / It can be done when actor transitions from Idle to Scheduled or
11291161 // / when actor gets a priority override and we schedule a stealer.
11301162 void scheduleActorProcessJob (JobPriority priority);
1163+
1164+ Job *preprocessQueue (JobRef start);
1165+ Job *preprocessQueue (JobRef unprocessedStart, JobRef unprocessedEnd,
1166+ Job *existingProcessedJobsToMergeInto);
11311167#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
11321168
11331169 void deallocateUnconditional ();
@@ -1203,31 +1239,6 @@ static NonDefaultDistributedActorImpl *asImpl(NonDefaultDistributedActor *actor)
12031239/* ****************************************************************************/
12041240
12051241#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
1206- // / Given that a job is enqueued normally on a default actor, get/set
1207- // / the next job in the actor's queue.
1208- static JobRef getNextJobInQueue (Job *job) {
1209- return *reinterpret_cast <JobRef*>(job->SchedulerPrivate );
1210- }
1211- static void setNextJobInQueue (Job *job, JobRef next) {
1212- *reinterpret_cast <JobRef*>(job->SchedulerPrivate ) = next;
1213- }
1214-
1215- namespace {
1216-
1217- struct JobQueueTraits {
1218- static Job *getNext (Job *job) {
1219- return getNextJobInQueue (job).getAsPreprocessedJob ();
1220- }
1221- static void setNext (Job *job, Job *next) {
1222- setNextJobInQueue (job, JobRef::getPreprocessed (next));
1223- }
1224- static int compare (Job *lhs, Job *rhs) {
1225- return descendingPriorityOrder (lhs->getPriority (), rhs->getPriority ());
1226- }
1227- };
1228-
1229- } // end anonymous namespace
1230-
12311242
12321243// Called with the actor drain lock held
12331244//
@@ -1240,15 +1251,14 @@ struct JobQueueTraits {
12401251// and the previous start. We can then process these jobs and merge them into
12411252// the already processed list of jobs from the previous iteration of
12421253// preprocessQueue
1243- static Job *
1244- preprocessQueue (JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingProcessedJobsToMergeInto)
1245- {
1254+ Job *DefaultActorImpl::preprocessQueue (JobRef unprocessedStart,
1255+ JobRef unprocessedEnd,
1256+ Job *existingProcessedJobsToMergeInto) {
12461257 assert (existingProcessedJobsToMergeInto != NULL );
12471258 assert (unprocessedStart.needsPreprocessing ());
12481259 assert (unprocessedStart.getAsJob () != unprocessedEnd.getAsJob ());
12491260
12501261 // Build up a list of jobs we need to preprocess
1251- using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
12521262 ListMerger jobsToProcess;
12531263
12541264 // Get just the prefix list of unprocessed jobs
@@ -1263,19 +1273,20 @@ preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingPro
12631273 }
12641274
12651275 // Finish processing the unprocessed jobs
1266- Job *newProcessedJobs = jobsToProcess.release ();
1276+ Job *newProcessedJobs = std::get< 0 >( jobsToProcess.release () );
12671277 assert (newProcessedJobs);
12681278
1269- ListMerger mergedList (existingProcessedJobsToMergeInto);
1279+ ListMerger mergedList (existingProcessedJobsToMergeInto, lastInsertionPoint );
12701280 mergedList.merge (newProcessedJobs);
1271- return mergedList.release ();
1281+ Job *result;
1282+ std::tie (result, lastInsertionPoint) = mergedList.release ();
1283+ return result;
12721284}
12731285
12741286// Called with the actor drain lock held.
12751287//
12761288// Preprocess the queue starting from the top
1277- static Job *
1278- preprocessQueue (JobRef start) {
1289+ Job *DefaultActorImpl::preprocessQueue (JobRef start) {
12791290 if (!start) {
12801291 return NULL ;
12811292 }
@@ -1288,7 +1299,6 @@ preprocessQueue(JobRef start) {
12881299 // There exist some jobs which haven't been preprocessed
12891300
12901301 // Build up a list of jobs we need to preprocess
1291- using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
12921302 ListMerger jobsToProcess;
12931303
12941304 Job *wellFormedListStart = NULL ;
@@ -1311,18 +1321,19 @@ preprocessQueue(JobRef start) {
13111321 }
13121322
13131323 // Finish processing the unprocessed jobs
1314- auto processedJobHead = jobsToProcess.release ();
1324+ auto processedJobHead = std::get< 0 >( jobsToProcess.release () );
13151325 assert (processedJobHead);
13161326
13171327 Job *firstJob = NULL ;
13181328 if (wellFormedListStart) {
13191329 // Merge it with already known well formed list if we have one.
1320- ListMerger mergedList (wellFormedListStart);
1330+ ListMerger mergedList (wellFormedListStart, lastInsertionPoint );
13211331 mergedList.merge (processedJobHead);
1322- firstJob = mergedList.release ();
1332+ std::tie ( firstJob, lastInsertionPoint) = mergedList.release ();
13231333 } else {
13241334 // Nothing to merge with, just return the head we already have
13251335 firstJob = processedJobHead;
1336+ lastInsertionPoint = ListMerger::LastInsertionPoint ();
13261337 }
13271338
13281339 return firstJob;
@@ -1528,6 +1539,7 @@ Job * DefaultActorImpl::drainOne() {
15281539 if (_status ().compare_exchange_weak (oldState, newState,
15291540 /* success */ std::memory_order_relaxed,
15301541 /* failure */ std::memory_order_relaxed)) {
1542+ lastInsertionPoint.nodeWasRemoved (firstJob);
15311543 SWIFT_TASK_DEBUG_LOG (" Drained first job %p from actor %p" , firstJob, this );
15321544 traceActorStateTransition (this , oldState, newState, distributedActorIsRemote);
15331545 concurrency::trace::actor_dequeue (this , firstJob);
0 commit comments