Skip to content

Commit 85d003e

Browse files
committed
[Concurrency] Implement basic runtime support for task futures.
Extend AsyncTask and the concurrency runtime with basic support for task futures. AsyncTasks with futures contain a future fragment with information about the type produced by the future, and where the future will put the result value or the thrown error in the initial context. We still don't have the ability to schedule the waiting tasks on an executor when the future completes, so this isn't useful for anything just test, and we can only test limited code paths.
1 parent 9116af9 commit 85d003e

File tree

5 files changed

+467
-11
lines changed

5 files changed

+467
-11
lines changed

include/swift/ABI/Task.h

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "swift/Basic/RelativePointer.h"
2121
#include "swift/ABI/HeapObject.h"
22+
#include "swift/ABI/Metadata.h"
2223
#include "swift/ABI/MetadataValues.h"
2324
#include "swift/Runtime/Config.h"
2425
#include "swift/Basic/STLExtras.h"
@@ -29,6 +30,7 @@ class AsyncTask;
2930
class AsyncContext;
3031
class Executor;
3132
class Job;
33+
struct OpaqueValue;
3234
class TaskStatusRecord;
3335

3436
/// An ExecutorRef isn't necessarily just a pointer to an executor
@@ -188,12 +190,16 @@ class AsyncTask : public HeapObject, public Job {
188190
/// Reserved for the use of the task-local stack allocator.
189191
void *AllocatorPrivate[4];
190192

193+
/// The next task in the linked list of waiting tasks.
194+
AsyncTask *NextWaitingTask;
195+
191196
AsyncTask(const HeapMetadata *metadata, JobFlags flags,
192197
TaskContinuationFunction *run,
193198
AsyncContext *initialContext)
194199
: HeapObject(metadata), Job(flags, run),
195200
ResumeContext(initialContext),
196-
Status(ActiveTaskStatus()) {
201+
Status(ActiveTaskStatus()),
202+
NextWaitingTask(nullptr) {
197203
assert(flags.isAsyncTask());
198204
}
199205

@@ -230,23 +236,117 @@ class AsyncTask : public HeapObject, public Job {
230236
}
231237
};
232238

233-
bool isFuture() const { return Flags.task_isFuture(); }
234-
235239
bool hasChildFragment() const { return Flags.task_isChildTask(); }
236240
ChildFragment *childFragment() {
237241
assert(hasChildFragment());
238242
return reinterpret_cast<ChildFragment*>(this + 1);
239243
}
240244

241-
// TODO: Future fragment
245+
class FutureFragment {
246+
public:
247+
/// Describes the status of the future.
248+
///
249+
/// Futures always being in the "Executing" state, and will always
250+
/// make a single state change to either Success or Error.
251+
enum class Status : uintptr_t {
252+
/// The future is executing or ready to execute. The storage
253+
/// is not accessible.
254+
Executing = 0,
255+
256+
/// The future has completed with result (of type \c resultType).
257+
Success,
258+
259+
/// The future has completed by throwing an error (an \c Error
260+
/// existential).
261+
Error,
262+
};
263+
264+
private:
265+
/// Status of the future.
266+
std::atomic<Status> status;
267+
268+
/// Queue containing all of the tasks that are waiting in `get()`.
269+
/// FIXME: do we also need a context pointer for each?
270+
std::atomic<AsyncTask*> waitQueue;
271+
272+
/// The type of the result that will be produced by the future.
273+
const Metadata *resultType;
274+
275+
/// The offset of the result in the initial asynchronous context.
276+
unsigned resultOffset;
277+
278+
/// The offset of the error in the initial asynchronous context.
279+
unsigned errorOffset;
280+
281+
// Trailing storage for the result itself. The storage will be uninitialized,
282+
// contain an instance of \c resultType, or contaon an an \c Error.
283+
284+
friend class AsyncTask;
285+
286+
public:
287+
FutureFragment(
288+
const Metadata *resultType, size_t resultOffset, size_t errorOffset)
289+
: status(Status::Success), waitQueue(nullptr), resultType(resultType),
290+
resultOffset(resultOffset), errorOffset(errorOffset) { }
291+
292+
/// Destroy the storage associated with the future.
293+
void destroy();
294+
295+
/// Retrieve a pointer to the storage of result.
296+
OpaqueValue *getStoragePtr() {
297+
return reinterpret_cast<OpaqueValue *>(
298+
reinterpret_cast<char *>(this) + storageOffset(resultType));
299+
}
300+
301+
/// Retrieve a reference to the storage of the
302+
303+
/// Compute the offset of the storage from the base of the future
304+
/// fragment.
305+
static size_t storageOffset(const Metadata *resultType) {
306+
size_t offset = sizeof(FutureFragment);
307+
size_t alignment = resultType->vw_alignment();
308+
return (offset + alignment - 1) & ~(alignment - 1);
309+
}
310+
311+
/// Determine the size of the future fragment given a particular future
312+
/// result type.
313+
static size_t fragmentSize(const Metadata *resultType);
314+
};
315+
316+
bool isFuture() const { return Flags.task_isFuture(); }
317+
318+
FutureFragment *futureFragment() {
319+
assert(isFuture());
320+
if (hasChildFragment()) {
321+
return reinterpret_cast<FutureFragment *>(
322+
reinterpret_cast<ChildFragment*>(this + 1) + 1);
323+
}
324+
325+
return reinterpret_cast<FutureFragment *>(this + 1);
326+
}
327+
328+
/// Wait for this future to complete.
329+
///
330+
/// \returns the status of the future. If this result is
331+
/// \c Executing, then \c waitingTask has been added to the
332+
/// wait queue and will be scheduled when the future completes or
333+
/// is cancelled. Otherwise, the future has completed and can be
334+
/// queried.
335+
FutureFragment::Status waitFuture(AsyncTask *waitingTask);
336+
337+
/// Complete this future.
338+
void completeFuture(AsyncContext *context);
339+
340+
/// Schedule waiting tasks now that the future has completed.
341+
void scheduleWaitingTasks(ExecutorRef executor);
242342

243343
static bool classof(const Job *job) {
244344
return job->isAsyncTask();
245345
}
246346
};
247347

248348
// The compiler will eventually assume these.
249-
static_assert(sizeof(AsyncTask) == 12 * sizeof(void*),
349+
static_assert(sizeof(AsyncTask) == 14 * sizeof(void*),
250350
"AsyncTask size is wrong");
251351
static_assert(alignof(AsyncTask) == 2 * alignof(void*),
252352
"AsyncTask alignment is wrong");

include/swift/Runtime/Concurrency.h

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,33 @@ AsyncTaskAndContext swift_task_create_f(JobFlags flags,
4949
AsyncFunctionType<void()> *function,
5050
size_t initialContextSize);
5151

52+
/// Create a task object with a future which will run the given
53+
/// function.
54+
///
55+
/// The task is not yet scheduled.
56+
///
57+
/// If a parent task is provided, flags.task_hasChildFragment() must
58+
/// be true, and this must be called synchronously with the parent.
59+
/// The parent is responsible for creating a ChildTaskStatusRecord.
60+
/// TODO: should we have a single runtime function for creating a task
61+
/// and doing this child task status record management?
62+
///
63+
/// flags.task_isFuture must be set. \c futureResultType is the type
64+
///
65+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
66+
AsyncTaskAndContext swift_task_create_future(
67+
JobFlags flags, AsyncTask *parent, const Metadata *futureResultType,
68+
const AsyncFunctionPointer<void()> *function,
69+
size_t resultOffset, size_t errorOffset);
70+
71+
/// Create a task object with no future which will run the given
72+
/// function.
73+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
74+
AsyncTaskAndContext swift_task_create_future_f(
75+
JobFlags flags, AsyncTask *parent, const Metadata *futureResultType,
76+
AsyncFunctionType<void()> *function, size_t initialContextSize,
77+
size_t resultOffset, size_t errorOffset);
78+
5279
/// Allocate memory in a task.
5380
///
5481
/// This must be called synchronously with the task.
@@ -83,6 +110,34 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
83110
JobPriority
84111
swift_task_escalate(AsyncTask *task, JobPriority newPriority);
85112

113+
/// The result of waiting for a task future.
114+
struct TaskFutureWaitResult {
115+
enum Kind : uintptr_t {
116+
/// The waiting task has been added to the future's wait queue, and will
117+
/// be scheduled once the future has completed.
118+
Waiting,
119+
120+
/// The future succeeded and produced a result value. \c storage points
121+
/// at that value.
122+
Success,
123+
124+
/// The future finished by throwing an error. \c storage is that error
125+
/// existential.
126+
Error,
127+
};
128+
129+
Kind kind;
130+
OpaqueValue *storage;
131+
};
132+
133+
/// Wait for a future task to complete.
134+
///
135+
/// This can be called from any thread.
136+
///
137+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
138+
TaskFutureWaitResult
139+
swift_task_future_wait(AsyncTask *task, AsyncTask *waitingTask);
140+
86141
/// Add a status record to a task. The record should not be
87142
/// modified while it is registered with a task.
88143
///

0 commit comments

Comments
 (0)