Skip to content

Commit 008ab08

Browse files
authored
Affine on wording (#59)
* started to add wording to the affinity paper
1 parent 911202a commit 008ab08

File tree

12 files changed

+322
-318
lines changed

12 files changed

+322
-318
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ FetchContent_Declare(
3636
execution
3737
# SOURCE_DIR ${CMAKE_SOURCE_DIR}/../execution
3838
GIT_REPOSITORY https://github.com/bemanproject/execution
39-
GIT_TAG 7451ece
39+
GIT_TAG a098f90
4040
)
4141
FetchContent_MakeAvailable(execution)
4242

docs/P3941-affinity.md

Lines changed: 310 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
title: Scheduler Affinity
3-
document: P3941R0
4-
date: 2025-12-14
3+
document: P3941R1
4+
date: 2026-01-14
55
audience:
66
- Concurrency Working Group (SG1)
77
- Library Evolution Working Group (LEWG)
@@ -31,6 +31,14 @@ meet its objective at run-time.
3131

3232
# Change History
3333

34+
## R2
35+
36+
- fixed typo in the wording for `affine_on::transform_sender`
37+
38+
## R1
39+
40+
- added wording
41+
3442
## R0 Initial Revision
3543

3644
# Overview of Changes
@@ -495,6 +503,304 @@ resolution in this issue is incomplete).
495503
The name `affine_on` isn't great. It may be worth giving the
496504
algorithm a better name.
497505
498-
# Wording Changes: TODO
506+
# Wording Changes
507+
508+
::: ednote
509+
Change [exec.affine.on] to use only one parameter, require an
510+
infallible scheduler from the receiver, and add a default implementation
511+
which allows customization of `affine_on` for child senders:
512+
:::
513+
514+
[1]{.pnum}
515+
`affine_on` adapts a sender into one that completes on a [specified
516+
scheduler]{.rm}[receiver's scheduler]{.add}. If the algorithm
517+
determines that the adapted sender already completes on the correct
518+
scheduler it can avoid any scheduling operation.
519+
520+
[2]{.pnum}
521+
The name `affine_on` denotes a pipeable sender adaptor object. For
522+
[a ]{.add} subexpression[s sch and]{.rm} `sndr`, if [`decltype((sch))`
523+
does not satisfy scheduler, or]{.rm} `decltype((sndr))` does not
524+
satisfy sender, <code>affine_on(sndr[, sch]{.rm})</code> is ill-formed.
525+
526+
[3]{.pnum}
527+
Otherwise, the expression <code>affine_on(sndr[, sch]{.rm})</code>
528+
is expression-equivalent to:
529+
<code>transform_sender(_get-domain-early_(sndr), _make-sender_(affine_on,
530+
[sch]{.rm}[env&lt;&gt;()]{.add}, sndr))</code> except that `sndr`
531+
is evaluated only once.
532+
533+
[4]{.pnum}
534+
The exposition-only class template <code>_impls-for_</code>
535+
([exec.snd.expos]) is specialized for `affine_on_t` as follows:
536+
537+
```c++
538+
namespace std::execution {
539+
template<>
540+
struct impls-for<affine_on_t> : default-impls {
541+
static constexpr auto get-attrs =
542+
[](const auto&@[ data]{.rm}]@, const auto& child) noexcept -> decltype(auto) {
543+
return @[_JOIN-ENV_(_SCHED-ATTRS_(data),_FWD-ENV_(]{.rm}@get_env(child)@[))]{.rm}@;
544+
};
545+
};
546+
}
547+
```
548+
549+
:::{.add}
550+
[?]{.pnum}
551+
Let `sndr` and `ev` be subexpressions such that `Sndr` is
552+
`decltype((sndr))`. If <code><i>sender-for</i>&lt;Sndr,
553+
affine_on_t&gt;</code> is `false`, then the expression
554+
`affine_on.transform_sender(sndr, ev)` is ill-formed; otherwise,
555+
it is equal to:
556+
557+
```
558+
auto&[_, _, child] = sndr;
559+
using child_tag_t = tag_of_t<remove_cvref_t<decltype(child)>>;
560+
if constexpr (requires(const child_tag_t& t){ t.affine_on(child, ev); })
561+
return t.affine_on(child, ev);
562+
else
563+
return write_env(
564+
schedule_from(get_scheduler(get_env(ev)), write_env(std::move(child), ev)),
565+
JOIN-ENV(env{prop{get_stop_token, never_stop_token()}}, ev)
566+
);
567+
```
568+
569+
[Note 1: This causes the `affine_on(sndr)` sender to become
570+
`schedule_from(sch, sndr)` when it is connected with a receiver
571+
`rcvr` whose execution domain does not customize `affine_on`,
572+
for which `get_scheduler(get_env(rcvr))` is `sch`, and `affine_on`
573+
isn't specialized for the child sender.
574+
end note]
575+
576+
[?]{.pnum}
577+
_Recommended Practice_: Implementations should provide `affine_on`
578+
member functions for senders which are known to resume on the
579+
scheduler where they were started. Example senders for which that
580+
is the case are `just`, `just_error`, `just_stopped`, `read_env`,
581+
and `write_env`.
582+
583+
:::
584+
585+
[5]{.pnum}
586+
Let <code>_out_sndr_</code> be a subexpression denoting a sender
587+
returned from <code>affine_on(sndr[, sch]{.rm})</code> or one equal
588+
to such, and let <code>_OutSndr_</code> be the type
589+
<code>decltype((_out_sndr_))</code>. Let <code>_out_rcvr_</code>
590+
be a subexpression denoting a receiver that has an environment of
591+
type `Env` such that <code>sender_in&lt;_OutSndr_, Env&gt;</code>
592+
is `true`. [Let <code>_sch_</code> be the result of the expression
593+
<code>get_scheduler(get_env(_out_rcvr_))</code>. If the completion
594+
signatures of <code>schedule(_sch_)</code> contain a different
595+
completion signature than `set_value_t()` when using an environment
596+
where `get_stop_token()` returns an `unstoppable_token`, the
597+
expression <code>connect(<i>out_sndr</i>, <i>out_rcvr</i>)</code> is
598+
ill-formed.]{.add} Let `op` be an lvalue referring to the operation
599+
state that results from connecting <code>_out_sndr_</code> to
600+
<code>_out_rcvr_</code>. Calling <code>start(_op_)</code> will
601+
start `sndr` on the current execution agent and execute completion
602+
operations on <code>_out_rcvr_</code> on an execution agent of the
603+
execution resource associated with [`sch`]{.rm}[<code>_sch_</code>]{.add}.
604+
If the current execution resource is the same as the execution
605+
resource associated with [`sch`]{.rm}[<code>_sch_</code>]{.add},
606+
the completion operation on <code>_out_rcvr_</code> may be called
607+
before <code>start(_op_)</code> completes. [If scheduling onto `sch`
608+
fails, an error completion on <code>_out_rcvr_</code> shall be
609+
executed on an unspecified execution agent.]{.rm}
610+
611+
::: ednote
612+
Remove `change_coroutine_scheduler` from [execution.syn]:
613+
:::
614+
615+
```
616+
namespace std::execution {
617+
...
618+
// [exec.task.scheduler], task scheduler
619+
class task_scheduler;
620+
621+
template<class E>
622+
struct with_error {
623+
using type = remove_cvref_t<E>;
624+
type error;
625+
};
626+
template<class E>
627+
with_error(E) -> with_error<E>;
628+
```
629+
::: rm
630+
```
631+
template<scheduler Sch>
632+
struct change_coroutine_scheduler {
633+
using type = remove_cvref_t<Sch>;
634+
type scheduler;
635+
};
636+
template<scheduler Sch>
637+
change_coroutine_scheduler(Sch) -> change_coroutine_scheduler<Sch>;
638+
```
639+
:::
640+
```
641+
// [exec.task], class template task
642+
template<class T, class Environment>
643+
class task;
644+
...
645+
}
646+
```
647+
648+
::: ednote
649+
Adjust the use of `affine_on` and remove `change_coroutine_scheduler` from [task.promise]:
650+
:::
651+
652+
```
653+
namespace std::execution {
654+
template<class T, class Environment>
655+
class task<T, Environment>::promise_type {
656+
public:
657+
...
658+
659+
template<class A>
660+
auto await_transform(A&& a);
661+
```
662+
::: rm
663+
```
664+
template<class Sch>
665+
auto await_transform(change_coroutine_scheduler<Sch> sch);
666+
```
667+
:::
668+
```
669+
670+
@_unspecified_@ get_env() const noexcept;
671+
672+
...
673+
}
674+
};
675+
```
676+
...
677+
678+
```
679+
template<sender Sender>
680+
auto await_transform(Sender&& sndr) noexcept;
681+
```
682+
[9]{.pnum}
683+
_Returns_: If `same_as<inline_scheduler, scheduler_type>` is `true` returns `as_awaitable(​std​::​​forward<Sender>(sndr), *this);` otherwise returns `as_awaitable(affine_on(​std​::​​forward<Sender>(sndr)@[, SCHED(*this)]{.rm}@), *this)`.
684+
685+
::: rm
686+
```
687+
template<class Sch>
688+
auto await_transform(change_coroutine_scheduler<Sch> sch) noexcept;
689+
```
690+
[10]{.pnum}
691+
_Effects_: Equivalent to:
692+
```
693+
return await_transform(just(exchange(SCHED(*this), scheduler_type(sch.scheduler))), *this);
694+
```
695+
:::
696+
697+
```
698+
void unhandled_exception();
699+
```
700+
[11]{.pnum}
701+
_Effects_: If the signature `set_error_t(exception_ptr)` is not an element of `error_types`, calls `terminate()` ([except.terminate]). Otherwise, stores `current_exception()` into <code>_errors_</code>.
702+
703+
...
704+
705+
::: ednote
706+
In [exec.task.scheduler] change the constructor of `task_scheduler` to require that the scheduler passed
707+
is infallible
708+
:::
709+
710+
```
711+
template<class Sch, class Allocator = allocator<void>>
712+
requires(!same_as<task_scheduler, remove_cvref_t<Sch>>) && scheduler<Sch>
713+
explicit task_scheduler(Sch&& sch, Allocator alloc = {});
714+
```
715+
716+
::: add
717+
[?]{.pnum}
718+
_Mandates_: Let `e` be an environment and let `E` be `decltype(e)`.
719+
If `unstoppable_token<decltype(get_stop_token(e))>` is `true`, then
720+
the type `completion_signatures_of_t<decltype(schedule(sch)), E>`
721+
only includes `set_value_t()`, otherwise it may additionally include
722+
`set_stopped_t()`.
723+
:::
724+
725+
[2]{.pnum}
726+
_Effects_: Initialize <code><i>sch_</i></code> with `allocate_shared<remove_cvref_t<Sch>>(alloc,​ std​::​forward<Sch>​(sch))`.
727+
728+
[3]{.pnum}
729+
_Recommended practice_: Implementations should avoid the use of
730+
dynamically allocated memory for small scheduler objects.
731+
732+
[4]{.pnum}
733+
_Remarks_: Any allocations performed by construction of
734+
<code>_ts-sender_</code> or <code>_state_</code> objects resulting
735+
from calls on `*this` are performed using a copy of `alloc`.
736+
737+
::: ednote
738+
In [exec.task.scheduler] change the <code><i>ts-sender</i></code> completion signatures
739+
to indicate that `task_scheduler` is infallible:
740+
:::
741+
742+
[8]{.pnum}
743+
```
744+
namespace std::execution {
745+
class task_scheduler::@_ts-sender_@ { // @_exposition only_@
746+
public:
747+
using sender_concept = sender_t;
748+
749+
template<receiver Rcvr>
750+
@_state_@<Rcvr> connect(Rcvr&& rcvr) &&;
751+
};
752+
}
753+
```
754+
755+
<code><i>ts-sender</i></code> is an exposition-only class that
756+
models `sender` ([exec.snd]) and for which
757+
<code>completion_signatures_of_t&lt;<i>ts-sender</i>[, E]{.add}&gt;</code>
758+
denotes[:]{.rm}[ `completion_signatures<set_value_t()>` if `unstoppable_token<decltype(get_stop_token(declval<E>()))>` is `true`, and
759+
otherwise `completion_signatures<set_value_t(), set_stopped_t()>`.]{.add}
760+
761+
::: rm
762+
```
763+
completion_signatures<
764+
set_value_t(),
765+
set_error_t(error_code),
766+
set_error_t(exception_ptr),
767+
set_stopped_t()>
768+
```
769+
:::
770+
771+
::: ednote
772+
In [exec.run.loop.types] change the paragraph defining the completion signatures:
773+
:::
774+
775+
...
776+
777+
```
778+
class run-loop-sender;
779+
```
780+
781+
[5]{.pnum}
782+
<code><i>run-loop-sender</i></code> is an exposition-only type that satisfies `sender`.
783+
[Let `E` be the type of an environment. If `unstoppable_token<decltype(get_stop_token(declval<E>()))>` is `true`,
784+
then ]{.add} <code>completion_signatures_of_t&lt;<i>run-loop-sender</i>[, E]{.add}&gt;</code> is
785+
786+
::: rm
787+
```
788+
completion_signatures<set_value_t(), set_error_t(exception_ptr), set_stopped_t()>
789+
```
790+
:::
791+
792+
::: add
793+
```
794+
completion_signatures<set_value_t()>
795+
```
796+
Otherwise it is
797+
```
798+
completion_signatures<set_value_t(), set_stopped_t()>
799+
```
800+
:::
801+
802+
[6]{.pnum} An instance of <code><i>run-loop-sender</i></code> remains
803+
valid until the end of the lifetime of its associated `run_loop`
804+
instance.
499805

500-
To be done.
806+
...

examples/bulk.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ int main() {
2525
ex::sync_wait(ex::write_env(ex::bulk(ex::just(), 16u, work{}), env{}));
2626

2727
ex::sync_wait(
28-
ex::write_env([]() -> ex::task<void, ex::empty_env> { co_await ex::bulk(ex::just(), 16u, work{}); }(), env{}));
28+
ex::write_env([]() -> ex::task<void, ex::env<>> { co_await ex::bulk(ex::just(), 16u, work{}); }(), env{}));
2929
}

examples/c++now-errors.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ using identity_or_none_t = typename identity_or_none<T...>::type;
3636
#if 202202 <= __cpp_lib_expected
3737
template <ex::sender Sender>
3838
auto as_expected(Sender&& sndr) {
39-
using value_type = ex::value_types_of_t<Sender, ex::empty_env, std::tuple, identity_or_none_t>;
40-
using error_type = ex::error_types_of_t<Sender, ex::empty_env, identity_or_none_t>;
39+
using value_type = ex::value_types_of_t<Sender, ex::env<>, std::tuple, identity_or_none_t>;
40+
using error_type = ex::error_types_of_t<Sender, ex::env<>, identity_or_none_t>;
4141
using result_type = std::expected<value_type, error_type>;
4242

4343
return std::forward<Sender>(sndr) |

examples/dangling-references.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ namespace ex = beman::execution;
1212
// ----------------------------------------------------------------------------
1313

1414
namespace {
15-
ex::task<int, ex::empty_env> do_work(std::string) { /* work */ co_return 0; };
16-
ex::task<void, ex::empty_env> execute_all() {
15+
ex::task<int, ex::env<>> do_work(std::string) { /* work */ co_return 0; };
16+
ex::task<void, ex::env<>> execute_all() {
1717
co_await ex::when_all(do_work("arguments 1"), do_work("arguments 2"));
1818
co_return;
1919
}
@@ -27,8 +27,8 @@ int main() {
2727
ex::sync_wait([]() -> ex::task<ex::with_error<int>, error_env> { co_return ex::with_error<int>{42}; }());
2828

2929
ex::sync_wait(execute_all());
30-
ex::sync_wait([]() -> ex::task<void, ex::empty_env> {
31-
auto t = [](const int /* this would be added: &*/ v) -> ex::task<int, ex::empty_env> { co_return v; }(42);
30+
ex::sync_wait([]() -> ex::task<void, ex::env<>> {
31+
auto t = [](const int /* this would be added: &*/ v) -> ex::task<int, ex::env<>> { co_return v; }(42);
3232
[[maybe_unused]] auto v = co_await std::move(t);
3333
}());
3434
}

examples/issue-start-reschedules.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ ex::task<> test(auto sched) {
1313
std::cout << "init =" << std::this_thread::get_id() << "\n";
1414
co_await ex::starts_on(sched, ex::just());
1515
// static_assert(std::same_as<void, decltype(ex::get_completion_signatures(ex::starts_on(sched, ex::just()),
16-
// ex::empty_env{}))>);
16+
// ex::env<>{}))>);
1717
co_await ex::just();
1818
std::cout << "final=" << std::this_thread::get_id() << "\n";
1919
}

examples/rvalue-task.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ void test(T&& task) {
3232
} // namespace
3333

3434
int main() {
35-
auto task = []() -> ex::task<void, ex::empty_env> { co_return; }();
35+
auto task = []() -> ex::task<void, ex::env<>> { co_return; }();
3636
test(std::move(task));
3737
// test(task);
3838
}

0 commit comments

Comments
 (0)