Skip to content

Commit d361f41

Browse files
authored
Replace crossbeam-channel by the std variant (#653)
Replaces crossbeam-channel with the channels in the standard library. The implementations in the standard library are derived from crossbeam, but seem to be more actively maintained. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent d400bd3 commit d361f41

File tree

7 files changed

+10
-12
lines changed

7 files changed

+10
-12
lines changed

communication/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ serde = { version = "1.0", features = ["derive"] }
2424
timely_bytes = { path = "../bytes", version = "0.13" }
2525
timely_container = { path = "../container", version = "0.15.0" }
2626
timely_logging = { path = "../logging", version = "0.13" }
27-
crossbeam-channel = "0.5"
2827

2928
# Lgalloc only supports linux and macos, don't depend on any other OS.
3029
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]

communication/src/allocator/counters.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5+
use std::sync::mpsc::Sender;
56

67
use crate::{Push, Pull};
78

@@ -51,8 +52,6 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
5152
}
5253
}
5354

54-
use crossbeam_channel::Sender;
55-
5655
/// The push half of an intra-thread channel.
5756
pub struct ArcPusher<T, P: Push<T>> {
5857
index: usize,

communication/src/allocator/process.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
66
use std::any::Any;
77
use std::time::Duration;
88
use std::collections::{HashMap};
9-
use crossbeam_channel::{Sender, Receiver};
9+
use std::sync::mpsc::{Sender, Receiver};
1010

1111
use crate::allocator::thread::{ThreadBuilder};
1212
use crate::allocator::{Allocate, AllocateBuilder, PeerBuilder, Thread};
@@ -80,7 +80,7 @@ impl PeerBuilder for Process {
8080
let mut counters_send = Vec::with_capacity(peers);
8181
let mut counters_recv = Vec::with_capacity(peers);
8282
for _ in 0 .. peers {
83-
let (send, recv) = crossbeam_channel::unbounded();
83+
let (send, recv) = std::sync::mpsc::channel();
8484
counters_send.push(send);
8585
counters_recv.push(recv);
8686
}
@@ -130,7 +130,7 @@ impl Allocate for Process {
130130
let mut pushers = Vec::with_capacity(self.peers);
131131
let mut pullers = Vec::with_capacity(self.peers);
132132
for buzzer in self.buzzers.iter() {
133-
let (s, r): (Sender<T>, Receiver<T>) = crossbeam_channel::unbounded();
133+
let (s, r): (Sender<T>, Receiver<T>) = std::sync::mpsc::channel();
134134
// TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter.
135135
pushers.push((Pusher { target: s }, buzzer.clone()));
136136
pullers.push(Puller { source: r, current: None });

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use std::rc::Rc;
33
use std::cell::RefCell;
44
use std::collections::{VecDeque, HashMap, hash_map::Entry};
5-
use crossbeam_channel::{Sender, Receiver};
5+
use std::sync::mpsc::{Sender, Receiver};
66

77
use timely_bytes::arc::Bytes;
88

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::collections::{VecDeque, HashMap, hash_map::Entry};
6-
use crossbeam_channel::{Sender, Receiver};
6+
use std::sync::mpsc::{Sender, Receiver};
77

88
use timely_bytes::arc::Bytes;
99

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Methods related to reading from and writing to TCP connections
22
33
use std::io::{self, Write};
4-
use crossbeam_channel::{Sender, Receiver};
4+
use std::sync::mpsc::{Sender, Receiver};
55

66
use crate::networking::MessageHeader;
77

communication/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ pub use allocator::Generic as Allocator;
107107
pub use allocator::{Allocate, Exchangeable};
108108
pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
109109

110+
use std::sync::mpsc::{Sender, Receiver};
111+
110112
use timely_bytes::arc::Bytes;
111113

112114
/// A type that can be serialized and deserialized through `Bytes`.
@@ -169,8 +171,6 @@ impl<T, P: ?Sized + Pull<T>> Pull<T> for Box<P> {
169171
}
170172

171173

172-
use crossbeam_channel::{Sender, Receiver};
173-
174174
/// Allocate a matrix of send and receive changes to exchange items.
175175
///
176176
/// This method constructs channels for `sends` threads to create and send
@@ -183,7 +183,7 @@ fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<V
183183

184184
for sender in senders.iter_mut() {
185185
for recver in recvers.iter_mut() {
186-
let (send, recv) = crossbeam_channel::unbounded();
186+
let (send, recv) = std::sync::mpsc::channel();
187187
sender.push(send);
188188
recver.push(recv);
189189
}

0 commit comments

Comments
 (0)