Skip to content

Commit af4b6bc

Browse files
committed
[Concurrency] Add asynchronous Task.sleep function
1 parent f1e532c commit af4b6bc

File tree

4 files changed

+164
-4
lines changed

4 files changed

+164
-4
lines changed

include/swift/Runtime/Concurrency.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,9 @@ void swift_task_enqueue(Job *job, ExecutorRef executor);
486486
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
487487
void swift_task_enqueueGlobal(Job *job);
488488

489+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
490+
void swift_task_enqueueGlobalWithDelay(unsigned long long delay, Job *job);
491+
489492
/// FIXME: only exists for the quick-and-dirty MainActor implementation.
490493
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
491494
void swift_task_enqueueMainExecutor(Job *job);
@@ -499,6 +502,11 @@ void swift_MainActor_register(HeapObject *actor);
499502
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
500503
void (*swift_task_enqueueGlobal_hook)(Job *job);
501504

505+
/// A hook to take over global enqueuing with delay.
506+
/// TODO: figure out a better abstraction plan than this.
507+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
508+
void (*swift_task_enqueueGlobalWithDelay_hook)(unsigned long long delay, Job *job);
509+
502510
/// Initialize the runtime storage for a default actor.
503511
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
504512
void swift_defaultActor_initialize(DefaultActor *actor);

stdlib/public/Concurrency/GlobalExecutor.cpp

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,27 @@ using namespace swift;
6363
SWIFT_CC(swift)
6464
void (*swift::swift_task_enqueueGlobal_hook)(Job *job) = nullptr;
6565

66+
SWIFT_CC(swift)
67+
void (*swift::swift_task_enqueueGlobalWithDelay_hook)(unsigned long long delay, Job *job) = nullptr;
68+
6669
#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR
70+
71+
#include <chrono>
72+
#include <thread>
73+
6774
static Job *JobQueue = nullptr;
6875

76+
class DelayedJob {
77+
public:
78+
Job *job;
79+
unsigned long long when;
80+
DelayedJob *next;
81+
82+
DelayedJob(Job *job, unsigned long long when) : job(job), when(when), next(nullptr) {}
83+
};
84+
85+
static DelayedJob *DelayedJobQueue = nullptr;
86+
6987
/// Get the next-in-queue storage slot.
7088
static Job *&nextInQueue(Job *cur) {
7189
return reinterpret_cast<Job*&>(cur->SchedulerPrivate);
@@ -89,13 +107,58 @@ static void insertIntoJobQueue(Job *newJob) {
89107
*position = newJob;
90108
}
91109

110+
static unsigned long long currentNanos() {
111+
auto now = std::chrono::steady_clock::now();
112+
auto nowNanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
113+
auto value = std::chrono::duration_cast<std::chrono::nanoseconds>(nowNanos.time_since_epoch());
114+
return value.count();
115+
}
116+
117+
/// Insert a job into the cooperative global queue.
118+
static void insertIntoDelayedJobQueue(unsigned long long delay, Job *job) {
119+
DelayedJob **position = &DelayedJobQueue;
120+
DelayedJob *newJob = new DelayedJob(job, currentNanos() + delay);
121+
122+
while (auto cur = *position) {
123+
// If we find a job with lower priority, insert here.
124+
if (cur->when > newJob->when) {
125+
newJob->next = cur;
126+
*position = newJob;
127+
return;
128+
}
129+
130+
// Otherwise, keep advancing through the queue.
131+
position = &cur->next;
132+
}
133+
*position = newJob;
134+
}
135+
92136
/// Claim the next job from the cooperative global queue.
93137
static Job *claimNextFromJobQueue() {
94-
if (auto job = JobQueue) {
95-
JobQueue = nextInQueue(job);
96-
return job;
138+
// Check delayed jobs first
139+
while (true) {
140+
if (auto delayedJob = DelayedJobQueue) {
141+
if (delayedJob->when < currentNanos()) {
142+
DelayedJobQueue = delayedJob->next;
143+
auto job = delayedJob->job;
144+
145+
delete delayedJob;
146+
147+
return job;
148+
}
149+
}
150+
if (auto job = JobQueue) {
151+
JobQueue = nextInQueue(job);
152+
return job;
153+
}
154+
// there are only delayed jobs left, but they are not ready,
155+
// so we sleep until the first one is
156+
if (auto delayedJob = DelayedJobQueue) {
157+
std::this_thread::sleep_for(std::chrono::nanoseconds(delayedJob->when - currentNanos()));
158+
continue;
159+
}
160+
return nullptr;
97161
}
98-
return nullptr;
99162
}
100163

101164
void swift::donateThreadToGlobalExecutorUntil(bool (*condition)(void *),
@@ -177,6 +240,30 @@ void swift::swift_task_enqueueGlobal(Job *job) {
177240
#endif
178241
}
179242

243+
void swift::swift_task_enqueueGlobalWithDelay(unsigned long long delay, Job *job) {
244+
assert(job && "no job provided");
245+
246+
// If the hook is defined, use it.
247+
if (swift_task_enqueueGlobalWithDelay_hook)
248+
return swift_task_enqueueGlobalWithDelay_hook(delay, job);
249+
250+
#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR
251+
insertIntoDelayedJobQueue(delay, job);
252+
#else
253+
254+
dispatch_function_t dispatchFunction = &__swift_run_job;
255+
void *dispatchContext = job;
256+
257+
JobPriority priority = job->getPriority();
258+
259+
// TODO: cache this to avoid the extra call
260+
auto queue = dispatch_get_global_queue((dispatch_qos_class_t) priority,
261+
/*flags*/ 0);
262+
dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, delay);
263+
dispatch_after_f(when, queue, dispatchContext, dispatchFunction);
264+
#endif
265+
}
266+
180267

181268
/// Enqueues a task on the main executor.
182269
/// FIXME: only exists for the quick-and-dirty MainActor implementation.

stdlib/public/Concurrency/Task.swift

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,27 @@ public func _runAsyncHandler(operation: @escaping () async -> ()) {
472472
)
473473
}
474474

475+
476+
/// Suspends the current task for _at least_ the given duration
477+
/// in nanoseconds.
478+
///
479+
/// This function does _not_ block the underlying thread.
480+
public static func sleep(_ duration: UInt64) async {
481+
482+
// Set up the job flags for a new task.
483+
var flags = JobFlags()
484+
flags.kind = .task
485+
flags.priority = .default
486+
flags.isFuture = true
487+
488+
// Create the asynchronous task future.
489+
let (task, _) = Builtin.createAsyncTaskFuture(flags.bits, nil, {})
490+
491+
// Enqueue the resulting job.
492+
_enqueueJobGlobalWithDelay(duration, Builtin.convertTaskToJob(task))
493+
494+
let _ = await Handle<Void, Never>(task).get()
495+
}
475496
// ==== UnsafeCurrentTask ------------------------------------------------------
476497

477498
extension Task {
@@ -571,6 +592,10 @@ func getJobFlags(_ task: Builtin.NativeObject) -> Task.JobFlags
571592
@usableFromInline
572593
func _enqueueJobGlobal(_ task: Builtin.Job)
573594

595+
@_silgen_name("swift_task_enqueueGlobalWithDelay")
596+
@usableFromInline
597+
func _enqueueJobGlobalWithDelay(_ delay: UInt64, _ task: Builtin.Job)
598+
574599
@available(*, deprecated)
575600
@_silgen_name("swift_task_runAndBlockThread")
576601
public func runAsyncAndBlock(_ asyncFun: @escaping () async -> ())
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library)
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
5+
import _Concurrency
6+
// FIXME: should not depend on Dispatch
7+
import Dispatch
8+
9+
@main struct Main {
10+
static let pause = 500_000_000 // 500ms
11+
12+
static func main() async {
13+
await testSleepDuration()
14+
await testSleepDoesNotBlock()
15+
}
16+
17+
static func testSleepDuration() async {
18+
let start = DispatchTime.now()
19+
20+
await Task.sleep(UInt64(pause))
21+
22+
let stop = DispatchTime.now()
23+
24+
// assert that at least the specified time passed since calling `sleep`
25+
assert(stop >= (start + .nanoseconds(pause)))
26+
}
27+
28+
static func testSleepDoesNotBlock() async {
29+
// FIXME: Should run on main executor
30+
let task = Task.runDetached {
31+
print("Run first")
32+
}
33+
34+
await Task.sleep(UInt64(pause))
35+
36+
print("Run second")
37+
38+
await task.get()
39+
}
40+
}

0 commit comments

Comments
 (0)