Skip to content

Commit 520b513

Browse files
committed
[Concurrency] Task: isCancelled,checkCancelled implementation
move comments to the wired up continuations remove duplicated continuations; leep the wired up ones before moving to C++ for queue impl trying to next wait via channel_poll submitting works; need to impl next()
1 parent 9162d40 commit 520b513

21 files changed

+1432
-170
lines changed

docs/SIL.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ the coroutine until the continuation is invoked to resume it. A use of
656656

657657
func waitForCallback() async -> Int {
658658
return await withUnsafeContinuation { cc in
659-
registerCallback { cc.resume($0) }
659+
registerCallback { cc.resume(returning: $0) }
660660
}
661661
}
662662

include/swift/ABI/MetadataValues.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1925,8 +1925,15 @@ class JobFlags : public FlagSet<size_t> {
19251925

19261926
// Kind-specific flags.
19271927

1928-
Task_IsChildTask = 24,
1929-
Task_IsFuture = 25
1928+
Task_IsChildTask = 24,
1929+
Task_IsFuture = 25,
1930+
// A TaskGroup is a Channel Task which allows children to offer values into
1931+
// it as they complete
1932+
Task_IsChannel = 26,
1933+
// A child task started in a TaskGroup.
1934+
// Its parent MUST be the group's channel task,
1935+
// and the child task MUST offer its completion value into it when it completes.
1936+
Task_IsGroupChild = 27,
19301937
};
19311938

19321939
explicit JobFlags(size_t bits) : FlagSet(bits) {}
@@ -1954,6 +1961,13 @@ class JobFlags : public FlagSet<size_t> {
19541961
task_isFuture,
19551962
task_setIsFuture)
19561963

1964+
FLAGSET_DEFINE_FLAG_ACCESSORS(Task_IsChannel,
1965+
task_isChannel,
1966+
task_setIsChannel)
1967+
FLAGSET_DEFINE_FLAG_ACCESSORS(Task_IsGroupChild,
1968+
task_isGroupChild,
1969+
task_setGroupChild)
1970+
19571971
};
19581972

19591973
/// Kinds of task status record.

include/swift/ABI/Task.h

Lines changed: 202 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ class AsyncTask : public HeapObject, public Job {
161161
ResumeTask(this, currentExecutor, ResumeContext);
162162
}
163163

164-
/// Check whether this task has been cancelled. Checking this is,
165-
/// of course, inherently race-prone on its own.
164+
/// Check whether this task has been cancelled.
165+
/// Checking this is, of course, inherently race-prone on its own.
166166
bool isCancelled() const {
167167
return Status.load(std::memory_order_relaxed).isCancelled();
168168
}
@@ -190,12 +190,16 @@ class AsyncTask : public HeapObject, public Job {
190190
}
191191
};
192192

193+
// TODO: rename? all other functions are is... rather than has...Fragment
193194
bool hasChildFragment() const { return Flags.task_isChildTask(); }
195+
194196
ChildFragment *childFragment() {
195197
assert(hasChildFragment());
196198
return reinterpret_cast<ChildFragment*>(this + 1);
197199
}
198200

201+
// ==== Future ---------------------------------------------------------------
202+
199203
class FutureFragment {
200204
public:
201205
/// Describes the status of the future.
@@ -248,7 +252,7 @@ class AsyncTask : public HeapObject, public Job {
248252
const Metadata *resultType;
249253

250254
// Trailing storage for the result itself. The storage will be uninitialized,
251-
// contain an instance of \c resultType, or contaon an an \c Error.
255+
// contain an instance of \c resultType, or contain an an \c Error.
252256

253257
friend class AsyncTask;
254258

@@ -315,6 +319,197 @@ class AsyncTask : public HeapObject, public Job {
315319
/// executor.
316320
void completeFuture(AsyncContext *context, ExecutorRef executor);
317321

322+
// ==== Channel --------------------------------------------------------------
323+
324+
class ChannelFragment {
325+
public:
326+
/// Describes the status of the channel.
327+
enum class ReadyQueueStatus : uintptr_t {
328+
/// The channel is empty, no tasks are pending.
329+
/// Return immediately, there is no point in suspending.
330+
///
331+
/// The storage is not accessible.
332+
Empty = 0,
333+
334+
/// The channel has pending tasks
335+
///
336+
/// The storage is not accessible.
337+
Pending = 1,
338+
339+
/// The future has completed with result (of type \c resultType).
340+
Success = 2,
341+
342+
/// The future has completed by throwing an error (an \c Error
343+
/// existential).
344+
Error = 3,
345+
346+
// /// No tasks are ready, yet there exist pending tasks (they were
347+
// /// `add()`-ed and not completed yet). Suspend and await a wake-up
348+
// /// when any task completes.
349+
// Pending = 1,
350+
//
351+
// /// At least one task is completed and waiting in the queue.
352+
// /// Pull immediately without suspending to obtain it.
353+
// Ready = 2,
354+
};
355+
356+
/// Describes the status of the future.
357+
///
358+
/// Futures always begin in the "Executing" state, and will always
359+
/// make a single state change to either Success or Error.
360+
enum class Status : uintptr_t {
361+
/// No-one is waiting.
362+
/// The storage is not accessible.
363+
Pending = 0,
364+
365+
/// The future has completed with result (of type \c resultType).
366+
Success = 1,
367+
368+
/// The future has completed by throwing an error (an \c Error
369+
/// existential).
370+
Error = 2,
371+
};
372+
373+
/// An item within the message queue of a channel.
374+
struct ReadyQueueItem {
375+
/// Mask used for the low status bits in a message queue item.
376+
static const uintptr_t statusMask = 0x03;
377+
378+
uintptr_t storage;
379+
380+
ReadyQueueStatus getStatus() const {
381+
return static_cast<ReadyQueueStatus>(storage & statusMask);
382+
}
383+
384+
AsyncTask *getTask() const {
385+
return reinterpret_cast<AsyncTask *>(storage & ~statusMask);
386+
}
387+
388+
static ReadyQueueItem get(ReadyQueueStatus status, AsyncTask *task) {
389+
return ReadyQueueItem{
390+
reinterpret_cast<uintptr_t>(task) | static_cast<uintptr_t>(status)};
391+
}
392+
};
393+
394+
/// An item within the wait queue, which includes the status and the
395+
/// head of the list of tasks.
396+
struct WaitQueueItem {
397+
/// Mask used for the low status bits in a wait queue item.
398+
static const uintptr_t statusMask = 0x03;
399+
400+
uintptr_t storage;
401+
402+
Status getStatus() const {
403+
return static_cast<Status>(storage & statusMask);
404+
}
405+
406+
AsyncTask *getTask() const {
407+
return reinterpret_cast<AsyncTask *>(storage & ~statusMask);
408+
}
409+
410+
static WaitQueueItem get(Status status, AsyncTask *task) {
411+
return WaitQueueItem{
412+
reinterpret_cast<uintptr_t>(task) | static_cast<uintptr_t>(status)};
413+
}
414+
};
415+
416+
private:
417+
// TODO we likely can collapse these into one queue if we try hard enough
418+
419+
/// Queue containing completed tasks offered into this channel.
420+
///
421+
/// The low bits contain the status, the rest of the pointer is the
422+
/// AsyncTask.
423+
std::atomic<ReadyQueueItem> readyQueue;
424+
425+
/// Queue containing all of the tasks that are waiting in `get()`.
426+
///
427+
/// The low bits contain the status, the rest of the pointer is the
428+
/// AsyncTask.
429+
std::atomic<WaitQueueItem> waitQueue;
430+
431+
/// The type of the result that will be produced by the channel.
432+
const Metadata *resultType;
433+
434+
// FIXME: seems shady...?
435+
// Trailing storage for the result itself. The storage will be uninitialized.
436+
// Use the `readyQueue` to poll for values from the channel instead.
437+
friend class AsyncTask; // TODO: remove this?
438+
439+
public:
440+
explicit ChannelFragment(const Metadata *resultType)
441+
: readyQueue(ReadyQueueItem::get(ReadyQueueStatus::Empty, nullptr)),
442+
waitQueue(WaitQueueItem::get(Status::Pending, nullptr)),
443+
resultType(resultType) { }
444+
445+
/// Destroy the storage associated with the channel.
446+
void destroy();
447+
448+
// /// Offer a completed task to the channel to enable polling for it.
449+
// void offer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
450+
//
451+
// /// Poll the channel for any
452+
// ChannelFragment::ReadyQueueStatus channelPoll(AsyncTask *waitingTask);
453+
454+
// /// Retrieve a pointer to the storage of result.
455+
// OpaqueValue *getStoragePtr() {
456+
// return reinterpret_cast<OpaqueValue *>(
457+
// reinterpret_cast<char *>(this) + storageOffset(resultType));
458+
// }
459+
//
460+
// /// Retrieve the error.
461+
// SwiftError *&getError() {
462+
// return *reinterpret_cast<SwiftError **>(
463+
// reinterpret_cast<char *>(this) + storageOffset(resultType));
464+
// }
465+
466+
// /// Compute the offset of the storage from the base of the channel
467+
// /// fragment.
468+
// static size_t storageOffset(const Metadata *resultType) {
469+
// size_t offset = sizeof(ChannelFragment);
470+
// size_t alignment =
471+
// std::max(resultType->vw_alignment(), alignof(SwiftError *));
472+
// return (offset + alignment - 1) & ~(alignment - 1);
473+
// }
474+
//
475+
// /// Determine the size of the channel fragment given a particular channel
476+
// /// result type.
477+
// static size_t fragmentSize(const Metadata *resultType) {
478+
// return storageOffset(resultType) +
479+
// std::max(resultType->vw_size(), sizeof(SwiftError *));
480+
// }
481+
};
482+
483+
bool isChannel() const { return Flags.task_isChannel(); }
484+
485+
ChannelFragment *channelFragment() {
486+
assert(isChannel());
487+
// FIXME: isn't it also a future at the same time?
488+
if (hasChildFragment()) { // TODO: make sure those all are correct; we can have many fragments
489+
return reinterpret_cast<ChannelFragment *>(
490+
reinterpret_cast<ChildFragment*>(this + 1) + 1);
491+
}
492+
493+
return reinterpret_cast<ChannelFragment *>(this + 1);
494+
}
495+
496+
/// Offer result of a task into this channel.
497+
/// The value is enqueued at the end of the channel.
498+
///
499+
/// Upon enqueue, any waiting tasks will be scheduled on the given executor. // TODO: not precisely right
500+
void channelOffer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
501+
502+
/// Wait for for channel to become non-empty.
503+
///
504+
/// \returns the status of the queue. TODO more docs
505+
ChannelFragment::ReadyQueueStatus channelPoll(AsyncTask *waitingTask);
506+
507+
// ==== ----------------------------------------------------------------------
508+
509+
bool isGroupChild() const { return Flags.task_isGroupChild(); }
510+
511+
// ==== ----------------------------------------------------------------------
512+
318513
static bool classof(const Job *job) {
319514
return job->isAsyncTask();
320515
}
@@ -431,6 +626,10 @@ class FutureAsyncContext : public AsyncContext {
431626
SwiftError *errorResult = nullptr;
432627
OpaqueValue *indirectResult;
433628

629+
630+
// TODO: this is to support "offer into queue on complete"
631+
AsyncContext *parentChannel = nullptr; // TODO: no idea if we need this or not
632+
434633
using AsyncContext::AsyncContext;
435634
};
436635

include/swift/Runtime/Concurrency.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,26 @@ struct TaskFutureWaitResult {
133133
OpaqueValue *storage;
134134
};
135135

136+
/// The result of waiting on a Channel (TaskGroup).
137+
struct TaskChannelPollResult {
138+
/// Whether the storage represents an existing value, or "known no value,"
139+
/// instructing the `group.next()` call to return `nil` immediately.
140+
bool hadAnyResult;
141+
142+
/// Whether the storage represents the error result vs. the successful
143+
/// result.
144+
bool hadErrorResult;
145+
146+
/// Storage for the result of the future.
147+
///
148+
/// When the future completed normally, this is a pointer to the storage
149+
/// of the result value, which lives inside the future task itself.
150+
///
151+
/// When the future completed by throwing an error, this is the error
152+
/// object itself.
153+
OpaqueValue *storage;
154+
};
155+
136156
using TaskFutureWaitSignature =
137157
AsyncSignature<TaskFutureWaitResult(AsyncTask *), /*throws*/ false>;
138158

@@ -148,6 +168,18 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swiftasync)
148168
TaskFutureWaitSignature::FunctionType
149169
swift_task_future_wait;
150170

171+
/// Wait for a readyQueue of a Channel to become non empty.
172+
///
173+
/// This can be called from any thread. Its Swift signature is
174+
///
175+
/// \code
176+
/// func swift_task_channel_poll(on channelTask: Builtin.NativeObject) async
177+
/// -> RawTaskChannelPollResult?
178+
/// \endcode
179+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
180+
AsyncFunctionType<TaskChannelPollResult(AsyncTask *task)>
181+
swift_task_channel_poll;
182+
151183
/// Add a status record to a task. The record should not be
152184
/// modified while it is registered with a task.
153185
///
@@ -188,6 +220,9 @@ bool swift_task_removeStatusRecord(AsyncTask *task,
188220
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
189221
size_t swift_task_getJobFlags(AsyncTask* task);
190222

223+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
224+
bool swift_task_isCancelled(AsyncTask* task);
225+
191226
/// This should have the same representation as an enum like this:
192227
/// enum NearestTaskDeadline {
193228
/// case none

stdlib/public/Concurrency/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
4242
Task.swift
4343
TaskCancellation.swift
4444
_TimeTypes.swift
45+
__LocksTemporaryWorkaround.swift
4546
TaskAlloc.cpp
4647
TaskStatus.cpp
4748
TaskGroup.swift
49+
TaskChannel.cpp
4850
Mutex.cpp
4951
${swift_concurrency_objc_sources}
5052

0 commit comments

Comments
 (0)