Skip to content

Commit 627dc1f

Browse files
authored
Merge pull request #61911 from apple/rokhinip/99977665-continuations-support
Implement continuations in task-to-thread model.
2 parents 6676fbf + b2f51dd commit 627dc1f

File tree

5 files changed

+175
-23
lines changed

5 files changed

+175
-23
lines changed

include/swift/ABI/Task.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include "swift/Runtime/Config.h"
2626
#include "swift/Runtime/VoucherShims.h"
2727
#include "swift/Basic/STLExtras.h"
28+
#include "swift/Threading/ConditionVariable.h"
29+
#include "swift/Threading/Mutex.h"
2830
#include "bitset"
2931
#include "queue" // TODO: remove and replace with our own mpsc
3032

@@ -711,6 +713,16 @@ class ContinuationAsyncContext : public AsyncContext {
711713
/// Public ABI.
712714
ExecutorRef ResumeToExecutor;
713715

716+
#if defined(SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY)
717+
/// In a task-to-thread model, instead of voluntarily descheduling the task
718+
/// from the thread, we will block the thread (and therefore task).
719+
/// This condition variable is lazily allocated on the stack only if the
720+
/// continuation has not been resumed by the point of await. The mutex in the
721+
/// condition variable is therefore not really protecting any state as all
722+
/// coordination is done via the AwaitSynchronization atomic
723+
ConditionVariable *Cond;
724+
#endif
725+
714726
void setErrorResult(SwiftError *error) {
715727
ErrorResult = error;
716728
}

lib/IRGen/IRGenModule.cpp

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -683,14 +683,28 @@ IRGenModule::IRGenModule(IRGenerator &irgen,
683683
*this, "swift.async_task_and_context",
684684
{ SwiftTaskPtrTy, SwiftContextPtrTy });
685685

686-
ContinuationAsyncContextTy = createStructType(
687-
*this, "swift.continuation_context",
688-
{SwiftContextTy, // AsyncContext header
689-
SizeTy, // flags
690-
SizeTy, // await synchronization
691-
ErrorPtrTy, // error result pointer
692-
OpaquePtrTy, // normal result address
693-
SwiftExecutorTy}); // resume to executor
686+
if (Context.LangOpts.isConcurrencyModelTaskToThread()) {
687+
ContinuationAsyncContextTy = createStructType(
688+
*this, "swift.continuation_context",
689+
{SwiftContextTy, // AsyncContext header
690+
SizeTy, // flags
691+
SizeTy, // await synchronization
692+
ErrorPtrTy, // error result pointer
693+
OpaquePtrTy, // normal result address
694+
SwiftExecutorTy, // resume to executor
695+
SizeTy // pointer to condition variable
696+
});
697+
} else {
698+
ContinuationAsyncContextTy = createStructType(
699+
*this, "swift.continuation_context",
700+
{SwiftContextTy, // AsyncContext header
701+
SizeTy, // flags
702+
SizeTy, // await synchronization
703+
ErrorPtrTy, // error result pointer
704+
OpaquePtrTy, // normal result address
705+
SwiftExecutorTy // resume to executor
706+
});
707+
}
694708
ContinuationAsyncContextPtrTy =
695709
ContinuationAsyncContextTy->getPointerTo(DefaultAS);
696710

stdlib/public/Concurrency/Actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ class DefaultActorImpl : public HeapObject {
845845
// the future
846846
alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
847847
#endif
848-
// TODO(rokhinip): Make this a flagset
848+
// TODO (rokhinip): Make this a flagset
849849
bool isDistributedRemoteActor;
850850

851851
public:

stdlib/public/Concurrency/Task.cpp

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,7 +1311,9 @@ static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
13111311
? ContinuationStatus::Awaited
13121312
: ContinuationStatus::Pending,
13131313
std::memory_order_relaxed);
1314-
1314+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1315+
context->Cond = nullptr;
1316+
#endif
13151317
AsyncTask *task;
13161318

13171319
// A preawait immediately suspends the task.
@@ -1351,8 +1353,7 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
13511353
"awaiting a corrupt or already-awaited continuation");
13521354

13531355
// If the status is already Resumed, we can resume immediately.
1354-
// Comparing against Pending may be very slightly more compact.
1355-
if (oldStatus != ContinuationStatus::Pending) {
1356+
if (oldStatus == ContinuationStatus::Resumed) {
13561357
if (context->isExecutorSwitchForced())
13571358
return swift_task_switch(context, context->ResumeParent,
13581359
context->ResumeToExecutor);
@@ -1364,28 +1365,72 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
13641365
auto task = swift_task_getCurrent();
13651366
#endif
13661367

1368+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1369+
// In the task to thread model, we do not suspend the task that is waiting on
1370+
// the continuation resumption. Instead we simply block the thread on a
1371+
// condition variable keep the task alive on the thread.
1372+
//
1373+
// This condition variable can be allocated on the stack of the blocking
1374+
// thread - with the address of it published to the resuming thread via the
1375+
// context.
1376+
ConditionVariable Cond;
1377+
1378+
context->Cond = &Cond;
1379+
#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
13671380
// Flag the task as suspended.
13681381
task->flagAsSuspended();
1382+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
13691383

1370-
// Try to transition to Awaited.
1384+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1385+
// If the cmpxchg is successful, the store release also publishes the write to
1386+
// the Cond in the ContinuationAsyncContext to any concurrent accessing
1387+
// thread.
1388+
//
1389+
// If it failed, then someone concurrently resumed the continuation in which
1390+
// case, we don't care about publishing the Cond in the
1391+
// ContinuationAsyncContext anyway.
1392+
#endif
1393+
// Try to transition to Awaited
13711394
bool success =
13721395
sync.compare_exchange_strong(oldStatus, ContinuationStatus::Awaited,
13731396
/*success*/ std::memory_order_release,
13741397
/*failure*/ std::memory_order_acquire);
13751398

1376-
// If that succeeded, we have nothing to do.
13771399
if (success) {
1400+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1401+
// This lock really protects nothing but we need to hold it
1402+
// while calling the condition wait
1403+
Cond.lock();
1404+
1405+
// Condition variables can have spurious wakeups so we need to check this in
1406+
// a do-while loop.
1407+
do {
1408+
Cond.wait();
1409+
oldStatus = sync.load(std::memory_order_relaxed);
1410+
} while (oldStatus != ContinuationStatus::Resumed);
1411+
1412+
Cond.unlock();
1413+
#else
1414+
// If that succeeded, we have nothing to do since we've successfully
1415+
// suspended the task
13781416
_swift_task_clearCurrent();
13791417
return;
1418+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
13801419
}
13811420

13821421
// If it failed, it should be because someone concurrently resumed
13831422
// (note that the compare-exchange above is strong).
13841423
assert(oldStatus == ContinuationStatus::Resumed &&
13851424
"continuation was concurrently corrupted or awaited");
13861425

1426+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1427+
// Since the condition variable is stack allocated, we don't need to do
1428+
// anything here to clean up
1429+
#else
13871430
// Restore the running state of the task and resume it.
13881431
task->flagAsRunning();
1432+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
1433+
13891434
if (context->isExecutorSwitchForced())
13901435
return swift_task_switch(context, context->ResumeParent,
13911436
context->ResumeToExecutor);
@@ -1397,6 +1442,7 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
13971442
continuationChecking::willResume(context);
13981443

13991444
auto &sync = context->AwaitSynchronization;
1445+
14001446
auto status = sync.load(std::memory_order_acquire);
14011447
assert(status != ContinuationStatus::Resumed &&
14021448
"continuation was already resumed");
@@ -1405,27 +1451,41 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
14051451
// restarting.
14061452
_swift_tsan_release(static_cast<Job *>(task));
14071453

1408-
// The status should be either Pending or Awaited. If it's Awaited,
1409-
// which is probably the most likely option, then we should immediately
1410-
// enqueue; we don't need to update the state because there shouldn't
1411-
// be a racing attempt to resume the continuation. If it's Pending,
1412-
// we need to set it to Resumed; if that fails (with a strong cmpxchg),
1413-
// it should be because the original thread concurrently set it to
1414-
// Awaited, and so we need to enqueue.
1454+
// The status should be either Pending or Awaited.
1455+
//
1456+
// Case 1: Status is Pending
1457+
// No one has awaited us, we just need to set it to Resumed; if that fails
1458+
// (with a strong cmpxchg), it should be because the original thread
1459+
// concurrently set it to Awaited, in which case, we fall into Case 2.
1460+
//
1461+
// Case 2: Status is Awaited
1462+
// This is probably the more frequently hit case.
1463+
// In task-to-thread model, we update status to be Resumed and signal the
1464+
// waiting thread. In regular model, we immediately enqueue the task and can
1465+
// skip updates to the continuation state since there shouldn't be a racing
1466+
// attempt to resume the continuation.
14151467
if (status == ContinuationStatus::Pending &&
14161468
sync.compare_exchange_strong(status, ContinuationStatus::Resumed,
14171469
/*success*/ std::memory_order_release,
1418-
/*failure*/ std::memory_order_relaxed)) {
1470+
/*failure*/ std::memory_order_acquire)) {
14191471
return;
14201472
}
14211473
assert(status == ContinuationStatus::Awaited &&
14221474
"detected concurrent attempt to resume continuation");
1475+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1476+
// If we see status == ContinuationStatus::Awaited, then we should also be
1477+
// seeing a pointer to the cond var since we're doing a load acquire on sync
1478+
// which pairs with the store release in swift_continuation_awaitImpl
1479+
assert(context->Cond != nullptr);
14231480

1481+
sync.store(ContinuationStatus::Resumed, std::memory_order_relaxed);
1482+
context->Cond->signal();
1483+
#else
14241484
// TODO: maybe in some mode we should set the status to Resumed here
14251485
// to make a stronger best-effort attempt to catch racing attempts to
14261486
// resume the continuation?
1427-
14281487
task->flagAsAndEnqueueOnExecutor(context->ResumeToExecutor);
1488+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
14291489
}
14301490

14311491
SWIFT_CC(swift)
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// RUN: %target-run-simple-swift(-parse-as-library -Xfrontend -disable-availability-checking -Xfrontend -concurrency-model=task-to-thread -g -Xlinker -object_path_lto -Xlinker /tmp/abc.o)
2+
// REQUIRES: concurrency
3+
// REQUIRES: executable_test
4+
// REQUIRES: concurrency_runtime
5+
// REQUIRES: freestanding
6+
// UNSUPPORTED: threading_none
7+
8+
@_spi(_TaskToThreadModel) import _Concurrency
9+
import StdlibUnittest
10+
import Darwin
11+
12+
var globalContinuation : CheckedContinuation<Int, Never>? = nil
13+
14+
func waitOnContinuation(_unused : UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
15+
Task.runInline {
16+
let result = await withCheckedContinuation { continuation in
17+
globalContinuation = continuation
18+
}
19+
print("Continuation successfully resumed")
20+
expectEqual(result, 10)
21+
}
22+
return nil
23+
}
24+
25+
func resumeContinuation(_unused : UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
26+
Task.runInline {
27+
while (globalContinuation == nil) {}
28+
globalContinuation!.resume(returning: 10)
29+
}
30+
return nil
31+
}
32+
33+
@main struct Main {
34+
static func main() {
35+
36+
let tests = TestSuite("Continuations in task-to-thread")
37+
tests.test("Basic continuations - no blocking") {
38+
Task.runInline {
39+
await withCheckedContinuation { continuation in
40+
continuation.resume()
41+
}
42+
}
43+
}
44+
45+
tests.test("Continuations - with blocking") {
46+
var thread1 : pthread_t? = nil
47+
guard pthread_create(&thread1, nil, waitOnContinuation, nil) == 0 else {
48+
fatalError("pthread_create failed")
49+
}
50+
51+
var thread2 : pthread_t? = nil
52+
guard pthread_create(&thread2, nil, resumeContinuation, nil) == 0 else {
53+
fatalError("pthread_create failed")
54+
}
55+
56+
guard pthread_join(thread1!, nil) == 0 else {
57+
fatalError("pthread_join failed")
58+
}
59+
guard pthread_join(thread2!, nil) == 0 else {
60+
fatalError("pthread_join failed")
61+
}
62+
}
63+
64+
runAllTests()
65+
}
66+
}

0 commit comments

Comments
 (0)