118118
119119use crate :: loom:: cell:: UnsafeCell ;
120120use crate :: loom:: sync:: atomic:: { AtomicBool , AtomicUsize } ;
121- use crate :: loom:: sync:: { Arc , Mutex , MutexGuard , RwLock , RwLockReadGuard } ;
121+ use crate :: loom:: sync:: { Arc , Mutex , MutexGuard } ;
122122use crate :: runtime:: coop:: cooperative;
123123use crate :: util:: linked_list:: { self , GuardedLinkedList , LinkedList } ;
124124use crate :: util:: WakeList ;
@@ -304,7 +304,7 @@ use self::error::{RecvError, SendError, TryRecvError};
304304/// Data shared between senders and receivers.
305305struct Shared < T > {
306306 /// slots in the channel.
307- buffer : Box < [ RwLock < Slot < T > > ] > ,
307+ buffer : Box < [ Mutex < Slot < T > > ] > ,
308308
309309 /// Mask a position -> index.
310310 mask : usize ,
@@ -348,7 +348,7 @@ struct Slot<T> {
348348 ///
349349 /// The value is set by `send` when the write lock is held. When a reader
350350 /// drops, `rem` is decremented. When it hits zero, the value is dropped.
351- val : UnsafeCell < Option < T > > ,
351+ val : Option < T > ,
352352}
353353
354354/// An entry in the wait queue.
@@ -386,7 +386,7 @@ generate_addr_of_methods! {
386386}
387387
388388struct RecvGuard < ' a , T > {
389- slot : RwLockReadGuard < ' a , Slot < T > > ,
389+ slot : MutexGuard < ' a , Slot < T > > ,
390390}
391391
392392/// Receive a value future.
@@ -395,11 +395,15 @@ struct Recv<'a, T> {
395395 receiver : & ' a mut Receiver < T > ,
396396
397397 /// Entry in the waiter `LinkedList`.
398- waiter : UnsafeCell < Waiter > ,
398+ waiter : WaiterCell ,
399399}
400400
401- unsafe impl < ' a , T : Send > Send for Recv < ' a , T > { }
402- unsafe impl < ' a , T : Send > Sync for Recv < ' a , T > { }
401+ // The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
402+ // from `Recv`.
403+ struct WaiterCell ( UnsafeCell < Waiter > ) ;
404+
405+ unsafe impl Send for WaiterCell { }
406+ unsafe impl Sync for WaiterCell { }
403407
404408/// Max number of receivers. Reserve space to lock.
405409const MAX_RECEIVERS : usize = usize:: MAX >> 2 ;
@@ -467,12 +471,6 @@ pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
467471 ( tx, rx)
468472}
469473
470- unsafe impl < T : Send > Send for Sender < T > { }
471- unsafe impl < T : Send > Sync for Sender < T > { }
472-
473- unsafe impl < T : Send > Send for Receiver < T > { }
474- unsafe impl < T : Send > Sync for Receiver < T > { }
475-
476474impl < T > Sender < T > {
477475 /// Creates the sending-half of the [`broadcast`] channel.
478476 ///
@@ -511,10 +509,10 @@ impl<T> Sender<T> {
511509 let mut buffer = Vec :: with_capacity ( capacity) ;
512510
513511 for i in 0 ..capacity {
514- buffer. push ( RwLock :: new ( Slot {
512+ buffer. push ( Mutex :: new ( Slot {
515513 rem : AtomicUsize :: new ( 0 ) ,
516514 pos : ( i as u64 ) . wrapping_sub ( capacity as u64 ) ,
517- val : UnsafeCell :: new ( None ) ,
515+ val : None ,
518516 } ) ) ;
519517 }
520518
@@ -600,7 +598,7 @@ impl<T> Sender<T> {
600598 tail. pos = tail. pos . wrapping_add ( 1 ) ;
601599
602600 // Get the slot
603- let mut slot = self . shared . buffer [ idx] . write ( ) ;
601+ let mut slot = self . shared . buffer [ idx] . lock ( ) ;
604602
605603 // Track the position
606604 slot. pos = pos;
@@ -609,7 +607,7 @@ impl<T> Sender<T> {
609607 slot. rem . with_mut ( |v| * v = rem) ;
610608
611609 // Write the value
612- slot. val = UnsafeCell :: new ( Some ( value) ) ;
610+ slot. val = Some ( value) ;
613611
614612 // Release the slot lock before notifying the receivers.
615613 drop ( slot) ;
@@ -696,7 +694,7 @@ impl<T> Sender<T> {
696694 while low < high {
697695 let mid = low + ( high - low) / 2 ;
698696 let idx = base_idx. wrapping_add ( mid) & self . shared . mask ;
699- if self . shared . buffer [ idx] . read ( ) . rem . load ( SeqCst ) == 0 {
697+ if self . shared . buffer [ idx] . lock ( ) . rem . load ( SeqCst ) == 0 {
700698 low = mid + 1 ;
701699 } else {
702700 high = mid;
@@ -738,7 +736,7 @@ impl<T> Sender<T> {
738736 let tail = self . shared . tail . lock ( ) ;
739737
740738 let idx = ( tail. pos . wrapping_sub ( 1 ) & self . shared . mask as u64 ) as usize ;
741- self . shared . buffer [ idx] . read ( ) . rem . load ( SeqCst ) == 0
739+ self . shared . buffer [ idx] . lock ( ) . rem . load ( SeqCst ) == 0
742740 }
743741
744742 /// Returns the number of active receivers.
@@ -1058,7 +1056,7 @@ impl<T> Receiver<T> {
10581056 let idx = ( self . next & self . shared . mask as u64 ) as usize ;
10591057
10601058 // The slot holding the next value to read
1061- let mut slot = self . shared . buffer [ idx] . read ( ) ;
1059+ let mut slot = self . shared . buffer [ idx] . lock ( ) ;
10621060
10631061 if slot. pos != self . next {
10641062 // Release the `slot` lock before attempting to acquire the `tail`
@@ -1075,7 +1073,7 @@ impl<T> Receiver<T> {
10751073 let mut tail = self . shared . tail . lock ( ) ;
10761074
10771075 // Acquire slot lock again
1078- slot = self . shared . buffer [ idx] . read ( ) ;
1076+ slot = self . shared . buffer [ idx] . lock ( ) ;
10791077
10801078 // Make sure the position did not change. This could happen in the
10811079 // unlikely event that the buffer is wrapped between dropping the
@@ -1367,12 +1365,12 @@ impl<'a, T> Recv<'a, T> {
13671365 fn new ( receiver : & ' a mut Receiver < T > ) -> Recv < ' a , T > {
13681366 Recv {
13691367 receiver,
1370- waiter : UnsafeCell :: new ( Waiter {
1368+ waiter : WaiterCell ( UnsafeCell :: new ( Waiter {
13711369 queued : AtomicBool :: new ( false ) ,
13721370 waker : None ,
13731371 pointers : linked_list:: Pointers :: new ( ) ,
13741372 _p : PhantomPinned ,
1375- } ) ,
1373+ } ) ) ,
13761374 }
13771375 }
13781376
@@ -1384,7 +1382,7 @@ impl<'a, T> Recv<'a, T> {
13841382 is_unpin :: < & mut Receiver < T > > ( ) ;
13851383
13861384 let me = self . get_unchecked_mut ( ) ;
1387- ( me. receiver , & me. waiter )
1385+ ( me. receiver , & me. waiter . 0 )
13881386 }
13891387 }
13901388}
@@ -1418,6 +1416,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14181416 // `Shared::notify_rx` before we drop the object.
14191417 let queued = self
14201418 . waiter
1419+ . 0
14211420 . with ( |ptr| unsafe { ( * ptr) . queued . load ( Acquire ) } ) ;
14221421
14231422 // If the waiter is queued, we need to unlink it from the waiters list.
@@ -1432,6 +1431,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14321431 // `Relaxed` order suffices because we hold the tail lock.
14331432 let queued = self
14341433 . waiter
1434+ . 0
14351435 . with_mut ( |ptr| unsafe { ( * ptr) . queued . load ( Relaxed ) } ) ;
14361436
14371437 if queued {
@@ -1440,7 +1440,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14401440 // safety: tail lock is held and the wait node is verified to be in
14411441 // the list.
14421442 unsafe {
1443- self . waiter . with_mut ( |ptr| {
1443+ self . waiter . 0 . with_mut ( |ptr| {
14441444 tail. waiters . remove ( ( & mut * ptr) . into ( ) ) ;
14451445 } ) ;
14461446 }
@@ -1486,16 +1486,15 @@ impl<'a, T> RecvGuard<'a, T> {
14861486 where
14871487 T : Clone ,
14881488 {
1489- self . slot . val . with ( |ptr| unsafe { ( * ptr ) . clone ( ) } )
1489+ self . slot . val . clone ( )
14901490 }
14911491}
14921492
14931493impl < ' a , T > Drop for RecvGuard < ' a , T > {
14941494 fn drop ( & mut self ) {
14951495 // Decrement the remaining counter
14961496 if 1 == self . slot . rem . fetch_sub ( 1 , SeqCst ) {
1497- // Safety: Last receiver, drop the value
1498- self . slot . val . with_mut ( |ptr| unsafe { * ptr = None } ) ;
1497+ self . slot . val = None ;
14991498 }
15001499 }
15011500}
0 commit comments