Skip to content

Commit b267778

Browse files
committed
Rebased to use new global executor
1 parent 9e1ecc5 commit b267778

19 files changed

+257
-796
lines changed

include/swift/ABI/Task.h

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ class alignas(2 * alignof(void*)) Job {
4343
enum {
4444
/// The next waiting task link, an AsyncTask that is waiting on a future.
4545
NextWaitingTaskIndex = 0,
46-
// /// The next completed task link, an AsyncTask that is completed however
47-
// /// has not been polled yet (by `group.next()`), so the channel task keeps
48-
// /// the list in completion order, such that they can be polled out one by
49-
// /// one.
50-
// NextChannelCompletedTaskIndex = 1,
5146
};
5247

5348
public:
@@ -218,7 +213,7 @@ class AsyncTask : public HeapObject, public Job {
218213
return reinterpret_cast<ChildFragment*>(this + 1);
219214
}
220215

221-
// ==== TaskGroup Channel ----------------------------------------------------
216+
// ==== TaskGroup ------------------------------------------------------------
222217

223218
class GroupFragment {
224219
public:
@@ -450,11 +445,6 @@ class AsyncTask : public HeapObject, public Job {
450445
NaiveQueue<ReadyQueueItem> readyQueue;
451446
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
452447

453-
// NOTE: this style of "queue" is not very nice for the group,
454-
// because it acts more like a stack, and we really want completion order
455-
// for the task group, thus not using this style (which the wait queue does)
456-
// std::atomic<ReadyQueueItem> readyQueue;
457-
458448
/// Queue containing all of the tasks that are waiting in `get()`.
459449
///
460450
/// A task group is also a future, and awaits on the group's result *itself*
@@ -481,13 +471,6 @@ class AsyncTask : public HeapObject, public Job {
481471
return oldStatus.pendingTasks() == 0;
482472
}
483473

484-
GroupStatus statusLoad() {
485-
return GroupStatus {
486-
// status.load(std::memory_order_acquire)
487-
status.load(std::memory_order_seq_cst) // TODO: acquire instead
488-
};
489-
}
490-
491474
/// Returns *assumed* new status, including the just performed +1.
492475
GroupStatus statusAddReadyTaskAcquire() {
493476
auto old = status.fetch_add(GroupStatus::oneReadyTask, std::memory_order_acquire);
@@ -706,12 +689,6 @@ class AsyncTask : public HeapObject, public Job {
706689
SchedulerPrivate[NextWaitingTaskIndex]);
707690
}
708691

709-
// /// Access the next completed task, which establishes a singly linked list of
710-
// /// tasks that are waiting to be polled from a task group channel.
711-
// AsyncTask *&getNextChannelReadyTask() {
712-
// return reinterpret_cast<AsyncTask *&>(
713-
// SchedulerPrivate[NextChannelCompletedTaskIndex]);
714-
// }
715692
};
716693

717694
// The compiler will eventually assume these.

include/swift/Runtime/Concurrency.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ struct TaskFutureWaitResult {
133133
OpaqueValue *storage;
134134
};
135135

136+
using TaskFutureWaitSignature =
137+
AsyncSignature<TaskFutureWaitResult(AsyncTask *), /*throws*/ false>;
138+
136139
/// Wait for a future task to complete.
137140
///
138141
/// This can be called from any thread. Its Swift signature is
@@ -151,10 +154,10 @@ swift_task_future_wait;
151154
///
152155
/// \code
153156
/// func swift_task_group_wait_next(on groupTask: Builtin.NativeObject) async
154-
/// -> RawGroupPollResult?
157+
/// -> (hadErrorResult: Bool, storage: UnsafeRawPointer?)
155158
/// \endcode
156-
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
157-
AsyncFunctionType<TaskFutureWaitResult(AsyncTask *task)>
159+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swiftasync)
160+
TaskFutureWaitSignature::FunctionType
158161
swift_task_group_wait_next;
159162

160163
/// This can be called from any thread. Its Swift signature is

stdlib/public/Concurrency/CMakeLists.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,8 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
4444
_TimeTypes.swift
4545
TaskAlloc.cpp
4646
TaskStatus.cpp
47-
TaskGroup.swift
48-
TaskQueues.h
4947
TaskGroup.cpp
48+
TaskGroup.swift
5049
Mutex.cpp
5150
${swift_concurrency_objc_sources}
5251

stdlib/public/Concurrency/Task.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ static void completeTask(AsyncTask *task, ExecutorRef executor,
182182

183183
// Release the task, balancing the retain that a running task has on itself.
184184
// If it was a group child task, it will remain until the group returns it.
185-
assert(task && "task to be released MUST NOT BE NULL");
186185
swift_release(task);
187186
}
188187

stdlib/public/Concurrency/Task.swift

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,28 @@ public func _taskFutureGetThrowing<T>(
353353
}
354354

355355
public func _runChildTask<T>(
356+
operation: @escaping () async throws -> T
357+
) async -> Builtin.NativeObject {
358+
let currentTask = Builtin.getCurrentAsyncTask()
359+
360+
// Set up the job flags for a new task.
361+
var flags = Task.JobFlags()
362+
flags.kind = .task
363+
flags.priority = getJobFlags(currentTask).priority
364+
flags.isFuture = true
365+
flags.isChildTask = true
366+
367+
// Create the asynchronous task future.
368+
let (task, _) = Builtin.createAsyncTaskFuture(
369+
flags.bits, currentTask, operation)
370+
371+
// Enqueue the resulting job.
372+
_enqueueJobGlobal(Builtin.convertTaskToJob(task))
373+
374+
return task
375+
}
376+
377+
public func _runGroupChildTask<T>(
356378
overridingPriority priorityOverride: Task.Priority? = nil,
357379
operation: @escaping () async throws -> T
358380
) async -> Builtin.NativeObject {

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 13 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919
#include "swift/ABI/Metadata.h"
2020
#include "swift/Runtime/HeapObject.h"
2121
#include "TaskPrivate.h"
22-
#include <iostream>
23-
#include <pthread.h>
24-
#include <stdio.h>
2522

2623
using namespace swift;
2724
using GroupFragment = AsyncTask::GroupFragment;
@@ -34,52 +31,24 @@ using GroupPollResult = GroupFragment::GroupPollResult;
3431
// =============================================================================
3532
// ==== destroy ----------------------------------------------------------------
3633

37-
// FIXME: implement this properly, unsure if this is correct
3834
void GroupFragment::destroy() {
3935
// TODO: need to release all waiters as well
4036
// auto waitHead = waitQueue.load(std::memory_order_acquire);
4137
// switch (waitHead.getStatus()) {
4238
// case GroupFragment::WaitStatus::Waiting:
43-
// assert(false && "destroying a task that never completed");
44-
//
45-
// case GroupFragment::WaitStatus::Success:
46-
// resultType->vw_destroy(getStoragePtr());
47-
// break;
48-
//
49-
// case GroupFragment::WaitStatus::Error:
50-
// swift_unknownObjectRelease(reinterpret_cast<OpaqueValue *>(getError()));
51-
// break;
52-
// }
53-
//
54-
// // FIXME: all tasks still in the readyQueue must be: swift_release(...)
55-
// auto readyHead = readyQueue.load(std::memory_order_acquire);
56-
// while (auto readyTask = readyHead.getTask()) {
57-
// // Find the next waiting task.
58-
// auto newReadyTask = readyTask->getNextChannelReadyTask();
59-
// auto nextReadyQueueItem = GroupFragment::ReadyQueueItem::get(
60-
// GroupFragment::ReadyStatus::Executing,
61-
// newReadyTask
62-
// );
63-
//
64-
// // Attempt to claim it, we are the future that is going to complete it.
65-
// // TODO: there may be other futures trying to do the same right now? FIXME: not really because the status right?
66-
// if (fragment->waitQueue.compare_exchange_weak(readyHead, nextReadyQueueItem,
67-
// /*success*/ std::memory_order_release,
68-
// /*failure*/ std::memory_order_acquire)) {
69-
// switch (readyHead.getStatus()) {
70-
// case GroupFragment::ReadyStatus::Empty:
71-
// return;
72-
//
73-
// case GroupFragment::WaitStatus::Success:
74-
// swift_unknownObjectRelease(reinterpret_cast<OpaqueValue *>(getError()));
75-
// break;
76-
//
77-
// case GroupFragment::WaitStatus::Error:
78-
// swift_unknownObjectRelease(reinterpret_cast<OpaqueValue *>(getError()));
79-
// break;
80-
// }
81-
// }
39+
// assert(false && "destroying a task group that still has waiting tasks");
8240
// }
41+
42+
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
43+
// Release all ready tasks which are kept retained, the group destroyed,
44+
// so no other task will ever await on them anymore;
45+
ReadyQueueItem item;
46+
bool taskDequeued = readyQueue.dequeue(item);
47+
while (taskDequeued) {
48+
swift_release(item.getTask());
49+
bool taskDequeued = readyQueue.dequeue(item);
50+
}
51+
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
8352
}
8453

8554
// =============================================================================
@@ -209,7 +178,7 @@ void swift::swift_task_group_wait_next(
209178
GroupFragment::GroupPollResult AsyncTask::groupPoll(AsyncTask *waitingTask) {
210179
assert(isTaskGroup());
211180
auto fragment = groupFragment();
212-
fragment->mutex.lock();
181+
fragment->mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
213182

214183
// immediately update the status counter
215184
auto assumed = fragment->statusAddWaitingTaskAcquire();

stdlib/public/Concurrency/TaskGroup.swift

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@
1111
////===----------------------------------------------------------------------===//
1212

1313
import Swift
14-
import Dispatch
1514
@_implementationOnly import _SwiftConcurrencyShims
16-
import Darwin
17-
import Foundation
1815

1916
// ==== Task Group -------------------------------------------------------------
2017

@@ -78,7 +75,6 @@ extension Task {
7875
groupFlags.isTaskGroup = true
7976
groupFlags.isFuture = true
8077

81-
// 1. Prepare the Group task
8278
let (groupTask, _) =
8379
Builtin.createAsyncTaskFuture(groupFlags.bits, parent) { () async throws -> BodyResult in
8480
let task = Builtin.getCurrentAsyncTask()
@@ -98,15 +94,11 @@ extension Task {
9894

9995
return result
10096
}
101-
let groupHandle = Handle<BodyResult>(task: groupTask)
10297

103-
// 2.0) Run the task!
104-
DispatchQueue.global(priority: .default).async { // FIXME: use executors when they land
105-
groupHandle.run()
106-
}
98+
// Enqueue the resulting job.
99+
_enqueueJobGlobal(Builtin.convertTaskToJob(groupTask))
107100

108-
// 2.2) Await the group completing
109-
return await try groupHandle.get()
101+
return await try Handle<BodyResult>(task: groupTask).get()
110102
}
111103

112104
/// A task group serves as storage for dynamically started tasks.
@@ -121,6 +113,7 @@ extension Task {
121113
/// No public initializers
122114
init(task: Builtin.NativeObject) {
123115
self.task = task
116+
_swiftRetain(task)
124117
}
125118

126119
// Swift will statically prevent this type from being copied or moved.
@@ -143,25 +136,15 @@ extension Task {
143136
overridingPriority priorityOverride: Priority? = nil,
144137
operation: @escaping () async throws -> TaskResult
145138
) async -> Task.Handle<TaskResult> {
146-
// 1) Increment the number of pending tasks immediately;
139+
// Increment the number of pending tasks immediately;
147140
// We don't need to know which specific task is pending, just that pending
148141
// ones exist, so that next() can know if to wait or return nil.
149-
let groupTask = Builtin.getCurrentAsyncTask()
150-
_taskGroupAddPendingTask(groupTask)
142+
_taskGroupAddPendingTask(self.task)
151143

152-
// 2) Create and run the child task
153144
let childTask =
154-
await _runChildTask(overridingPriority: priorityOverride, operation: operation)
155-
156-
let handle = Handle<TaskResult>(task: childTask)
145+
await _runGroupChildTask(overridingPriority: priorityOverride, operation: operation)
157146

158-
// FIXME: use executors or something else to launch the task
159-
DispatchQueue.global(priority: .default).async {
160-
// print(">>> run (task added at \(file):\(line))")
161-
handle.run()
162-
}
163-
164-
return handle
147+
return Handle<TaskResult>(task: childTask)
165148
}
166149

167150
/// Wait for the a child task that was added to the group to complete,
@@ -208,21 +191,18 @@ extension Task {
208191
if rawResult.hadErrorResult {
209192
// Throw the result on error.
210193
let error = unsafeBitCast(rawResult.storage, to: Error.self)
211-
fputs("error: next[\(#file):\(#line)]: after await, error: \(rawResult) -> \(error)\n", stderr)
212194
throw error
213195
}
214196

215197
guard let storage = rawResult.storage else {
216198
// The group was empty, return nil
217-
fputs("error: next[\(#file):\(#line)]: after await, result: \(rawResult) -> nil\n", stderr)
218199
return nil
219200
}
220201

221202
// Take the value on success.
222203
let storagePtr =
223204
storage.bindMemory(to: TaskResult.self, capacity: 1)
224205
let value = UnsafeMutablePointer<TaskResult>(mutating: storagePtr).pointee
225-
fputs("error: next[\(pthread_self()) \(#file):\(#line)]: after await, result: \(rawResult) -> \(value)\n", stderr)
226206
return value
227207
}
228208

@@ -273,7 +253,6 @@ extension Task.Group {
273253
//
274254
// Failures of tasks are ignored.
275255
while !self.isEmpty {
276-
print("[\(#function)] self.isEmpty == \(self.isEmpty)")
277256
_ = await try? self.next()
278257
// TODO: Should a failure cause a cancellation of the task group?
279258
// This looks very much like supervision trees,
@@ -285,6 +264,16 @@ extension Task.Group {
285264

286265
/// ==== -----------------------------------------------------------------------
287266

267+
@_silgen_name("swift_retain")
268+
func _swiftRetain(
269+
_ task: Builtin.NativeObject
270+
)
271+
272+
@_silgen_name("swift_task_group_add_pending")
273+
func _taskGroupAddPendingTask(
274+
_ groupTask: Builtin.NativeObject
275+
)
276+
288277
@_silgen_name("swift_task_group_offer")
289278
func taskGroupOffer(
290279
group: Builtin.NativeObject,
@@ -296,12 +285,6 @@ func _taskGroupWaitNext(
296285
on groupTask: Builtin.NativeObject
297286
) async -> (hadErrorResult: Bool, storage: UnsafeRawPointer?)
298287

299-
/// SeeAlso: GroupPollResult
300-
struct RawGroupPollResult {
301-
let status: GroupPollStatus
302-
let storage: UnsafeRawPointer
303-
}
304-
305288
enum GroupPollStatus: Int {
306289
case empty = 0
307290
case waiting = 1
@@ -313,8 +296,3 @@ enum GroupPollStatus: Int {
313296
func _taskGroupIsEmpty(
314297
_ groupTask: Builtin.NativeObject
315298
) -> Bool
316-
317-
@_silgen_name("swift_task_group_add_pending")
318-
func _taskGroupAddPendingTask(
319-
_ groupTask: Builtin.NativeObject
320-
)

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
#include "swift/ABI/Task.h"
2222
#include "swift/ABI/Metadata.h"
2323
#include "swift/Runtime/HeapObject.h"
24-
#include <pthread.h>
25-
#include <stdio.h>
2624

2725
namespace swift {
2826

@@ -134,10 +132,11 @@ static void runTaskWithGroupPollResult(
134132
// TODO: schedule this task on the executor rather than running it directly.
135133
waitingTask->run(executor);
136134

135+
// TODO: Not entirely sure when to release; we synchronously run the code above so we can't before
137136
// if we need to, release the now completed task so it can be destroyed
138-
if (result.retainedTask) {
139-
swift_release(result.retainedTask);
140-
}
137+
// if (result.retainedTask) {
138+
// swift_release(result.retainedTask);
139+
// }
141140
}
142141

143142
} // end namespace swift

0 commit comments

Comments
 (0)