Skip to content

Commit 9e1ecc5

Browse files
committed
[Concurrency] guard offer/poll with a lock for now; cleanups
1 parent 7b37554 commit 9e1ecc5

16 files changed

+250
-515
lines changed

include/swift/ABI/Task.h

Lines changed: 85 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
#include "swift/ABI/MetadataValues.h"
2525
#include "swift/Runtime/Config.h"
2626
#include "swift/Basic/STLExtras.h"
27-
#include "TaskQueues.h"
2827
#include "bitset"
2928
#include "string"
29+
#include "queue"
3030

3131
namespace swift {
3232
class AsyncTask;
@@ -43,11 +43,11 @@ class alignas(2 * alignof(void*)) Job {
4343
enum {
4444
/// The next waiting task link, an AsyncTask that is waiting on a future.
4545
NextWaitingTaskIndex = 0,
46-
/// The next completed task link, an AsyncTask that is completed however
47-
/// has not been polled yet (by `group.next()`), so the channel task keeps
48-
/// the list in completion order, such that they can be polled out one by
49-
/// one.
50-
NextChannelCompletedTaskIndex = 1,
46+
// /// The next completed task link, an AsyncTask that is completed however
47+
// /// has not been polled yet (by `group.next()`), so the channel task keeps
48+
// /// the list in completion order, such that they can be polled out one by
49+
// /// one.
50+
// NextChannelCompletedTaskIndex = 1,
5151
};
5252

5353
public:
@@ -223,7 +223,7 @@ class AsyncTask : public HeapObject, public Job {
223223
class GroupFragment {
224224
public:
225225
/// Describes the status of the channel.
226-
enum class ReadyQueueStatus : uintptr_t {
226+
enum class ReadyStatus : uintptr_t {
227227
/// The channel is empty, no tasks are pending.
228228
/// Return immediately, there is no point in suspending.
229229
///
@@ -238,23 +238,12 @@ class AsyncTask : public HeapObject, public Job {
238238
Error = 0b11,
239239
};
240240

241-
/// Describes the status of the future.
242-
///
243-
/// Futures always begin in the "Executing" state, and will always
244-
/// make a single state change to either Success or Error.
241+
/// Describes the status of the waiting task that is suspended on `next()`.
245242
enum class WaitStatus : uintptr_t {
246-
/// The storage is not accessible.
247-
Executing = 0,
248-
249-
/// The future has completed with result (of type \c resultType).
250-
Success = 1,
251-
252-
/// The future has completed by throwing an error (an \c Error
253-
/// existential).
254-
Error = 2,
243+
Waiting = 0,
255244
};
256245

257-
enum class ChannelPollStatus : uintptr_t {
246+
enum class GroupPollStatus : uintptr_t {
258247
/// The channel is known to be empty and we can immediately return nil.
259248
Empty = 0,
260249

@@ -271,7 +260,7 @@ class AsyncTask : public HeapObject, public Job {
271260

272261
/// The result of waiting on a Channel (TaskGroup).
273262
struct GroupPollResult {
274-
ChannelPollStatus status; // TODO: pack it into storage pointer or not worth it?
263+
GroupPollStatus status; // TODO: pack it into storage pointer or not worth it?
275264

276265
/// Storage for the result of the future.
277266
///
@@ -292,18 +281,18 @@ class AsyncTask : public HeapObject, public Job {
292281
AsyncTask *retainedTask;
293282

294283
bool isStorageAccessible() {
295-
return status == ChannelPollStatus::Success ||
296-
status == ChannelPollStatus::Error ||
297-
status == ChannelPollStatus::Empty;
284+
return status == GroupPollStatus::Success ||
285+
status == GroupPollStatus::Error ||
286+
status == GroupPollStatus::Empty;
298287
}
299288

300289
static GroupPollResult get(AsyncTask *asyncTask, bool hadErrorResult,
301290
bool needsSwiftRelease) {
302291
auto fragment = asyncTask->futureFragment();
303292
return GroupPollResult{
304293
/*status*/ hadErrorResult ?
305-
GroupFragment::ChannelPollStatus::Error :
306-
GroupFragment::ChannelPollStatus::Success,
294+
GroupFragment::GroupPollStatus::Error :
295+
GroupFragment::GroupPollStatus::Success,
307296
/*storage*/ hadErrorResult ?
308297
reinterpret_cast<OpaqueValue *>(fragment->getError()) :
309298
fragment->getStoragePtr(),
@@ -321,15 +310,15 @@ class AsyncTask : public HeapObject, public Job {
321310

322311
uintptr_t storage;
323312

324-
ReadyQueueStatus getStatus() const {
325-
return static_cast<ReadyQueueStatus>(storage & statusMask);
313+
ReadyStatus getStatus() const {
314+
return static_cast<ReadyStatus>(storage & statusMask);
326315
}
327316

328317
AsyncTask *getTask() const {
329318
return reinterpret_cast<AsyncTask *>(storage & ~statusMask);
330319
}
331320

332-
static ReadyQueueItem get(ReadyQueueStatus status, AsyncTask *task) {
321+
static ReadyQueueItem get(ReadyStatus status, AsyncTask *task) {
333322
assert(task == nullptr || task->isFuture());
334323
return ReadyQueueItem{
335324
reinterpret_cast<uintptr_t>(task) | static_cast<uintptr_t>(status)};
@@ -379,7 +368,7 @@ class AsyncTask : public HeapObject, public Job {
379368
}
380369

381370
unsigned int waitingTasks() {
382-
return status & maskWaiting; // consider only `maskWaiting` bits
371+
return status & maskWaiting;
383372
}
384373

385374
bool isEmpty() {
@@ -416,20 +405,55 @@ class AsyncTask : public HeapObject, public Job {
416405
};
417406
};
418407

408+
template<typename T>
409+
class NaiveQueue {
410+
std::queue<T> queue;
411+
412+
public:
413+
NaiveQueue() = default;
414+
NaiveQueue(const NaiveQueue<T> &) = delete ;
415+
NaiveQueue& operator=(const NaiveQueue<T> &) = delete ;
416+
417+
NaiveQueue(NaiveQueue<T>&& other) {
418+
queue = std::move(other.queue);
419+
}
420+
421+
virtual ~NaiveQueue() { }
422+
423+
bool dequeue(T &output) {
424+
if (queue.empty()) {
425+
return false;
426+
}
427+
output = queue.front();
428+
queue.pop();
429+
return true;
430+
}
431+
432+
void enqueue(const T item) {
433+
queue.push(item);
434+
}
435+
436+
};
437+
419438
private:
420439

440+
// TODO: move to lockless via the status atomic
441+
mutable std::mutex mutex;
442+
421443
/// Used for queue management, counting number of waiting and ready tasks
422-
// TODO: we likely can collapse these into the wait queue if we try hard enough?
423-
// but we'd lose the ability to get counts I think.
424444
std::atomic<unsigned long> status;
425445

426446
/// Queue containing completed tasks offered into this channel.
427447
///
428448
/// The low bits contain the status, the rest of the pointer is the
429449
/// AsyncTask.
430-
// mpsc_queue_t<ReadyQueueItem> readyQueue;
431-
MutexQueue<ReadyQueueItem> readyQueue;
432-
// TODO: Try the same queue strategy as the waitQueue
450+
NaiveQueue<ReadyQueueItem> readyQueue;
451+
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
452+
453+
// NOTE: this style of "queue" is not very nice for the group,
454+
// because it acts more like a stack, and we really want completion order
455+
// for the task group, thus not using this style (which the wait queue does)
456+
// std::atomic<ReadyQueueItem> readyQueue;
433457

434458
/// Queue containing all of the tasks that are waiting in `get()`.
435459
///
@@ -440,15 +464,14 @@ class AsyncTask : public HeapObject, public Job {
440464
/// AsyncTask.
441465
std::atomic<WaitQueueItem> waitQueue;
442466

443-
// Trailing storage for the result itself. The storage will be uninitialized.
444-
// Use the `readyQueue` to poll for values from the channel instead.
445467
friend class AsyncTask;
446468

447469
public:
448470
explicit GroupFragment()
449471
: status(GroupStatus::initial().status),
450472
readyQueue(),
451-
waitQueue(WaitQueueItem::get(WaitStatus::Executing, nullptr)) {}
473+
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
474+
waitQueue(WaitQueueItem::get(WaitStatus::Waiting, nullptr)) {}
452475

453476
/// Destroy the storage associated with the channel.
454477
void destroy();
@@ -460,35 +483,29 @@ class AsyncTask : public HeapObject, public Job {
460483

461484
GroupStatus statusLoad() {
462485
return GroupStatus {
463-
// status.load(std::memory_order_relaxed)
464-
status.load(std::memory_order_acquire)
486+
// status.load(std::memory_order_acquire)
487+
status.load(std::memory_order_seq_cst) // TODO: acquire instead
465488
};
466489
}
467490

468491
/// Returns *assumed* new status, including the just performed +1.
469-
GroupStatus statusAddReadyTaskLoad() {
470-
auto s = GroupStatus {
471-
// status.fetch_add(GroupStatus::oneReadyTask, std::memory_order_relaxed)
472-
status.fetch_add(GroupStatus::oneReadyTask, std::memory_order_release) + GroupStatus::oneReadyTask
473-
};
492+
GroupStatus statusAddReadyTaskAcquire() {
493+
auto old = status.fetch_add(GroupStatus::oneReadyTask, std::memory_order_acquire);
494+
auto s = GroupStatus {old + GroupStatus::oneReadyTask };
474495
assert(s.readyTasks() <= s.pendingTasks());
475496
return s;
476497
}
477498

478499
/// Returns *assumed* new status, including the just performed +1.
479-
GroupStatus statusAddPendingTaskLoad() {
480-
return GroupStatus {
481-
// status.fetch_add(GroupStatus::onePendingTask, std::memory_order_relaxed)
482-
status.fetch_add(GroupStatus::onePendingTask, std::memory_order_release) + GroupStatus::onePendingTask
483-
};
500+
GroupStatus statusAddPendingTaskRelaxed() {
501+
auto old = status.fetch_add(GroupStatus::onePendingTask, std::memory_order_relaxed);
502+
return GroupStatus {old + GroupStatus::onePendingTask };
484503
}
485504

486505
/// Returns *assumed* new status, including the just performed +1.
487-
GroupStatus statusAddWaitingTaskLoad() {
488-
return GroupStatus {
489-
// status.fetch_add(GroupStatus::oneWaitingTask, std::memory_order_relaxed)
490-
status.fetch_add(GroupStatus::oneWaitingTask, std::memory_order_release) + GroupStatus::oneWaitingTask
491-
};
506+
GroupStatus statusAddWaitingTaskAcquire() {
507+
auto old = status.fetch_add(GroupStatus::oneWaitingTask, std::memory_order_acquire);
508+
return GroupStatus { old + GroupStatus::oneWaitingTask };
492509
}
493510

494511
/// Remove waiting task, without taking any pending task.
@@ -526,22 +543,16 @@ class AsyncTask : public HeapObject, public Job {
526543

527544
/// Offer result of a task into this channel.
528545
/// The value is enqueued at the end of the channel.
529-
///
530-
/// Upon enqueue, any waiting tasks will be scheduled on the given executor. // TODO: not precisely right
531-
void
532-
groupOffer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
533-
534-
// FutureFragment::Status
535-
// AsyncTask::waitGroupNext(AsyncTask *waitingTask);
546+
void groupOffer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
536547

537548
/// Attempt to dequeue ready tasks and complete the waitingTask.
538549
///
539550
/// If unable to complete the waiting task immediately (with an readily
540-
/// available completed task), either return ChannelPollStatus::Empty if it is known
541-
/// that no tasks are in flight, or ChannelPollStatus::Waiting if there are
542-
/// tasks in flight and we'll eventually be woken up by a completion.
543-
GroupFragment::GroupPollResult
544-
groupPoll(AsyncTask *waitingTask);
551+
/// available completed task), either returns an `GroupPollStatus::Empty`
552+
/// result if it is known that no pending tasks in the group,
553+
/// or a `GroupPollStatus::Waiting` result if there are tasks in flight
554+
/// and the waitingTask eventually be woken up by a completion.
555+
GroupFragment::GroupPollResult groupPoll(AsyncTask *waitingTask);
545556

546557
// ==== TaskGroup Child ------------------------------------------------------
547558

@@ -695,13 +706,12 @@ class AsyncTask : public HeapObject, public Job {
695706
SchedulerPrivate[NextWaitingTaskIndex]);
696707
}
697708

698-
/// Access the next completed task, which establishes a singly linked list of
699-
/// tasks that are waiting to be polled from a task group channel.
700-
// FIXME: remove and replace with a fifo queue in the Channel task itself.
701-
AsyncTask *&getNextChannelCompletedTask() {
702-
return reinterpret_cast<AsyncTask *&>(
703-
SchedulerPrivate[NextChannelCompletedTaskIndex]);
704-
}
709+
// /// Access the next completed task, which establishes a singly linked list of
710+
// /// tasks that are waiting to be polled from a task group channel.
711+
// AsyncTask *&getNextChannelReadyTask() {
712+
// return reinterpret_cast<AsyncTask *&>(
713+
// SchedulerPrivate[NextChannelCompletedTaskIndex]);
714+
// }
705715
};
706716

707717
// The compiler will eventually assume these.
@@ -807,10 +817,6 @@ class FutureAsyncContext : public AsyncContext {
807817
SwiftError *errorResult = nullptr;
808818
OpaqueValue *indirectResult;
809819

810-
811-
// TODO: this is to support "offer into queue on complete"
812-
AsyncContext *parentChannel = nullptr; // TODO: no idea if we need this or not
813-
814820
using AsyncContext::AsyncContext;
815821
};
816822

include/swift/Runtime/Concurrency.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,17 +161,12 @@ swift_task_group_wait_next;
161161
///
162162
/// \code
163163
/// func swift_task_group_add_pending(
164-
/// _ groupTask: Builtin.NativeObject),
165-
/// _ childTask: Builtin.NativeObject
164+
/// _ groupTask: Builtin.NativeObject)
166165
/// )
167166
/// \endcode
168167
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
169168
void
170-
swift_task_group_add_pending(AsyncTask *groupTask, AsyncTask *childTask);
171-
172-
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
173-
void
174-
swift_task_print_ID(const char* name, const char* file, int line, AsyncTask *task);
169+
swift_task_group_add_pending(AsyncTask *groupTask);
175170

176171
/// Check the readyQueue of a Channel, return true if it has no pending tasks.
177172
///

lib/IRGen/IRGenSIL.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,8 +2279,9 @@ void IRGenSILFunction::visitFunctionRefBaseInst(FunctionRefBaseInst *i) {
22792279
isa<PreviousDynamicFunctionRefInst>(i));
22802280
llvm::Value *value;
22812281
auto isSpecialAsyncWithoutCtxtSize =
2282-
fn->isAsync() && (fn->getName().equals("swift_task_future_wait")
2283-
|| fn->getName().equals("swift_task_group_wait_next"));
2282+
fn->isAsync() && (
2283+
fn->getName().equals("swift_task_future_wait") ||
2284+
fn->getName().equals("swift_task_group_wait_next"));
22842285
if (fn->isAsync() && !isSpecialAsyncWithoutCtxtSize) {
22852286
value = IGM.getAddrOfAsyncFunctionPointer(fn);
22862287
value = Builder.CreateBitCast(value, fnPtr->getType());

0 commit comments

Comments
 (0)