Skip to content

Commit 3aa04db

Browse files
committed
Track whether a task is actively running.
Tracking this as a single bit is actually largely uninteresting to the runtime. To handle priority escalation properly, we really need to track this at a finer grain of detail: recording that the task is running on a specific thread, enqueued on a specific actor, or so on. But starting by tracking a single bit is important for two reasons: - First, it's more realistic about the performance overheads of tasks: we're going to be doing this tracking eventually, and the cost of that tracking will be dominated by the atomic access, so doing that access now sets the baseline about right. - Second, it ensures that we've actually got runtime involvement in all the right places to do this tracking. A propos of the latter: there was no runtime involvement with awaiting a continuation, which is a point at which the task potentially transitions from running to suspended. We must do the tracking as part of this transition, rather than recognizing in the run-loops that a task is still active and treating it as having suspended, because the latter point potentially races with the resumption of the task. To do this, I've had to introduce a runtime function, swift_continuation_await, to do this awaiting rather than inlining the atomic operation on the continuation. As part of doing this work, I've also fixed a bug where we failed to load-acquire in swift_task_escalate before walking the task status records to invoke escalation actions. I've also fixed several places where the handling of task statuses may have accidentally allowed the task to revert to uncancelled.
1 parent e223a4f commit 3aa04db

File tree

14 files changed

+401
-167
lines changed

14 files changed

+401
-167
lines changed

include/swift/ABI/Task.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,25 @@ class AsyncTask : public Job {
260260
void runInFullyEstablishedContext() {
261261
return ResumeTask(ResumeContext); // 'return' forces tail call
262262
}
263-
263+
264+
/// Flag that this task is now running. This can update
265+
/// the priority stored in the job flags if the priority has been
266+
/// escalated.
267+
///
268+
/// Generally this should be done immediately after updating
269+
/// ActiveTask.
270+
void flagAsRunning();
271+
void flagAsRunning_slow();
272+
273+
/// Flag that this task is now suspended. This can update the
274+
/// priority stored in the job flags if the priority hsa been
275+
/// escalated. Generally this should be done immediately after
276+
/// clearing ActiveTask and immediately before enqueuing the task
277+
/// somewhere. TODO: record where the task is enqueued if
278+
/// possible.
279+
void flagAsSuspended();
280+
void flagAsSuspended_slow();
281+
264282
/// Check whether this task has been cancelled.
265283
/// Checking this is, of course, inherently race-prone on its own.
266284
bool isCancelled() const;

include/swift/Runtime/Concurrency.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -605,20 +605,28 @@ void swift_defaultActor_enqueue(Job *job, DefaultActor *actor);
605605
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
606606
bool swift_distributed_actor_is_remote(DefaultActor *actor);
607607

608+
/// Do a primitive suspension of the current task, as if part of
609+
/// a continuation, although this does not provide any of the
610+
/// higher-level continuation semantics. The current task is returned;
611+
/// its ResumeFunction and ResumeContext will need to be initialized,
612+
/// and then it will need to be enqueued or run as a job later.
613+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
614+
AsyncTask *swift_task_suspend();
615+
608616
/// Prepare a continuation in the current task.
609617
///
610618
/// The caller should initialize the Parent, ResumeParent,
611619
/// and NormalResult fields. This function will initialize the other
612-
/// fields with appropriate defaaults; the caller may then overwrite
620+
/// fields with appropriate defaults; the caller may then overwrite
613621
/// them if desired.
614-
///
615-
/// This function is provided as a code-size and runtime-usage
616-
/// optimization; calling it is not required if code is willing to
617-
/// do all its work inline.
618622
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
619623
AsyncTask *swift_continuation_init(ContinuationAsyncContext *context,
620624
AsyncContinuationFlags flags);
621625

626+
/// Await an initialized continuation.
627+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swiftasync)
628+
void swift_continuation_await(ContinuationAsyncContext *continuationContext);
629+
622630
/// Resume a task from a non-throwing continuation, given a normal
623631
/// result which has already been stored into the continuation.
624632
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)

include/swift/Runtime/RuntimeFunctions.def

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1582,6 +1582,14 @@ FUNCTION(ContinuationInit,
15821582
ARGS(ContinuationAsyncContextPtrTy, SizeTy),
15831583
ATTRS(NoUnwind))
15841584

1585+
// void swift_continuation_await(AsyncContext *continuationContext);
1586+
FUNCTION(ContinuationAwait,
1587+
swift_continuation_await, SwiftAsyncCC,
1588+
ConcurrencyAvailability,
1589+
RETURNS(VoidTy),
1590+
ARGS(ContinuationAsyncContextPtrTy),
1591+
ATTRS(NoUnwind))
1592+
15851593
// void swift_continuation_resume(AsyncTask *continuation);
15861594
FUNCTION(ContinuationResume,
15871595
swift_continuation_resume, SwiftCC,

lib/IRGen/GenConcurrency.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "ExtraInhabitants.h"
2222
#include "GenProto.h"
2323
#include "GenType.h"
24+
#include "IRGenDebugInfo.h"
2425
#include "IRGenFunction.h"
2526
#include "IRGenModule.h"
2627
#include "LoadableTypeInfo.h"
@@ -241,3 +242,34 @@ void irgen::emitDestroyTaskGroup(IRGenFunction &IGF, llvm::Value *group) {
241242

242243
IGF.Builder.CreateLifetimeEnd(group);
243244
}
245+
246+
llvm::Function *IRGenModule::getAwaitAsyncContinuationFn() {
247+
StringRef name = "__swift_continuation_await_point";
248+
if (llvm::GlobalValue *F = Module.getNamedValue(name))
249+
return cast<llvm::Function>(F);
250+
251+
// The parameters here match the extra arguments passed to
252+
// @llvm.coro.suspend.async by emitAwaitAsyncContinuation.
253+
llvm::Type *argTys[] = { ContinuationAsyncContextPtrTy };
254+
auto *suspendFnTy =
255+
llvm::FunctionType::get(VoidTy, argTys, false /*vaargs*/);
256+
257+
llvm::Function *suspendFn =
258+
llvm::Function::Create(suspendFnTy, llvm::Function::InternalLinkage,
259+
name, &Module);
260+
suspendFn->setCallingConv(SwiftAsyncCC);
261+
suspendFn->setDoesNotThrow();
262+
IRGenFunction suspendIGF(*this, suspendFn);
263+
if (DebugInfo)
264+
DebugInfo->emitArtificialFunction(suspendIGF, suspendFn);
265+
auto &Builder = suspendIGF.Builder;
266+
267+
llvm::Value *context = suspendFn->getArg(0);
268+
auto *call = Builder.CreateCall(getContinuationAwaitFn(), { context });
269+
call->setDoesNotThrow();
270+
call->setCallingConv(SwiftAsyncCC);
271+
call->setTailCallKind(AsyncTailCallKind);
272+
273+
Builder.CreateRetVoid();
274+
return suspendFn;
275+
}

lib/IRGen/IRGenFunction.cpp

Lines changed: 4 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -665,50 +665,8 @@ void IRGenFunction::emitAwaitAsyncContinuation(
665665
assert(AsyncCoroutineCurrentContinuationContext && "no active continuation");
666666
auto pointerAlignment = IGM.getPointerAlignment();
667667

668-
// Check whether the continuation has already been resumed.
669-
// If so, we can just immediately continue with the control flow.
670-
// Otherwise, we need to suspend, and resuming the continuation will
671-
// trigger the function to resume.
672-
//
673-
// We do this by atomically trying to change the synchronization field
674-
// in the continuation context from 0 (the state it was initialized
675-
// with) to 1. If this fails, the continuation must already have been
676-
// resumed, so we can bypass the suspension point and immediately
677-
// start interpreting the result stored in the continuation.
678-
// Note that we use a strong compare-exchange (the default for the LLVM
679-
// cmpxchg instruction), so spurious failures are disallowed; we can
680-
// therefore trust that a failure means that the continuation has
681-
// already been resumed.
682-
683-
auto contAwaitSyncAddr =
684-
Builder.CreateStructGEP(AsyncCoroutineCurrentContinuationContext, 1);
685-
686-
auto pendingV = llvm::ConstantInt::get(
687-
contAwaitSyncAddr->getType()->getPointerElementType(),
688-
unsigned(ContinuationStatus::Pending));
689-
auto awaitedV = llvm::ConstantInt::get(
690-
contAwaitSyncAddr->getType()->getPointerElementType(),
691-
unsigned(ContinuationStatus::Awaited));
692-
auto results = Builder.CreateAtomicCmpXchg(
693-
contAwaitSyncAddr, pendingV, awaitedV,
694-
llvm::AtomicOrdering::Release /*success ordering*/,
695-
llvm::AtomicOrdering::Acquire /* failure ordering */,
696-
llvm::SyncScope::System);
697-
auto firstAtAwait = Builder.CreateExtractValue(results, 1);
698-
auto contBB = createBasicBlock("await.async.resume");
699-
auto abortBB = createBasicBlock("await.async.abort");
700-
Builder.CreateCondBr(firstAtAwait, abortBB, contBB);
701-
Builder.emitBlock(abortBB);
702-
{
703-
// We were the first to the sync point. "Abort" (return from the
704-
// coroutine partial function, without making a tail call to anything)
705-
// because the continuation result is not available yet. When the
706-
// continuation is later resumed, the task will get scheduled
707-
// starting from the suspension point.
708-
emitCoroutineOrAsyncExit();
709-
}
710-
711-
Builder.emitBlock(contBB);
668+
// Call swift_continuation_await to check whether the continuation
669+
// has already been resumed.
712670
{
713671
// Set up the suspend point.
714672
SmallVector<llvm::Value *, 8> arguments;
@@ -718,15 +676,10 @@ void IRGenFunction::emitAwaitAsyncContinuation(
718676
auto resumeProjFn = getOrCreateResumePrjFn();
719677
arguments.push_back(
720678
Builder.CreateBitOrPointerCast(resumeProjFn, IGM.Int8PtrTy));
721-
// The dispatch function just calls the resume point.
722-
auto resumeFnPtr =
723-
getFunctionPointerForResumeIntrinsic(AsyncCoroutineCurrentResume);
724679
arguments.push_back(Builder.CreateBitOrPointerCast(
725-
createAsyncDispatchFn(resumeFnPtr, {IGM.Int8PtrTy}),
680+
IGM.getAwaitAsyncContinuationFn(),
726681
IGM.Int8PtrTy));
727-
arguments.push_back(AsyncCoroutineCurrentResume);
728-
arguments.push_back(Builder.CreateBitOrPointerCast(
729-
AsyncCoroutineCurrentContinuationContext, IGM.Int8PtrTy));
682+
arguments.push_back(AsyncCoroutineCurrentContinuationContext);
730683
auto resultTy =
731684
llvm::StructType::get(IGM.getLLVMContext(), {IGM.Int8PtrTy}, false /*packed*/);
732685
emitSuspendAsyncCall(swiftAsyncContextIndex, resultTy, arguments);

lib/IRGen/IRGenModule.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,6 +1330,7 @@ private: \
13301330
llvm::Constant *getFixLifetimeFn();
13311331

13321332
llvm::Constant *getFixedClassInitializationFn();
1333+
llvm::Function *getAwaitAsyncContinuationFn();
13331334

13341335
/// The constructor used when generating code.
13351336
///

stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,21 @@ OVERRIDE_TASK(task_asyncMainDrainQueue, void,
159159
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::,
160160
, )
161161

162+
OVERRIDE_TASK(task_suspend, AsyncTask *,
163+
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
164+
swift::, ,)
165+
166+
OVERRIDE_TASK(continuation_init, AsyncTask *,
167+
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
168+
swift::, (ContinuationAsyncContext *context,
169+
AsyncContinuationFlags flags),
170+
(context, flags))
171+
172+
OVERRIDE_TASK(continuation_await, void,
173+
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swiftasync),
174+
swift::, (ContinuationAsyncContext *context),
175+
(context))
176+
162177
OVERRIDE_ASYNC_LET(asyncLet_wait, void, SWIFT_EXPORT_FROM(swift_Concurrency),
163178
SWIFT_CC(swiftasync), swift::,
164179
(OpaqueValue *result,

stdlib/public/Concurrency/Actor.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,11 @@ void swift::runJobInEstablishedExecutorContext(Job *job) {
231231
// Update the active task in the current thread.
232232
ActiveTask::set(task);
233233

234-
// FIXME: update the task status to say that it's running
235-
// on the current thread. If the task suspends itself to run
236-
// on an actor, it should update the task status appropriately;
237-
// we don't need to update it afterwards.
234+
// Update the task status to say that it's running on the
235+
// current thread. If the task suspends somewhere, it should
236+
// update the task status appropriately; we don't need to update
237+
// it afterwards.
238+
task->flagAsRunning();
238239

239240
task->runInFullyEstablishedContext();
240241

@@ -1845,6 +1846,9 @@ SWIFT_CC(swiftasync)
18451846
static void runOnAssumedThread(AsyncTask *task, ExecutorRef executor,
18461847
ExecutorTrackingInfo *oldTracking,
18471848
RunningJobInfo runner) {
1849+
// Note that this doesn't change the active task and so doesn't
1850+
// need to either update ActiveTask or flagAsRunning/flagAsSuspended.
1851+
18481852
// If there's alreaady tracking info set up, just change the executor
18491853
// there and tail-call the task. We don't want these frames to
18501854
// potentially accumulate linearly.
@@ -1932,6 +1936,7 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
19321936
fprintf(stderr, "[%lu] switch failed, task %p enqueued on executor %p\n",
19331937
_swift_get_thread_id(), task, newExecutor.getIdentity());
19341938
#endif
1939+
task->flagAsSuspended();
19351940
swift_task_enqueue(task, newExecutor);
19361941
}
19371942

stdlib/public/Concurrency/Task.cpp

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
8989
_swift_get_thread_id(), waitingTask, this);
9090
#endif
9191
_swift_tsan_acquire(static_cast<Job *>(this));
92+
if (contextIntialized) waitingTask->flagAsRunning();
9293
// The task is done; we don't need to wait.
9394
return queueHead.getStatus();
9495

@@ -98,7 +99,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
9899
_swift_get_thread_id(), waitingTask, this);
99100
#endif
100101
_swift_tsan_release(static_cast<Job *>(waitingTask));
101-
// Task is now complete. We'll need to add ourselves to the queue.
102+
// Task is not complete. We'll need to add ourselves to the queue.
102103
break;
103104
}
104105

@@ -110,6 +111,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
110111
context->successResultPointer = result;
111112
context->ResumeParent = resumeFn;
112113
context->Parent = callerContext;
114+
waitingTask->flagAsSuspended();
113115
}
114116

115117
// Put the waiting task at the beginning of the wait queue.
@@ -934,8 +936,16 @@ size_t swift::swift_task_getJobFlags(AsyncTask *task) {
934936
return task->Flags.getOpaqueValue();
935937
}
936938

937-
AsyncTask *swift::swift_continuation_init(ContinuationAsyncContext *context,
938-
AsyncContinuationFlags flags) {
939+
SWIFT_CC(swift)
940+
static AsyncTask *swift_task_suspendImpl() {
941+
auto task = swift_task_getCurrent();
942+
task->flagAsSuspended();
943+
return task;
944+
}
945+
946+
SWIFT_CC(swift)
947+
static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
948+
AsyncContinuationFlags flags) {
939949
context->Flags = AsyncContextKind::Continuation;
940950
if (flags.canThrow()) context->Flags.setCanThrow(true);
941951
context->ErrorResult = nullptr;
@@ -957,9 +967,63 @@ AsyncTask *swift::swift_continuation_init(ContinuationAsyncContext *context,
957967
task->ResumeContext = context;
958968
task->ResumeTask = context->ResumeParent;
959969

970+
// A preawait immediately suspends the task.
971+
if (flags.isPreawaited()) {
972+
task->flagAsSuspended();
973+
}
974+
960975
return task;
961976
}
962977

978+
SWIFT_CC(swiftasync)
979+
static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
980+
#ifndef NDEBUG
981+
auto task = swift_task_getCurrent();
982+
assert(task && "awaiting continuation without a task");
983+
assert(task->ResumeContext == context);
984+
assert(task->ResumeTask == context->ResumeParent);
985+
#endif
986+
987+
auto &sync = context->AwaitSynchronization;
988+
989+
auto oldStatus = sync.load(std::memory_order_acquire);
990+
assert((oldStatus == ContinuationStatus::Pending ||
991+
oldStatus == ContinuationStatus::Resumed) &&
992+
"awaiting a corrupt or already-awaited continuation");
993+
994+
// If the status is already Resumed, we can resume immediately.
995+
// Comparing against Pending may be very slightly more compact.
996+
if (oldStatus != ContinuationStatus::Pending)
997+
// This is fine given how swift_continuation_init sets it up.
998+
return context->ResumeParent(context);
999+
1000+
// Load the current task (we alreaady did this in assertions builds).
1001+
#ifdef NDEBUG
1002+
auto task = swift_task_getCurrent();
1003+
#endif
1004+
1005+
// Flag the task as suspended.
1006+
task->flagAsSuspended();
1007+
1008+
// Try to transition to Awaited.
1009+
bool success =
1010+
sync.compare_exchange_strong(oldStatus, ContinuationStatus::Awaited,
1011+
/*success*/ std::memory_order_release,
1012+
/*failure*/ std::memory_order_acquire);
1013+
1014+
// If that succeeded, we have nothing to do.
1015+
if (success) return;
1016+
1017+
// If it failed, it should be because someone concurrently resumed
1018+
// (note that the compare-exchange above is strong).
1019+
assert(oldStatus == ContinuationStatus::Resumed &&
1020+
"continuation was concurrently corrupted or awaited");
1021+
1022+
// Restore the running state of the task and resume it.
1023+
task->flagAsRunning();
1024+
return task->runInFullyEstablishedContext();
1025+
}
1026+
9631027
static void resumeTaskAfterContinuation(AsyncTask *task,
9641028
ContinuationAsyncContext *context) {
9651029
auto &sync = context->AwaitSynchronization;

0 commit comments

Comments
 (0)