Skip to content

Commit b2f51dd

Browse files
committed
Implement continuations in task-to-thread model.
This is done using a condition variable upon which the awaiting thread will block if the continuation has not be resumed by the point of await. The resuming thread will signal this condition variable, thereby unblocking the awaiting thread. Rdar://99977665
1 parent 94b1759 commit b2f51dd

File tree

5 files changed

+175
-23
lines changed

5 files changed

+175
-23
lines changed

include/swift/ABI/Task.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include "swift/Runtime/Config.h"
2626
#include "swift/Runtime/VoucherShims.h"
2727
#include "swift/Basic/STLExtras.h"
28+
#include "swift/Threading/ConditionVariable.h"
29+
#include "swift/Threading/Mutex.h"
2830
#include "bitset"
2931
#include "queue" // TODO: remove and replace with our own mpsc
3032

@@ -711,6 +713,16 @@ class ContinuationAsyncContext : public AsyncContext {
711713
/// Public ABI.
712714
ExecutorRef ResumeToExecutor;
713715

716+
#if defined(SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY)
717+
/// In a task-to-thread model, instead of voluntarily descheduling the task
718+
/// from the thread, we will block the thread (and therefore task).
719+
/// This condition variable is lazily allocated on the stack only if the
720+
/// continuation has not been resumed by the point of await. The mutex in the
721+
/// condition variable is therefore not really protecting any state as all
722+
/// coordination is done via the AwaitSynchronization atomic
723+
ConditionVariable *Cond;
724+
#endif
725+
714726
void setErrorResult(SwiftError *error) {
715727
ErrorResult = error;
716728
}

lib/IRGen/IRGenModule.cpp

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -690,14 +690,28 @@ IRGenModule::IRGenModule(IRGenerator &irgen,
690690
*this, "swift.async_task_and_context",
691691
{ SwiftTaskPtrTy, SwiftContextPtrTy });
692692

693-
ContinuationAsyncContextTy = createStructType(
694-
*this, "swift.continuation_context",
695-
{SwiftContextTy, // AsyncContext header
696-
SizeTy, // flags
697-
SizeTy, // await synchronization
698-
ErrorPtrTy, // error result pointer
699-
OpaquePtrTy, // normal result address
700-
SwiftExecutorTy}); // resume to executor
693+
if (Context.LangOpts.isConcurrencyModelTaskToThread()) {
694+
ContinuationAsyncContextTy = createStructType(
695+
*this, "swift.continuation_context",
696+
{SwiftContextTy, // AsyncContext header
697+
SizeTy, // flags
698+
SizeTy, // await synchronization
699+
ErrorPtrTy, // error result pointer
700+
OpaquePtrTy, // normal result address
701+
SwiftExecutorTy, // resume to executor
702+
SizeTy // pointer to condition variable
703+
});
704+
} else {
705+
ContinuationAsyncContextTy = createStructType(
706+
*this, "swift.continuation_context",
707+
{SwiftContextTy, // AsyncContext header
708+
SizeTy, // flags
709+
SizeTy, // await synchronization
710+
ErrorPtrTy, // error result pointer
711+
OpaquePtrTy, // normal result address
712+
SwiftExecutorTy // resume to executor
713+
});
714+
}
701715
ContinuationAsyncContextPtrTy =
702716
ContinuationAsyncContextTy->getPointerTo(DefaultAS);
703717

stdlib/public/Concurrency/Actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ class DefaultActorImpl : public HeapObject {
845845
// the future
846846
alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
847847
#endif
848-
// TODO(rokhinip): Make this a flagset
848+
// TODO (rokhinip): Make this a flagset
849849
bool isDistributedRemoteActor;
850850

851851
public:

stdlib/public/Concurrency/Task.cpp

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,7 +1311,9 @@ static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
13111311
? ContinuationStatus::Awaited
13121312
: ContinuationStatus::Pending,
13131313
std::memory_order_relaxed);
1314-
1314+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1315+
context->Cond = nullptr;
1316+
#endif
13151317
AsyncTask *task;
13161318

13171319
// A preawait immediately suspends the task.
@@ -1351,8 +1353,7 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
13511353
"awaiting a corrupt or already-awaited continuation");
13521354

13531355
// If the status is already Resumed, we can resume immediately.
1354-
// Comparing against Pending may be very slightly more compact.
1355-
if (oldStatus != ContinuationStatus::Pending) {
1356+
if (oldStatus == ContinuationStatus::Resumed) {
13561357
if (context->isExecutorSwitchForced())
13571358
return swift_task_switch(context, context->ResumeParent,
13581359
context->ResumeToExecutor);
@@ -1364,28 +1365,72 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
13641365
auto task = swift_task_getCurrent();
13651366
#endif
13661367

1368+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1369+
// In the task to thread model, we do not suspend the task that is waiting on
1370+
// the continuation resumption. Instead we simply block the thread on a
1371+
// condition variable keep the task alive on the thread.
1372+
//
1373+
// This condition variable can be allocated on the stack of the blocking
1374+
// thread - with the address of it published to the resuming thread via the
1375+
// context.
1376+
ConditionVariable Cond;
1377+
1378+
context->Cond = &Cond;
1379+
#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
13671380
// Flag the task as suspended.
13681381
task->flagAsSuspended();
1382+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
13691383

1370-
// Try to transition to Awaited.
1384+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1385+
// If the cmpxchg is successful, the store release also publishes the write to
1386+
// the Cond in the ContinuationAsyncContext to any concurrent accessing
1387+
// thread.
1388+
//
1389+
// If it failed, then someone concurrently resumed the continuation in which
1390+
// case, we don't care about publishing the Cond in the
1391+
// ContinuationAsyncContext anyway.
1392+
#endif
1393+
// Try to transition to Awaited
13711394
bool success =
13721395
sync.compare_exchange_strong(oldStatus, ContinuationStatus::Awaited,
13731396
/*success*/ std::memory_order_release,
13741397
/*failure*/ std::memory_order_acquire);
13751398

1376-
// If that succeeded, we have nothing to do.
13771399
if (success) {
1400+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1401+
// This lock really protects nothing but we need to hold it
1402+
// while calling the condition wait
1403+
Cond.lock();
1404+
1405+
// Condition variables can have spurious wakeups so we need to check this in
1406+
// a do-while loop.
1407+
do {
1408+
Cond.wait();
1409+
oldStatus = sync.load(std::memory_order_relaxed);
1410+
} while (oldStatus != ContinuationStatus::Resumed);
1411+
1412+
Cond.unlock();
1413+
#else
1414+
// If that succeeded, we have nothing to do since we've successfully
1415+
// suspended the task
13781416
_swift_task_clearCurrent();
13791417
return;
1418+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
13801419
}
13811420

13821421
// If it failed, it should be because someone concurrently resumed
13831422
// (note that the compare-exchange above is strong).
13841423
assert(oldStatus == ContinuationStatus::Resumed &&
13851424
"continuation was concurrently corrupted or awaited");
13861425

1426+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1427+
// Since the condition variable is stack allocated, we don't need to do
1428+
// anything here to clean up
1429+
#else
13871430
// Restore the running state of the task and resume it.
13881431
task->flagAsRunning();
1432+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
1433+
13891434
if (context->isExecutorSwitchForced())
13901435
return swift_task_switch(context, context->ResumeParent,
13911436
context->ResumeToExecutor);
@@ -1397,6 +1442,7 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
13971442
continuationChecking::willResume(context);
13981443

13991444
auto &sync = context->AwaitSynchronization;
1445+
14001446
auto status = sync.load(std::memory_order_acquire);
14011447
assert(status != ContinuationStatus::Resumed &&
14021448
"continuation was already resumed");
@@ -1405,27 +1451,41 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
14051451
// restarting.
14061452
_swift_tsan_release(static_cast<Job *>(task));
14071453

1408-
// The status should be either Pending or Awaited. If it's Awaited,
1409-
// which is probably the most likely option, then we should immediately
1410-
// enqueue; we don't need to update the state because there shouldn't
1411-
// be a racing attempt to resume the continuation. If it's Pending,
1412-
// we need to set it to Resumed; if that fails (with a strong cmpxchg),
1413-
// it should be because the original thread concurrently set it to
1414-
// Awaited, and so we need to enqueue.
1454+
// The status should be either Pending or Awaited.
1455+
//
1456+
// Case 1: Status is Pending
1457+
// No one has awaited us, we just need to set it to Resumed; if that fails
1458+
// (with a strong cmpxchg), it should be because the original thread
1459+
// concurrently set it to Awaited, in which case, we fall into Case 2.
1460+
//
1461+
// Case 2: Status is Awaited
1462+
// This is probably the more frequently hit case.
1463+
// In task-to-thread model, we update status to be Resumed and signal the
1464+
// waiting thread. In regular model, we immediately enqueue the task and can
1465+
// skip updates to the continuation state since there shouldn't be a racing
1466+
// attempt to resume the continuation.
14151467
if (status == ContinuationStatus::Pending &&
14161468
sync.compare_exchange_strong(status, ContinuationStatus::Resumed,
14171469
/*success*/ std::memory_order_release,
1418-
/*failure*/ std::memory_order_relaxed)) {
1470+
/*failure*/ std::memory_order_acquire)) {
14191471
return;
14201472
}
14211473
assert(status == ContinuationStatus::Awaited &&
14221474
"detected concurrent attempt to resume continuation");
1475+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1476+
// If we see status == ContinuationStatus::Awaited, then we should also be
1477+
// seeing a pointer to the cond var since we're doing a load acquire on sync
1478+
// which pairs with the store release in swift_continuation_awaitImpl
1479+
assert(context->Cond != nullptr);
14231480

1481+
sync.store(ContinuationStatus::Resumed, std::memory_order_relaxed);
1482+
context->Cond->signal();
1483+
#else
14241484
// TODO: maybe in some mode we should set the status to Resumed here
14251485
// to make a stronger best-effort attempt to catch racing attempts to
14261486
// resume the continuation?
1427-
14281487
task->flagAsAndEnqueueOnExecutor(context->ResumeToExecutor);
1488+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
14291489
}
14301490

14311491
SWIFT_CC(swift)
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// RUN: %target-run-simple-swift(-parse-as-library -Xfrontend -disable-availability-checking -Xfrontend -concurrency-model=task-to-thread -g -Xlinker -object_path_lto -Xlinker /tmp/abc.o)
2+
// REQUIRES: concurrency
3+
// REQUIRES: executable_test
4+
// REQUIRES: concurrency_runtime
5+
// REQUIRES: freestanding
6+
// UNSUPPORTED: threading_none
7+
8+
@_spi(_TaskToThreadModel) import _Concurrency
9+
import StdlibUnittest
10+
import Darwin
11+
12+
var globalContinuation : CheckedContinuation<Int, Never>? = nil
13+
14+
func waitOnContinuation(_unused : UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
15+
Task.runInline {
16+
let result = await withCheckedContinuation { continuation in
17+
globalContinuation = continuation
18+
}
19+
print("Continuation successfully resumed")
20+
expectEqual(result, 10)
21+
}
22+
return nil
23+
}
24+
25+
func resumeContinuation(_unused : UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
26+
Task.runInline {
27+
while (globalContinuation == nil) {}
28+
globalContinuation!.resume(returning: 10)
29+
}
30+
return nil
31+
}
32+
33+
@main struct Main {
34+
static func main() {
35+
36+
let tests = TestSuite("Continuations in task-to-thread")
37+
tests.test("Basic continuations - no blocking") {
38+
Task.runInline {
39+
await withCheckedContinuation { continuation in
40+
continuation.resume()
41+
}
42+
}
43+
}
44+
45+
tests.test("Continuations - with blocking") {
46+
var thread1 : pthread_t? = nil
47+
guard pthread_create(&thread1, nil, waitOnContinuation, nil) == 0 else {
48+
fatalError("pthread_create failed")
49+
}
50+
51+
var thread2 : pthread_t? = nil
52+
guard pthread_create(&thread2, nil, resumeContinuation, nil) == 0 else {
53+
fatalError("pthread_create failed")
54+
}
55+
56+
guard pthread_join(thread1!, nil) == 0 else {
57+
fatalError("pthread_join failed")
58+
}
59+
guard pthread_join(thread2!, nil) == 0 else {
60+
fatalError("pthread_join failed")
61+
}
62+
}
63+
64+
runAllTests()
65+
}
66+
}

0 commit comments

Comments
 (0)