Skip to content

Commit 56c54d9

Browse files
committed
Track whether a task is actively running.
Tracking this as a single bit is actually largely uninteresting to the runtime. To handle priority escalation properly, we really need to track this at a finer grain of detail: recording that the task is running on a specific thread, enqueued on a specific actor, or so on. But starting by tracking a single bit is important for two reasons: - First, it's more realistic about the performance overheads of tasks: we're going to be doing this tracking eventually, and the cost of that tracking will be dominated by the atomic access, so doing that access now sets the baseline about right. - Second, it ensures that we've actually got runtime involvement in all the right places to do this tracking. A propos of the latter: there was no runtime involvement with awaiting a continuation, which is a point at which the task potentially transitions from running to suspended. We must do the tracking as part of this transition, rather than recognizing in the run-loops that a task is still active and treating it as having suspended, because the latter point potentially races with the resumption of the task. To do this, I've had to introduce a runtime function, swift_continuation_await, to do this awaiting rather than inlining the atomic operation on the continuation. As part of doing this work, I've also fixed a bug where we failed to load-acquire in swift_task_escalate before walking the task status records to invoke escalation actions. I've also fixed several places where the handling of task statuses may have accidentally allowed the task to revert to uncancelled.
1 parent 70279c0 commit 56c54d9

File tree

14 files changed

+401
-167
lines changed

14 files changed

+401
-167
lines changed

include/swift/ABI/Task.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,25 @@ class AsyncTask : public Job {
260260
void runInFullyEstablishedContext() {
261261
return ResumeTask(ResumeContext); // 'return' forces tail call
262262
}
263-
263+
264+
/// Flag that this task is now running. This can update
265+
/// the priority stored in the job flags if the priority has been
266+
/// escalated.
267+
///
268+
/// Generally this should be done immediately after updating
269+
/// ActiveTask.
270+
void flagAsRunning();
271+
void flagAsRunning_slow();
272+
273+
/// Flag that this task is now suspended. This can update the
274+
/// priority stored in the job flags if the priority hsa been
275+
/// escalated. Generally this should be done immediately after
276+
/// clearing ActiveTask and immediately before enqueuing the task
277+
/// somewhere. TODO: record where the task is enqueued if
278+
/// possible.
279+
void flagAsSuspended();
280+
void flagAsSuspended_slow();
281+
264282
/// Check whether this task has been cancelled.
265283
/// Checking this is, of course, inherently race-prone on its own.
266284
bool isCancelled() const;

include/swift/Runtime/Concurrency.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -580,20 +580,28 @@ void swift_defaultActor_deallocateResilient(HeapObject *actor);
580580
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
581581
void swift_defaultActor_enqueue(Job *job, DefaultActor *actor);
582582

583+
/// Do a primitive suspension of the current task, as if part of
584+
/// a continuation, although this does not provide any of the
585+
/// higher-level continuation semantics. The current task is returned;
586+
/// its ResumeFunction and ResumeContext will need to be initialized,
587+
/// and then it will need to be enqueued or run as a job later.
588+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
589+
AsyncTask *swift_task_suspend();
590+
583591
/// Prepare a continuation in the current task.
584592
///
585593
/// The caller should initialize the Parent, ResumeParent,
586594
/// and NormalResult fields. This function will initialize the other
587-
/// fields with appropriate defaaults; the caller may then overwrite
595+
/// fields with appropriate defaults; the caller may then overwrite
588596
/// them if desired.
589-
///
590-
/// This function is provided as a code-size and runtime-usage
591-
/// optimization; calling it is not required if code is willing to
592-
/// do all its work inline.
593597
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
594598
AsyncTask *swift_continuation_init(ContinuationAsyncContext *context,
595599
AsyncContinuationFlags flags);
596600

601+
/// Await an initialized continuation.
602+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swiftasync)
603+
void swift_continuation_await(ContinuationAsyncContext *continuationContext);
604+
597605
/// Resume a task from a non-throwing continuation, given a normal
598606
/// result which has already been stored into the continuation.
599607
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)

include/swift/Runtime/RuntimeFunctions.def

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1582,6 +1582,14 @@ FUNCTION(ContinuationInit,
15821582
ARGS(ContinuationAsyncContextPtrTy, SizeTy),
15831583
ATTRS(NoUnwind))
15841584

1585+
// void swift_continuation_await(AsyncContext *continuationContext);
1586+
FUNCTION(ContinuationAwait,
1587+
swift_continuation_await, SwiftAsyncCC,
1588+
ConcurrencyAvailability,
1589+
RETURNS(VoidTy),
1590+
ARGS(ContinuationAsyncContextPtrTy),
1591+
ATTRS(NoUnwind))
1592+
15851593
// void swift_continuation_resume(AsyncTask *continuation);
15861594
FUNCTION(ContinuationResume,
15871595
swift_continuation_resume, SwiftCC,

lib/IRGen/GenConcurrency.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "ExtraInhabitants.h"
2222
#include "GenProto.h"
2323
#include "GenType.h"
24+
#include "IRGenDebugInfo.h"
2425
#include "IRGenFunction.h"
2526
#include "IRGenModule.h"
2627
#include "LoadableTypeInfo.h"
@@ -241,3 +242,34 @@ void irgen::emitDestroyTaskGroup(IRGenFunction &IGF, llvm::Value *group) {
241242

242243
IGF.Builder.CreateLifetimeEnd(group);
243244
}
245+
246+
llvm::Function *IRGenModule::getAwaitAsyncContinuationFn() {
247+
StringRef name = "__swift_continuation_await_point";
248+
if (llvm::GlobalValue *F = Module.getNamedValue(name))
249+
return cast<llvm::Function>(F);
250+
251+
// The parameters here match the extra arguments passed to
252+
// @llvm.coro.suspend.async by emitAwaitAsyncContinuation.
253+
llvm::Type *argTys[] = { ContinuationAsyncContextPtrTy };
254+
auto *suspendFnTy =
255+
llvm::FunctionType::get(VoidTy, argTys, false /*vaargs*/);
256+
257+
llvm::Function *suspendFn =
258+
llvm::Function::Create(suspendFnTy, llvm::Function::InternalLinkage,
259+
name, &Module);
260+
suspendFn->setCallingConv(SwiftAsyncCC);
261+
suspendFn->setDoesNotThrow();
262+
IRGenFunction suspendIGF(*this, suspendFn);
263+
if (DebugInfo)
264+
DebugInfo->emitArtificialFunction(suspendIGF, suspendFn);
265+
auto &Builder = suspendIGF.Builder;
266+
267+
llvm::Value *context = suspendFn->getArg(0);
268+
auto *call = Builder.CreateCall(getContinuationAwaitFn(), { context });
269+
call->setDoesNotThrow();
270+
call->setCallingConv(SwiftAsyncCC);
271+
call->setTailCallKind(AsyncTailCallKind);
272+
273+
Builder.CreateRetVoid();
274+
return suspendFn;
275+
}

lib/IRGen/IRGenFunction.cpp

Lines changed: 4 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -665,50 +665,8 @@ void IRGenFunction::emitAwaitAsyncContinuation(
665665
assert(AsyncCoroutineCurrentContinuationContext && "no active continuation");
666666
auto pointerAlignment = IGM.getPointerAlignment();
667667

668-
// Check whether the continuation has already been resumed.
669-
// If so, we can just immediately continue with the control flow.
670-
// Otherwise, we need to suspend, and resuming the continuation will
671-
// trigger the function to resume.
672-
//
673-
// We do this by atomically trying to change the synchronization field
674-
// in the continuation context from 0 (the state it was initialized
675-
// with) to 1. If this fails, the continuation must already have been
676-
// resumed, so we can bypass the suspension point and immediately
677-
// start interpreting the result stored in the continuation.
678-
// Note that we use a strong compare-exchange (the default for the LLVM
679-
// cmpxchg instruction), so spurious failures are disallowed; we can
680-
// therefore trust that a failure means that the continuation has
681-
// already been resumed.
682-
683-
auto contAwaitSyncAddr =
684-
Builder.CreateStructGEP(AsyncCoroutineCurrentContinuationContext, 1);
685-
686-
auto pendingV = llvm::ConstantInt::get(
687-
contAwaitSyncAddr->getType()->getPointerElementType(),
688-
unsigned(ContinuationStatus::Pending));
689-
auto awaitedV = llvm::ConstantInt::get(
690-
contAwaitSyncAddr->getType()->getPointerElementType(),
691-
unsigned(ContinuationStatus::Awaited));
692-
auto results = Builder.CreateAtomicCmpXchg(
693-
contAwaitSyncAddr, pendingV, awaitedV,
694-
llvm::AtomicOrdering::Release /*success ordering*/,
695-
llvm::AtomicOrdering::Acquire /* failure ordering */,
696-
llvm::SyncScope::System);
697-
auto firstAtAwait = Builder.CreateExtractValue(results, 1);
698-
auto contBB = createBasicBlock("await.async.resume");
699-
auto abortBB = createBasicBlock("await.async.abort");
700-
Builder.CreateCondBr(firstAtAwait, abortBB, contBB);
701-
Builder.emitBlock(abortBB);
702-
{
703-
// We were the first to the sync point. "Abort" (return from the
704-
// coroutine partial function, without making a tail call to anything)
705-
// because the continuation result is not available yet. When the
706-
// continuation is later resumed, the task will get scheduled
707-
// starting from the suspension point.
708-
emitCoroutineOrAsyncExit();
709-
}
710-
711-
Builder.emitBlock(contBB);
668+
// Call swift_continuation_await to check whether the continuation
669+
// has already been resumed.
712670
{
713671
// Set up the suspend point.
714672
SmallVector<llvm::Value *, 8> arguments;
@@ -718,15 +676,10 @@ void IRGenFunction::emitAwaitAsyncContinuation(
718676
auto resumeProjFn = getOrCreateResumePrjFn();
719677
arguments.push_back(
720678
Builder.CreateBitOrPointerCast(resumeProjFn, IGM.Int8PtrTy));
721-
// The dispatch function just calls the resume point.
722-
auto resumeFnPtr =
723-
getFunctionPointerForResumeIntrinsic(AsyncCoroutineCurrentResume);
724679
arguments.push_back(Builder.CreateBitOrPointerCast(
725-
createAsyncDispatchFn(resumeFnPtr, {IGM.Int8PtrTy}),
680+
IGM.getAwaitAsyncContinuationFn(),
726681
IGM.Int8PtrTy));
727-
arguments.push_back(AsyncCoroutineCurrentResume);
728-
arguments.push_back(Builder.CreateBitOrPointerCast(
729-
AsyncCoroutineCurrentContinuationContext, IGM.Int8PtrTy));
682+
arguments.push_back(AsyncCoroutineCurrentContinuationContext);
730683
auto resultTy =
731684
llvm::StructType::get(IGM.getLLVMContext(), {IGM.Int8PtrTy}, false /*packed*/);
732685
emitSuspendAsyncCall(swiftAsyncContextIndex, resultTy, arguments);

lib/IRGen/IRGenModule.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,6 +1333,7 @@ private: \
13331333
llvm::Constant *getFixLifetimeFn();
13341334

13351335
llvm::Constant *getFixedClassInitializationFn();
1336+
llvm::Function *getAwaitAsyncContinuationFn();
13361337

13371338
/// The constructor used when generating code.
13381339
///

stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,21 @@ OVERRIDE_TASK(task_asyncMainDrainQueue, void,
159159
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::,
160160
, )
161161

162+
OVERRIDE_TASK(task_suspend, AsyncTask *,
163+
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
164+
swift::, ,)
165+
166+
OVERRIDE_TASK(continuation_init, AsyncTask *,
167+
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
168+
swift::, (ContinuationAsyncContext *context,
169+
AsyncContinuationFlags flags),
170+
(context, flags))
171+
172+
OVERRIDE_TASK(continuation_await, void,
173+
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swiftasync),
174+
swift::, (ContinuationAsyncContext *context),
175+
(context))
176+
162177
OVERRIDE_ASYNC_LET(asyncLet_wait, void, SWIFT_EXPORT_FROM(swift_Concurrency),
163178
SWIFT_CC(swiftasync), swift::,
164179
(OpaqueValue *result,

stdlib/public/Concurrency/Actor.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,11 @@ void swift::runJobInEstablishedExecutorContext(Job *job) {
222222
// Update the active task in the current thread.
223223
ActiveTask::set(task);
224224

225-
// FIXME: update the task status to say that it's running
226-
// on the current thread. If the task suspends itself to run
227-
// on an actor, it should update the task status appropriately;
228-
// we don't need to update it afterwards.
225+
// Update the task status to say that it's running on the
226+
// current thread. If the task suspends somewhere, it should
227+
// update the task status appropriately; we don't need to update
228+
// it afterwards.
229+
task->flagAsRunning();
229230

230231
task->runInFullyEstablishedContext();
231232

@@ -1790,6 +1791,9 @@ SWIFT_CC(swiftasync)
17901791
static void runOnAssumedThread(AsyncTask *task, ExecutorRef executor,
17911792
ExecutorTrackingInfo *oldTracking,
17921793
RunningJobInfo runner) {
1794+
// Note that this doesn't change the active task and so doesn't
1795+
// need to either update ActiveTask or flagAsRunning/flagAsSuspended.
1796+
17931797
// If there's alreaady tracking info set up, just change the executor
17941798
// there and tail-call the task. We don't want these frames to
17951799
// potentially accumulate linearly.
@@ -1872,6 +1876,7 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
18721876
#if SWIFT_TASK_PRINTF_DEBUG
18731877
fprintf(stderr, "[%p] switch failed, task %p enqueued on executor %p\n", pthread_self(), task, newExecutor.getIdentity());
18741878
#endif
1879+
task->flagAsSuspended();
18751880
swift_task_enqueue(task, newExecutor);
18761881
}
18771882

stdlib/public/Concurrency/Task.cpp

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
8787
fprintf(stderr, "[%p] task %p waiting on task %p, completed immediately\n", pthread_self(), waitingTask, this);
8888
#endif
8989
_swift_tsan_acquire(static_cast<Job *>(this));
90+
if (contextIntialized) waitingTask->flagAsRunning();
9091
// The task is done; we don't need to wait.
9192
return queueHead.getStatus();
9293

@@ -95,7 +96,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
9596
fprintf(stderr, "[%p] task %p waiting on task %p, going to sleep\n", pthread_self(), waitingTask, this);
9697
#endif
9798
_swift_tsan_release(static_cast<Job *>(waitingTask));
98-
// Task is now complete. We'll need to add ourselves to the queue.
99+
// Task is not complete. We'll need to add ourselves to the queue.
99100
break;
100101
}
101102

@@ -107,6 +108,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
107108
context->successResultPointer = result;
108109
context->ResumeParent = resumeFn;
109110
context->Parent = callerContext;
111+
waitingTask->flagAsSuspended();
110112
}
111113

112114
// Put the waiting task at the beginning of the wait queue.
@@ -928,8 +930,16 @@ size_t swift::swift_task_getJobFlags(AsyncTask *task) {
928930
return task->Flags.getOpaqueValue();
929931
}
930932

931-
AsyncTask *swift::swift_continuation_init(ContinuationAsyncContext *context,
932-
AsyncContinuationFlags flags) {
933+
SWIFT_CC(swift)
934+
static AsyncTask *swift_task_suspendImpl() {
935+
auto task = swift_task_getCurrent();
936+
task->flagAsSuspended();
937+
return task;
938+
}
939+
940+
SWIFT_CC(swift)
941+
static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
942+
AsyncContinuationFlags flags) {
933943
context->Flags = AsyncContextKind::Continuation;
934944
if (flags.canThrow()) context->Flags.setCanThrow(true);
935945
context->ErrorResult = nullptr;
@@ -951,9 +961,63 @@ AsyncTask *swift::swift_continuation_init(ContinuationAsyncContext *context,
951961
task->ResumeContext = context;
952962
task->ResumeTask = context->ResumeParent;
953963

964+
// A preawait immediately suspends the task.
965+
if (flags.isPreawaited()) {
966+
task->flagAsSuspended();
967+
}
968+
954969
return task;
955970
}
956971

972+
SWIFT_CC(swiftasync)
973+
static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
974+
#ifndef NDEBUG
975+
auto task = swift_task_getCurrent();
976+
assert(task && "awaiting continuation without a task");
977+
assert(task->ResumeContext == context);
978+
assert(task->ResumeTask == context->ResumeParent);
979+
#endif
980+
981+
auto &sync = context->AwaitSynchronization;
982+
983+
auto oldStatus = sync.load(std::memory_order_acquire);
984+
assert((oldStatus == ContinuationStatus::Pending ||
985+
oldStatus == ContinuationStatus::Resumed) &&
986+
"awaiting a corrupt or already-awaited continuation");
987+
988+
// If the status is already Resumed, we can resume immediately.
989+
// Comparing against Pending may be very slightly more compact.
990+
if (oldStatus != ContinuationStatus::Pending)
991+
// This is fine given how swift_continuation_init sets it up.
992+
return context->ResumeParent(context);
993+
994+
// Load the current task (we alreaady did this in assertions builds).
995+
#ifdef NDEBUG
996+
auto task = swift_task_getCurrent();
997+
#endif
998+
999+
// Flag the task as suspended.
1000+
task->flagAsSuspended();
1001+
1002+
// Try to transition to Awaited.
1003+
bool success =
1004+
sync.compare_exchange_strong(oldStatus, ContinuationStatus::Awaited,
1005+
/*success*/ std::memory_order_release,
1006+
/*failure*/ std::memory_order_acquire);
1007+
1008+
// If that succeeded, we have nothing to do.
1009+
if (success) return;
1010+
1011+
// If it failed, it should be because someone concurrently resumed
1012+
// (note that the compare-exchange above is strong).
1013+
assert(oldStatus == ContinuationStatus::Resumed &&
1014+
"continuation was concurrently corrupted or awaited");
1015+
1016+
// Restore the running state of the task and resume it.
1017+
task->flagAsRunning();
1018+
return task->runInFullyEstablishedContext();
1019+
}
1020+
9571021
static void resumeTaskAfterContinuation(AsyncTask *task,
9581022
ContinuationAsyncContext *context) {
9591023
auto &sync = context->AwaitSynchronization;

0 commit comments

Comments
 (0)