 
 #include "rpc_util.h"
 #include "src/__support/CPP/optional.h"
-#include "src/__support/GPU/utils.h"
 #include "src/__support/macros/config.h"
 
 #include <stdint.h>
@@ -38,6 +37,9 @@ namespace rpc {
 #define __scoped_atomic_fetch_and(src, val, ord, scp)                         \
   __atomic_fetch_and(src, val, ord)
 #endif
+#if !__has_builtin(__scoped_atomic_thread_fence)
+#define __scoped_atomic_thread_fence(ord, scp) __atomic_thread_fence(ord)
+#endif
 
 /// A fixed size channel used to communicate between the RPC client and server.
 struct Buffer {
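
Note: this new fallback mirrors the __scoped_atomic_fetch_or/fetch_and shims
directly above it. On toolchains without the scoped builtin the scope argument
is simply dropped, so a call such as

    __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_SYSTEM);

expands to

    __atomic_thread_fence(__ATOMIC_RELEASE);

The fence still orders memory the same way; only the narrower scope hint is
lost.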
@@ -110,14 +112,14 @@ template <bool Invert> struct Process {
 
   /// Retrieve the inbox state from memory shared between processes.
   LIBC_INLINE uint32_t load_inbox(uint64_t lane_mask, uint32_t index) const {
-    return gpu::broadcast_value(
+    return rpc::broadcast_value(
         lane_mask, __scoped_atomic_load_n(&inbox[index], __ATOMIC_RELAXED,
                                           __MEMORY_SCOPE_SYSTEM));
   }
 
   /// Retrieve the outbox state from memory shared between processes.
   LIBC_INLINE uint32_t load_outbox(uint64_t lane_mask, uint32_t index) const {
-    return gpu::broadcast_value(
+    return rpc::broadcast_value(
         lane_mask, __scoped_atomic_load_n(&outbox[index], __ATOMIC_RELAXED,
                                           __MEMORY_SCOPE_SYSTEM));
   }
@@ -128,7 +130,7 @@ template <bool Invert> struct Process {
   /// cheaper than calling load_outbox to get the value to store.
   LIBC_INLINE uint32_t invert_outbox(uint32_t index, uint32_t current_outbox) {
     uint32_t inverted_outbox = !current_outbox;
-    __atomic_thread_fence(__ATOMIC_RELEASE);
+    __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_SYSTEM);
     __scoped_atomic_store_n(&outbox[index], inverted_outbox, __ATOMIC_RELAXED,
                             __MEMORY_SCOPE_SYSTEM);
     return inverted_outbox;
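
Note: the release-fence-then-relaxed-store sequence above is the standard
fence-based publication idiom: the fence keeps every earlier write ordered
before the outbox flip. A minimal standalone sketch of the same idiom in
portable C++11 atomics (hypothetical payload/outbox_flag names, not the rpc
types):

    #include <atomic>

    int payload = 0;
    std::atomic<int> outbox_flag{0};

    void publish() {
      payload = 42;                                        // write data first
      std::atomic_thread_fence(std::memory_order_release); // order it before the flag
      outbox_flag.store(1, std::memory_order_relaxed);     // flip the flag last
    }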
@@ -142,7 +144,7 @@ template <bool Invert> struct Process {
       sleep_briefly();
       in = load_inbox(lane_mask, index);
     }
-    __atomic_thread_fence(__ATOMIC_ACQUIRE);
+    __scoped_atomic_thread_fence(__ATOMIC_ACQUIRE, __MEMORY_SCOPE_SYSTEM);
   }
 
   /// The packet is a linearly allocated array of buffers used to communicate
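
Note: this is the consumer half of the idiom sketched above: spin on relaxed
loads, then issue a single acquire fence once the loop exits, rather than
paying for an acquire barrier on every poll. Continuing the same hypothetical
names:

    #include <atomic>

    extern int payload;
    extern std::atomic<int> outbox_flag;

    int consume() {
      while (outbox_flag.load(std::memory_order_relaxed) != 1) {
        // spin cheaply; the real code calls sleep_briefly() here
      }
      std::atomic_thread_fence(std::memory_order_acquire); // one fence on exit
      return payload; // now guaranteed to see the publisher's write
    }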
@@ -162,9 +164,10 @@ template <bool Invert> struct Process {
 
   /// Attempt to claim the lock at index. Return true on lock taken.
   /// lane_mask is a bitmap of the threads in the warp that would hold the
-  /// single lock on success, e.g. the result of gpu::get_lane_mask()
+  /// single lock on success, e.g. the result of rpc::get_lane_mask()
   /// The lock is held when the n-th bit of the lock bitfield is set.
-  LIBC_INLINE bool try_lock(uint64_t lane_mask, uint32_t index) {
+  [[clang::convergent]] LIBC_INLINE bool try_lock(uint64_t lane_mask,
+                                                  uint32_t index) {
     // On amdgpu, test and set to the nth lock bit and a sync_lane would suffice
     // On volta, need to handle differences between the threads running and
     // the threads that were detected in the previous call to get_lane_mask()
@@ -173,12 +176,12 @@ template <bool Invert> struct Process {
     // There may be threads active which are not in lane mask which must not
     // succeed in taking the lock, as otherwise it will leak. This is handled
     // by making threads which are not in lane_mask or with 0, a no-op.
-    uint32_t id = gpu::get_lane_id();
+    uint32_t id = rpc::get_lane_id();
     bool id_in_lane_mask = lane_mask & (1ul << id);
 
     // All threads in the warp call fetch_or. Possibly at the same time.
     bool before = set_nth(lock, index, id_in_lane_mask);
-    uint64_t packed = gpu::ballot(lane_mask, before);
+    uint64_t packed = rpc::ballot(lane_mask, before);
 
     // If every bit set in lane_mask is also set in packed, every single thread
     // in the warp failed to get the lock. Ballot returns unset for threads not
@@ -198,22 +201,23 @@ template <bool Invert> struct Process {
     // inlining the current function.
     bool holding_lock = lane_mask != packed;
     if (holding_lock)
-      __atomic_thread_fence(__ATOMIC_ACQUIRE);
+      __scoped_atomic_thread_fence(__ATOMIC_ACQUIRE, __MEMORY_SCOPE_DEVICE);
     return holding_lock;
   }
 
   /// Unlock the lock at index. We need a lane sync to keep this function
   /// convergent, otherwise the compiler will sink the store and deadlock.
-  LIBC_INLINE void unlock(uint64_t lane_mask, uint32_t index) {
+  [[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask,
+                                                uint32_t index) {
     // Do not move any writes past the unlock.
-    __atomic_thread_fence(__ATOMIC_RELEASE);
+    __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_DEVICE);
 
     // Use exactly one thread to clear the nth bit in the lock array. Must
     // restrict to a single thread to avoid one thread dropping the lock, then
     // an unrelated warp claiming the lock, then a second thread in this warp
     // dropping the lock again.
-    clear_nth(lock, index, gpu::is_first_lane(lane_mask));
-    gpu::sync_lane(lane_mask);
+    clear_nth(lock, index, rpc::is_first_lane(lane_mask));
+    rpc::sync_lane(lane_mask);
   }
 
   /// Number of bytes to allocate for an inbox or outbox.
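
Note: a worked example of the try_lock ballot test above, since the bit
arithmetic is easy to misread. Suppose lanes 0-3 are active, so
lane_mask = 0b1111. If another warp already holds the lock, every lane's
fetch_or observes the bit already set, ballot returns packed = 0b1111 ==
lane_mask, and try_lock fails for the whole warp. If the lock was free,
exactly one of the racing fetch_or operations sees the bit clear, so packed
has at least one zero bit (e.g. 0b1011), lane_mask != packed, and every lane
in the warp reports the lock as taken.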
@@ -276,9 +280,9 @@ template <typename F>
 LIBC_INLINE static void invoke_rpc(F &&fn, uint32_t lane_size,
                                    uint64_t lane_mask, Buffer *slot) {
   if constexpr (is_process_gpu()) {
-    fn(&slot[gpu::get_lane_id()], gpu::get_lane_id());
+    fn(&slot[rpc::get_lane_id()], rpc::get_lane_id());
   } else {
-    for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+    for (uint32_t i = 0; i < lane_size; i += rpc::get_num_lanes())
       if (lane_mask & (1ul << i))
         fn(&slot[i], i);
   }
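
Note: a hedged sketch of how a callback sees invoke_rpc: on the GPU each
hardware lane handles exactly the slot it owns, while the non-GPU path walks
the masked lanes serially. The data[0] access assumes Buffer::data is an
indexable array, as the send_n/recv_n code below suggests:

    // Hypothetical call site: fn receives one Buffer slot plus the id of the
    // lane that owns it, regardless of which path dispatched it.
    invoke_rpc([](Buffer *buffer, uint32_t id) { buffer->data[0] = id; },
               lane_size, lane_mask, slot);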
@@ -323,7 +327,7 @@ template <bool T> struct Port {
 
   LIBC_INLINE void close() {
     // Wait for all lanes to finish using the port.
-    gpu::sync_lane(lane_mask);
+    rpc::sync_lane(lane_mask);
 
     // The server is passive, if it owns the buffer when it closes we need to
     // give ownership back to the client.
@@ -466,7 +470,7 @@ LIBC_INLINE void Port<T>::send_n(const void *const *src, uint64_t *size) {
   });
   uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
   uint64_t mask = process.header[index].mask;
-  while (gpu::ballot(mask, idx < num_sends)) {
+  while (rpc::ballot(mask, idx < num_sends)) {
     send([=](Buffer *buffer, uint32_t id) {
       uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
                          ? sizeof(Buffer::data)
@@ -499,7 +503,7 @@ LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
   });
   uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
   uint64_t mask = process.header[index].mask;
-  while (gpu::ballot(mask, idx < num_recvs)) {
+  while (rpc::ballot(mask, idx < num_recvs)) {
     recv([=](Buffer *buffer, uint32_t id) {
       uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
                          ? sizeof(Buffer::data)
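
Note: in both streaming loops above, rpc::ballot is what keeps the warp
converged: the loop runs while any lane in the mask still has bytes
outstanding, so a lane whose shorter message is already finished keeps
stepping through the loop with the others instead of diverging. For example,
if lane 0 transfers 16 bytes and lane 1 transfers 4096, lane 0's ballot bit
drops to zero early, but it keeps executing the loop body until lane 1 is
done as well.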
@@ -517,16 +521,17 @@ LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
 /// port. Each port instance uses an associated \p opcode to tell the server
 /// what to do. The Client interface provides the appropriate lane size to the
 /// port using the platform's returned value.
-template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
+template <uint16_t opcode>
+[[clang::convergent]] LIBC_INLINE Client::Port Client::open() {
   // Repeatedly perform a naive linear scan for a port that can be opened to
   // send data.
-  for (uint32_t index = gpu::get_cluster_id();; ++index) {
+  for (uint32_t index = 0;; ++index) {
     // Start from the beginning if we run out of ports to check.
     if (index >= process.port_count)
       index = 0;
 
     // Attempt to acquire the lock on this index.
-    uint64_t lane_mask = gpu::get_lane_mask();
+    uint64_t lane_mask = rpc::get_lane_mask();
     if (!process.try_lock(lane_mask, index))
       continue;
 
@@ -540,22 +545,22 @@ template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
       continue;
     }
 
-    if (gpu::is_first_lane(lane_mask)) {
+    if (rpc::is_first_lane(lane_mask)) {
       process.header[index].opcode = opcode;
       process.header[index].mask = lane_mask;
     }
-    gpu::sync_lane(lane_mask);
-    return Port(process, lane_mask, gpu::get_lane_size(), index, out);
+    rpc::sync_lane(lane_mask);
+    return Port(process, lane_mask, rpc::get_num_lanes(), index, out);
   }
 }
 
 /// Attempts to open a port to use as the server. The server can only open a
 /// port if it has a pending receive operation.
-LIBC_INLINE cpp::optional<typename Server::Port>
+[[clang::convergent]] LIBC_INLINE cpp::optional<typename Server::Port>
 Server::try_open(uint32_t lane_size, uint32_t start) {
   // Perform a naive linear scan for a port that has a pending request.
   for (uint32_t index = start; index < process.port_count; ++index) {
-    uint64_t lane_mask = gpu::get_lane_mask();
+    uint64_t lane_mask = rpc::get_lane_mask();
     uint32_t in = process.load_inbox(lane_mask, index);
     uint32_t out = process.load_outbox(lane_mask, index);
 
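
Note: the [[clang::convergent]] attributes added here and on try_lock/unlock
mark these functions as containing cross-lane operations (rpc::ballot,
rpc::sync_lane), which forbids the compiler from duplicating or moving the
calls in ways that change the set of threads executing them together. A
comment-only sketch of the failure mode this prevents:

    // Without the attribute, the optimizer could in principle sink a call like
    //   rpc::sync_lane(lane_mask);
    // past a divergent branch, so only some of the lanes that entered the
    // function would reach it, and the warp deadlocks waiting for the rest.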
@@ -595,6 +600,9 @@ LIBC_INLINE Server::Port Server::open(uint32_t lane_size) {
 #undef __scoped_atomic_fetch_or
 #undef __scoped_atomic_fetch_and
 #endif
+#if !__has_builtin(__scoped_atomic_thread_fence)
+#undef __scoped_atomic_thread_fence
+#endif
 
 } // namespace rpc
 } // namespace LIBC_NAMESPACE_DECL