diff --git a/.github/workflows/miri.yml b/.github/workflows/miri.yml index ba4ea8075..304e2e6d0 100644 --- a/.github/workflows/miri.yml +++ b/.github/workflows/miri.yml @@ -13,7 +13,6 @@ jobs: - uses: actions-rust-lang/setup-rust-toolchain@v1 with: toolchain: nightly - - name: Install miri - run: rustup component add miri + components: miri - name: Cargo test run: cargo miri test diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ce93447cb..dc4fe97bb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,9 +23,24 @@ jobs: - uses: actions-rust-lang/setup-rust-toolchain@v1 with: toolchain: ${{ matrix.toolchain }} + components: clippy - name: Cargo test run: cargo test --workspace --all-targets + # Check for clippy warnings + clippy: + name: Cargo clippy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + components: clippy + - name: Cargo clippy + run: cargo clippy --workspace --all-targets + env: + RUSTFLAGS: "" # Don't make test fail on clippy + # Check mdbook files for errors mdbook: name: test mdBook diff --git a/Cargo.toml b/Cargo.toml index 660f267c3..9f7738ccb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,70 @@ edition = "2021" [workspace.dependencies] columnar = "0.3" +[workspace.lints.clippy] +type_complexity = "allow" +option_map_unit_fn = "allow" +wrong_self_convention = "allow" +should_implement_trait = "allow" +module_inception = "allow" + +#as_conversions = "warn" +bool_comparison = "warn" +borrow_interior_mutable_const = "warn" +borrowed_box = "warn" +builtin_type_shadow = "warn" +clone_on_ref_ptr = "warn" +crosspointer_transmute = "warn" +dbg_macro = "warn" +deref_addrof = "warn" +disallowed_macros = "warn" +disallowed_methods = "warn" +disallowed_types = "warn" +double_must_use = "warn" +double_neg = "warn" +double_parens = "warn" +duplicate_underscore_argument = "warn" +excessive_precision = "warn" +extra_unused_lifetimes = "warn" +from_over_into = "warn" +match_overlapping_arm = "warn" +must_use_unit = "warn" +mut_mutex_lock = "warn" +needless_borrow = "warn" +needless_pass_by_ref_mut = "warn" +needless_question_mark = "warn" +needless_return = "warn" +no_effect = "warn" +panicking_overflow_checks = "warn" +partialeq_ne_impl = "warn" +print_literal = "warn" +redundant_closure = "warn" +redundant_closure_call = "warn" +redundant_field_names = "warn" +redundant_pattern = "warn" +redundant_slicing = "warn" +redundant_static_lifetimes = "warn" +same_item_push = "warn" +shadow_unrelated = "warn" +single_component_path_imports = "warn" +suspicious_assignment_formatting = "warn" +suspicious_else_formatting = "warn" +suspicious_unary_op_formatting = "warn" +todo = "warn" +transmutes_expressible_as_ptr_casts = "warn" +unnecessary_cast = "warn" +unnecessary_lazy_evaluations = "warn" +unnecessary_mut_passed = "warn" +unnecessary_unwrap = "warn" +unused_async = "warn" +useless_asref = "warn" +useless_conversion = "warn" +useless_format = "warn" +wildcard_in_or_patterns = "warn" +write_literal = "warn" +zero_divided_by_zero = "warn" +zero_prefixed_literal = "warn" + [profile.release] opt-level = 3 debug = true diff --git a/bytes/Cargo.toml b/bytes/Cargo.toml index 9d997d749..3376e355e 100644 --- a/bytes/Cargo.toml +++ b/bytes/Cargo.toml @@ -11,3 +11,6 @@ homepage = "https://github.com/TimelyDataflow/timely-dataflow" repository = "https://github.com/TimelyDataflow/timely-dataflow.git" keywords = ["timely", "dataflow", "bytes"] license = "MIT" + +[lints] +workspace = true diff --git a/bytes/src/lib.rs b/bytes/src/lib.rs index 8dd23f403..be3a98b17 100644 --- a/bytes/src/lib.rs +++ b/bytes/src/lib.rs @@ -94,7 +94,7 @@ pub mod arc { let result = BytesMut { ptr: self.ptr, len: index, - sequestered: self.sequestered.clone(), + sequestered: Arc::clone(&self.sequestered), }; self.ptr = self.ptr.wrapping_add(index); @@ -204,7 +204,7 @@ pub mod arc { let result = Bytes { ptr: self.ptr, len: index, - sequestered: self.sequestered.clone(), + sequestered: Arc::clone(&self.sequestered), }; self.ptr = self.ptr.wrapping_add(index); diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 000000000..b2141e587 --- /dev/null +++ b/clippy.toml @@ -0,0 +1 @@ +ignore-interior-mutability = ["timely::dataflow::operators::capability::Capability"] diff --git a/communication/Cargo.toml b/communication/Cargo.toml index 696c9124e..259a0b081 100644 --- a/communication/Cargo.toml +++ b/communication/Cargo.toml @@ -13,6 +13,9 @@ repository = "https://github.com/TimelyDataflow/timely-dataflow.git" keywords = ["timely", "dataflow"] license = "MIT" +[lints] +workspace = true + [features] default = ["getopts"] diff --git a/communication/examples/comm_hello.rs b/communication/examples/comm_hello.rs index 309a52e94..2b65efd01 100644 --- a/communication/examples/comm_hello.rs +++ b/communication/examples/comm_hello.rs @@ -32,9 +32,9 @@ fn main() { let (mut senders, mut receiver) = allocator.allocate(0); // send typed data along each channel - for i in 0 .. allocator.peers() { - senders[i].send(Message { payload: format!("hello, {}", i)}); - senders[i].done(); + for (i, sender) in senders.iter_mut().enumerate() { + sender.send(Message { payload: format!("hello, {}", i)}); + sender.done(); } // no support for termination notification, diff --git a/communication/examples/lgalloc.rs b/communication/examples/lgalloc.rs index d1834a398..255409390 100644 --- a/communication/examples/lgalloc.rs +++ b/communication/examples/lgalloc.rs @@ -1,112 +1,126 @@ -#![cfg(any(target_os = "linux", target_os = "macos"))] +//! Test using lgalloc as the allocator for zero-copy data transfer. +//! Note that this example only works on Linux and MacOS. + +#[cfg(any(target_os = "linux", target_os = "macos"))] +mod example { + use std::ops::{Deref, DerefMut}; + use std::ptr::NonNull; + use timely_communication::{Allocate, Bytesable}; + use timely_communication::allocator::zero_copy::bytes_slab::BytesRefill; + + /// A wrapper that indicates the serialization/deserialization strategy. + pub struct Message { + /// Text contents. + pub payload: String, + } -use std::ops::{Deref, DerefMut}; -use std::ptr::NonNull; -use timely_communication::{Allocate, Bytesable}; -use timely_communication::allocator::zero_copy::bytes_slab::BytesRefill; + impl Bytesable for Message { + fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { + Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } + } -/// A wrapper that indicates the serialization/deserialization strategy. -pub struct Message { - /// Text contents. - pub payload: String, -} + fn length_in_bytes(&self) -> usize { + self.payload.len() + } -impl Bytesable for Message { - fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { - Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } + fn into_bytes(&self, writer: &mut W) { + writer.write_all(self.payload.as_bytes()).unwrap(); + } } - fn length_in_bytes(&self) -> usize { - self.payload.len() + fn lgalloc_refill(size: usize) -> Box { + let (pointer, capacity, handle) = lgalloc::allocate::(size).unwrap(); + let handle = Some(handle); + Box::new(LgallocHandle { handle, pointer, capacity }) } - fn into_bytes(&self, writer: &mut W) { - writer.write_all(self.payload.as_bytes()).unwrap(); + struct LgallocHandle { + handle: Option, + pointer: NonNull, + capacity: usize, } -} -fn lgalloc_refill(size: usize) -> Box { - let (pointer, capacity, handle) = lgalloc::allocate::(size).unwrap(); - let handle = Some(handle); - Box::new(LgallocHandle { handle, pointer, capacity }) -} - -struct LgallocHandle { - handle: Option, - pointer: NonNull, - capacity: usize, -} - -impl Deref for LgallocHandle { - type Target = [u8]; - #[inline(always)] - fn deref(&self) -> &Self::Target { - unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) } + impl Deref for LgallocHandle { + type Target = [u8]; + #[inline(always)] + fn deref(&self) -> &Self::Target { + unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) } + } } -} -impl DerefMut for LgallocHandle { - #[inline(always)] - fn deref_mut(&mut self) -> &mut Self::Target { - unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) } + impl DerefMut for LgallocHandle { + #[inline(always)] + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) } + } } -} -impl Drop for LgallocHandle { - fn drop(&mut self) { - lgalloc::deallocate(self.handle.take().unwrap()); + impl Drop for LgallocHandle { + fn drop(&mut self) { + lgalloc::deallocate(self.handle.take().unwrap()); + } } -} -fn main() { - let mut config = lgalloc::LgAlloc::new(); - config.enable().with_path(std::env::temp_dir()); - lgalloc::lgalloc_set_config(&config); + pub(crate) fn main() { + let mut lgconfig = lgalloc::LgAlloc::new(); + lgconfig.enable().with_path(std::env::temp_dir()); + lgalloc::lgalloc_set_config(&lgconfig); - let refill = BytesRefill { - logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box>), - limit: None, - }; + let refill = BytesRefill { + logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box>), + limit: None, + }; - // extract the configuration from user-supplied arguments, initialize the computation. - let config = timely_communication::Config::ProcessBinary(4); - let (allocators, others) = config.try_build_with(refill).unwrap(); - let guards = timely_communication::initialize_from(allocators, others, |mut allocator| { + // extract the configuration from user-supplied arguments, initialize the computation. + let config = timely_communication::Config::ProcessBinary(4); + let (allocators, others) = config.try_build_with(refill).unwrap(); + let guards = timely_communication::initialize_from(allocators, others, |mut allocator| { - println!("worker {} of {} started", allocator.index(), allocator.peers()); + println!("worker {} of {} started", allocator.index(), allocator.peers()); - // allocates a pair of senders list and one receiver. - let (mut senders, mut receiver) = allocator.allocate(0); + // allocates a pair of senders list and one receiver. + let (mut senders, mut receiver) = allocator.allocate(0); - // send typed data along each channel - for i in 0 .. allocator.peers() { - senders[i].send(Message { payload: format!("hello, {}", i)}); - senders[i].done(); - } + // send typed data along each channel + for (i, sender) in senders.iter_mut().enumerate() { + sender.send(Message { payload: format!("hello, {}", i)}); + sender.done(); + } - // no support for termination notification, - // we have to count down ourselves. - let mut received = 0; - while received < allocator.peers() { + // no support for termination notification, + // we have to count down ourselves. + let mut received = 0; + while received < allocator.peers() { - allocator.receive(); + allocator.receive(); - if let Some(message) = receiver.recv() { - println!("worker {}: received: <{}>", allocator.index(), message.payload); - received += 1; - } + if let Some(message) = receiver.recv() { + println!("worker {}: received: <{}>", allocator.index(), message.payload); + received += 1; + } - allocator.release(); - } + allocator.release(); + } - allocator.index() - }); + allocator.index() + }); - // computation runs until guards are joined or dropped. - if let Ok(guards) = guards { - for guard in guards.join() { - println!("result: {:?}", guard); + // computation runs until guards are joined or dropped. + if let Ok(guards) = guards { + for guard in guards.join() { + println!("result: {:?}", guard); + } } + else { println!("error in computation"); } } - else { println!("error in computation"); } +} + +// Disable entirely on non-Linux and non-MacOS platforms. +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +mod example { + pub fn main() { } +} + +pub fn main() { + example::main(); } diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index 22afc44c0..536237631 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -94,7 +94,7 @@ pub trait Allocate { (thread::ThreadPusher, thread::ThreadPuller) { - thread::Thread::new_from(identifier, self.events().clone()) + thread::Thread::new_from(identifier, Rc::clone(self.events())) } /// Allocates a broadcast channel, where each pushed message is received by all. diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index 4d6dfecd3..cc7471f9e 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -102,7 +102,7 @@ impl PeerBuilder for Process { peers, buzzers_send: bsend, buzzers_recv: brecv, - channels: channels.clone(), + channels: Arc::clone(&channels), counters_send: counters_send.clone(), counters_recv: recv, } @@ -173,7 +173,7 @@ impl Allocate for Process { .map(|s| Box::new(s) as Box>) .collect::>(); - let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box>; + let recv = Box::new(CountPuller::new(recv, identifier, Rc::clone(self.inner.events()))) as Box>; (sends, recv) } diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index c957755c5..9857ed5ba 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -30,7 +30,7 @@ impl Allocate for Thread { fn index(&self) -> usize { 0 } fn peers(&self) -> usize { 1 } fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>) { - let (pusher, puller) = Thread::new_from(identifier, self.events.clone()); + let (pusher, puller) = Thread::new_from(identifier, Rc::clone(&self.events)); (vec![Box::new(pusher)], Box::new(puller)) } fn events(&self) -> &Rc>> { @@ -59,8 +59,8 @@ impl Thread { -> (ThreadPusher, ThreadPuller) { let shared = Rc::new(RefCell::new((VecDeque::::new(), VecDeque::::new()))); - let pusher = Pusher { target: shared.clone() }; - let pusher = CountPusher::new(pusher, identifier, events.clone()); + let pusher = Pusher { target: Rc::clone(&shared) }; + let pusher = CountPusher::new(pusher, identifier, Rc::clone(&events)); let puller = Puller { source: shared, current: None }; let puller = CountPuller::new(puller, identifier, events); (pusher, puller) diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 9ddcaef9c..8f35279f0 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -175,19 +175,15 @@ impl Allocate for TcpAllocator { // create, box, and stash new process_binary pusher. if process_id > self.index / inner_peers { process_id -= 1; } - pushes.push(Box::new(Pusher::new(header, self.sends[process_id].clone()))); + pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[process_id])))); } } - let channel = - self.to_local - .entry(identifier) - .or_insert_with(|| Rc::new(RefCell::new(VecDeque::new()))) - .clone(); + let channel = Rc::clone(self.to_local.entry(identifier).or_default()); use crate::allocator::counters::Puller as CountPuller; - let canary = Canary::new(identifier, self.canaries.clone()); - let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, self.events().clone())); + let canary = Canary::new(identifier, Rc::clone(&self.canaries)); + let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events()))); (pushes, puller, ) } @@ -221,18 +217,14 @@ impl Allocate for TcpAllocator { length: 0, seqno: 0, }; - pushes.push(Box::new(Pusher::new(header, send.clone()))) + pushes.push(Box::new(Pusher::new(header, Rc::clone(send)))) } - let channel = - self.to_local - .entry(identifier) - .or_insert_with(|| Rc::new(RefCell::new(VecDeque::new()))) - .clone(); + let channel = Rc::clone(self.to_local.entry(identifier).or_default()); use crate::allocator::counters::Puller as CountPuller; - let canary = Canary::new(identifier, self.canaries.clone()); - let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, self.events().clone())); + let canary = Canary::new(identifier, Rc::clone(&self.canaries)); + let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events()))); let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes }); (pushes, puller, ) diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index 689e20dda..f948e257b 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -147,18 +147,14 @@ impl Allocate for ProcessAllocator { }; // create, box, and stash new process_binary pusher. - pushes.push(Box::new(Pusher::new(header, self.sends[target_index].clone()))); + pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[target_index])))); } - let channel = - self.to_local - .entry(identifier) - .or_insert_with(|| Rc::new(RefCell::new(VecDeque::new()))) - .clone(); + let channel = Rc::clone(self.to_local.entry(identifier).or_default()); use crate::allocator::counters::Puller as CountPuller; - let canary = Canary::new(identifier, self.canaries.clone()); - let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, self.events().clone())); + let canary = Canary::new(identifier, Rc::clone(&self.canaries)); + let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, Rc::clone(self.events()))); (pushes, puller) } diff --git a/communication/src/allocator/zero_copy/bytes_exchange.rs b/communication/src/allocator/zero_copy/bytes_exchange.rs index e19f57552..d3b61fa6e 100644 --- a/communication/src/allocator/zero_copy/bytes_exchange.rs +++ b/communication/src/allocator/zero_copy/bytes_exchange.rs @@ -75,9 +75,9 @@ impl BytesPush for MergeQueue { bytes }; - for bytes in iterator { - if let Err(bytes) = tail.try_merge(bytes) { - queue.push_back(::std::mem::replace(&mut tail, bytes)); + for more_bytes in iterator { + if let Err(more_bytes) = tail.try_merge(more_bytes) { + queue.push_back(::std::mem::replace(&mut tail, more_bytes)); } } queue.push_back(tail); diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index 83e500bfc..793e25ff6 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -87,7 +87,7 @@ pub fn initialize_networking_from_sockets( let remote_recv = promises_iter.next().unwrap(); { - let log_sender = log_sender.clone(); + let log_sender = Arc::clone(&log_sender); let stream = stream.try_clone()?; let join_guard = ::std::thread::Builder::new() @@ -110,7 +110,7 @@ pub fn initialize_networking_from_sockets( { // let remote_sends = remote_sends.clone(); - let log_sender = log_sender.clone(); + let log_sender = Arc::clone(&log_sender); let stream = stream.try_clone()?; let refill = refill.clone(); let join_guard = diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index 90be31499..7592eb67e 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -223,52 +223,49 @@ impl Config { /// } /// } /// -/// fn main() { +/// // extract the configuration from user-supplied arguments, initialize the computation. +/// let config = timely_communication::Config::from_args(std::env::args()).unwrap(); +/// let guards = timely_communication::initialize(config, |mut allocator| { /// -/// // extract the configuration from user-supplied arguments, initialize the computation. -/// let config = timely_communication::Config::from_args(std::env::args()).unwrap(); -/// let guards = timely_communication::initialize(config, |mut allocator| { +/// println!("worker {} of {} started", allocator.index(), allocator.peers()); /// -/// println!("worker {} of {} started", allocator.index(), allocator.peers()); +/// // allocates a pair of senders list and one receiver. +/// let (mut senders, mut receiver) = allocator.allocate(0); /// -/// // allocates a pair of senders list and one receiver. -/// let (mut senders, mut receiver) = allocator.allocate(0); -/// -/// // send typed data along each channel -/// for i in 0 .. allocator.peers() { -/// senders[i].send(Message { payload: format!("hello, {}", i)}); -/// senders[i].done(); -/// } -/// -/// // no support for termination notification, -/// // we have to count down ourselves. -/// let mut received = 0; -/// while received < allocator.peers() { +/// // send typed data along each channel +/// for i in 0 .. allocator.peers() { +/// senders[i].send(Message { payload: format!("hello, {}", i)}); +/// senders[i].done(); +/// } /// -/// allocator.receive(); +/// // no support for termination notification, +/// // we have to count down ourselves. +/// let mut received = 0; +/// while received < allocator.peers() { /// -/// if let Some(message) = receiver.recv() { -/// println!("worker {}: received: <{}>", allocator.index(), message.payload); -/// received += 1; -/// } +/// allocator.receive(); /// -/// allocator.release(); +/// if let Some(message) = receiver.recv() { +/// println!("worker {}: received: <{}>", allocator.index(), message.payload); +/// received += 1; /// } /// -/// allocator.index() -/// }); +/// allocator.release(); +/// } /// -/// // computation runs until guards are joined or dropped. -/// if let Ok(guards) = guards { -/// for guard in guards.join() { -/// println!("result: {:?}", guard); -/// } +/// allocator.index() +/// }); +/// +/// // computation runs until guards are joined or dropped. +/// if let Ok(guards) = guards { +/// for guard in guards.join() { +/// println!("result: {:?}", guard); /// } -/// else { println!("error in computation"); } /// } +/// else { println!("error in computation"); } /// ``` /// -/// The should produce output like: +/// This should produce output like: /// /// ```ignore /// worker 0 started @@ -319,49 +316,46 @@ pub fn initializeT+Send+Sync+'static>( /// } /// } /// -/// fn main() { +/// // extract the configuration from user-supplied arguments, initialize the computation. +/// let config = timely_communication::Config::from_args(std::env::args()).unwrap(); +/// let guards = timely_communication::initialize(config, |mut allocator| { /// -/// // extract the configuration from user-supplied arguments, initialize the computation. -/// let config = timely_communication::Config::from_args(std::env::args()).unwrap(); -/// let guards = timely_communication::initialize(config, |mut allocator| { +/// println!("worker {} of {} started", allocator.index(), allocator.peers()); /// -/// println!("worker {} of {} started", allocator.index(), allocator.peers()); +/// // allocates a pair of senders list and one receiver. +/// let (mut senders, mut receiver) = allocator.allocate(0); /// -/// // allocates a pair of senders list and one receiver. -/// let (mut senders, mut receiver) = allocator.allocate(0); -/// -/// // send typed data along each channel -/// for i in 0 .. allocator.peers() { -/// senders[i].send(Message { payload: format!("hello, {}", i)}); -/// senders[i].done(); -/// } -/// -/// // no support for termination notification, -/// // we have to count down ourselves. -/// let mut received = 0; -/// while received < allocator.peers() { +/// // send typed data along each channel +/// for i in 0 .. allocator.peers() { +/// senders[i].send(Message { payload: format!("hello, {}", i)}); +/// senders[i].done(); +/// } /// -/// allocator.receive(); +/// // no support for termination notification, +/// // we have to count down ourselves. +/// let mut received = 0; +/// while received < allocator.peers() { /// -/// if let Some(message) = receiver.recv() { -/// println!("worker {}: received: <{}>", allocator.index(), message.payload); -/// received += 1; -/// } +/// allocator.receive(); /// -/// allocator.release(); +/// if let Some(message) = receiver.recv() { +/// println!("worker {}: received: <{}>", allocator.index(), message.payload); +/// received += 1; /// } /// -/// allocator.index() -/// }); +/// allocator.release(); +/// } /// -/// // computation runs until guards are joined or dropped. -/// if let Ok(guards) = guards { -/// for guard in guards.join() { -/// println!("result: {:?}", guard); -/// } +/// allocator.index() +/// }); +/// +/// // computation runs until guards are joined or dropped. +/// if let Ok(guards) = guards { +/// for guard in guards.join() { +/// println!("result: {:?}", guard); /// } -/// else { println!("error in computation"); } /// } +/// else { println!("error in computation"); } /// ``` pub fn initialize_from( builders: Vec, @@ -376,7 +370,7 @@ where let logic = Arc::new(func); let mut guards = Vec::new(); for (index, builder) in builders.into_iter().enumerate() { - let clone = logic.clone(); + let clone = Arc::clone(&logic); guards.push(thread::Builder::new() .name(format!("timely:work-{}", index)) .spawn(move || { diff --git a/communication/src/lib.rs b/communication/src/lib.rs index feb284886..d58e4860d 100644 --- a/communication/src/lib.rs +++ b/communication/src/lib.rs @@ -16,70 +16,67 @@ //! # Examples //! ``` //! use timely_communication::{Allocate, Bytesable}; -//! +//! //! /// A wrapper that indicates `bincode` as the serialization/deserialization strategy. //! pub struct Message { //! /// Text contents. //! pub payload: String, //! } -//! +//! //! impl Bytesable for Message { //! fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { //! Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } //! } -//! +//! //! fn length_in_bytes(&self) -> usize { //! self.payload.len() //! } -//! +//! //! fn into_bytes(&self, writer: &mut W) { //! writer.write_all(self.payload.as_bytes()).unwrap(); //! } //! } -//! -//! fn main() { -//! -//! // extract the configuration from user-supplied arguments, initialize the computation. -//! let config = timely_communication::Config::from_args(std::env::args()).unwrap(); -//! let guards = timely_communication::initialize(config, |mut allocator| { -//! -//! println!("worker {} of {} started", allocator.index(), allocator.peers()); -//! -//! // allocates a pair of senders list and one receiver. -//! let (mut senders, mut receiver) = allocator.allocate(0); -//! -//! // send typed data along each channel -//! for i in 0 .. allocator.peers() { -//! senders[i].send(Message { payload: format!("hello, {}", i)}); -//! senders[i].done(); -//! } -//! -//! // no support for termination notification, -//! // we have to count down ourselves. -//! let mut received = 0; -//! while received < allocator.peers() { -//! -//! allocator.receive(); -//! -//! if let Some(message) = receiver.recv() { -//! println!("worker {}: received: <{}>", allocator.index(), message.payload); -//! received += 1; -//! } -//! -//! allocator.release(); -//! } -//! -//! allocator.index() -//! }); -//! -//! // computation runs until guards are joined or dropped. -//! if let Ok(guards) = guards { -//! for guard in guards.join() { -//! println!("result: {:?}", guard); +//! +//! // extract the configuration from user-supplied arguments, initialize the computation. +//! let config = timely_communication::Config::from_args(std::env::args()).unwrap(); +//! let guards = timely_communication::initialize(config, |mut allocator| { +//! +//! println!("worker {} of {} started", allocator.index(), allocator.peers()); +//! +//! // allocates a pair of senders list and one receiver. +//! let (mut senders, mut receiver) = allocator.allocate(0); +//! +//! // send typed data along each channel +//! for i in 0 .. allocator.peers() { +//! senders[i].send(Message { payload: format!("hello, {}", i)}); +//! senders[i].done(); +//! } +//! +//! // no support for termination notification, +//! // we have to count down ourselves. +//! let mut received = 0; +//! while received < allocator.peers() { +//! +//! allocator.receive(); +//! +//! if let Some(message) = receiver.recv() { +//! println!("worker {}: received: <{}>", allocator.index(), message.payload); +//! received += 1; //! } +//! +//! allocator.release(); +//! } +//! +//! allocator.index() +//! }); +//! +//! // computation runs until guards are joined or dropped. +//! if let Ok(guards) = guards { +//! for guard in guards.join() { +//! println!("result: {:?}", guard); //! } -//! else { println!("error in computation"); } //! } +//! else { println!("error in computation"); } //! ``` //! //! This should produce output like: diff --git a/communication/src/networking.rs b/communication/src/networking.rs index 251e34427..f06efe68e 100644 --- a/communication/src/networking.rs +++ b/communication/src/networking.rs @@ -50,7 +50,7 @@ impl MessageHeader { /// Returns a header when there is enough supporting data #[inline] pub fn try_read(bytes: &[u8]) -> Option { - let mut cursor = io::Cursor::new(&bytes[..]); + let mut cursor = io::Cursor::new(bytes); let mut buffer = [0; Self::FIELDS]; cursor.read_u64_into::(&mut buffer).ok()?; let header = MessageHeader { @@ -106,7 +106,7 @@ impl MessageHeader { pub fn create_sockets(addresses: Vec, my_index: usize, noisy: bool) -> Result>> { let hosts1 = Arc::new(addresses); - let hosts2 = hosts1.clone(); + let hosts2 = Arc::clone(&hosts1); let start_task = thread::spawn(move || start_connections(hosts1, my_index, noisy)); let await_task = thread::spawn(move || await_connections(hosts2, my_index, noisy)); diff --git a/container/Cargo.toml b/container/Cargo.toml index db9c41e0d..c0689f201 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -4,3 +4,6 @@ version = "0.15.0" description = "Container abstractions for Timely" license = "MIT" edition.workspace = true + +[lints] +workspace = true diff --git a/logging/Cargo.toml b/logging/Cargo.toml index 33722fd86..ce35de892 100644 --- a/logging/Cargo.toml +++ b/logging/Cargo.toml @@ -11,5 +11,8 @@ repository = "https://github.com/TimelyDataflow/timely-dataflow.git" keywords = ["timely", "dataflow", "logging"] license = "MIT" +[lints] +workspace = true + [dependencies] timely_container = { version = "0.15.0", path = "../container" } diff --git a/logging/src/lib.rs b/logging/src/lib.rs index d0d583c67..b6c07334c 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -96,7 +96,7 @@ pub struct Logger { impl Clone for Logger { fn clone(&self) -> Self { Self { - inner: self.inner.clone() + inner: Rc::clone(&self.inner) } } } @@ -241,7 +241,7 @@ impl>() .into_iter() .map(|l| l.incoming().next().unwrap().unwrap()) - .map(|r| EventReader::<_,Vec,_>::new(r)) + .map(EventReader::<_,Vec,_>::new) .collect::>(); worker.dataflow::(|scope| { diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 12d362f3c..54d87b233 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -31,7 +31,7 @@ fn main() { // initializes and runs a timely dataflow. timely::execute(config, |worker| { let mut input = >>::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // create a new input, exchange data, and inspect its output worker.dataflow::(|scope| { @@ -95,7 +95,7 @@ fn main() { ) .container::() .inspect(|x| println!("seen: {:?}", x)) - .probe_with(&mut probe); + .probe_with(&probe); }); // introduce data and watch! @@ -232,7 +232,7 @@ mod container { // If the alignment is borked, we can relocate. IF the size is borked, // not sure what we do in that case. assert!(bytes.len() % 8 == 0); - if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { + if bytemuck::try_cast_slice::<_, u64>(&bytes).is_ok() { Self::Bytes(bytes) } else { diff --git a/timely/examples/distinct.rs b/timely/examples/distinct.rs index d778368fa..b3e58f5c0 100644 --- a/timely/examples/distinct.rs +++ b/timely/examples/distinct.rs @@ -10,7 +10,7 @@ fn main() { timely::execute_from_args(std::env::args(), |worker| { let index = worker.index(); let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // create a new input, exchange data, and inspect its output worker.dataflow::(|scope| { @@ -21,7 +21,7 @@ fn main() { input.for_each(|time, data| { let counts = counts_by_time - .entry(time.time().clone()) + .entry(*time.time()) .or_insert(HashMap::new()); let mut session = output.session(&time); for &datum in data.iter() { @@ -35,15 +35,15 @@ fn main() { }) .container::>() .inspect(move |x| println!("worker {}:\tvalue {}", index, x)) - .probe_with(&mut probe); + .probe_with(&probe); }); // introduce data and watch! for round in 0..1 { if index == 0 { - vec![0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x)); + [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x)); } else if index == 1 { - vec![0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x)); + [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x)); } input.advance_to(round + 1); while probe.less_than(input.time()) { diff --git a/timely/examples/flow_controlled.rs b/timely/examples/flow_controlled.rs index e557c23ae..77817b045 100644 --- a/timely/examples/flow_controlled.rs +++ b/timely/examples/flow_controlled.rs @@ -5,7 +5,7 @@ fn main() { timely::execute_from_args(std::env::args(), |worker| { let mut input = (0u64..100000).peekable(); worker.dataflow(|scope| { - let mut probe_handle = probe::Handle::new(); + let probe_handle = probe::Handle::new(); let probe_handle_2 = probe_handle.clone(); iterator_source( @@ -27,7 +27,7 @@ fn main() { }, probe_handle_2) .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d)) - .probe_with(&mut probe_handle); + .probe_with(&probe_handle); }); }).unwrap(); } diff --git a/timely/examples/hashjoin.rs b/timely/examples/hashjoin.rs index c5383f993..7ff612d2d 100644 --- a/timely/examples/hashjoin.rs +++ b/timely/examples/hashjoin.rs @@ -21,7 +21,7 @@ fn main() { let mut input1 = InputHandle::new(); let mut input2 = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); worker.dataflow(|scope| { @@ -45,11 +45,11 @@ fn main() { for (key, val1) in data.drain(..) { if let Some(values) = map2.get(&key) { for val2 in values.iter() { - session.give((val1.clone(), val2.clone())); + session.give((val1, *val2)); } } - map1.entry(key).or_insert(Vec::new()).push(val1); + map1.entry(key).or_default().push(val1); } }); @@ -59,17 +59,17 @@ fn main() { for (key, val2) in data.drain(..) { if let Some(values) = map1.get(&key) { for val1 in values.iter() { - session.give((val1.clone(), val2.clone())); + session.give((*val1, val2)); } } - map2.entry(key).or_insert(Vec::new()).push(val2); + map2.entry(key).or_default().push(val2); } }); } }) .container::>() - .probe_with(&mut probe); + .probe_with(&probe); }); let mut rng: SmallRng = SeedableRng::seed_from_u64(index as u64); diff --git a/timely/examples/hello.rs b/timely/examples/hello.rs index bf998f2ec..3353fed68 100644 --- a/timely/examples/hello.rs +++ b/timely/examples/hello.rs @@ -7,14 +7,14 @@ fn main() { let index = worker.index(); let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // create a new input, exchange data, and inspect its output worker.dataflow(|scope| { scope.input_from(&mut input) .exchange(|x| *x) .inspect(move |x| println!("worker {}:\thello {}", index, x)) - .probe_with(&mut probe); + .probe_with(&probe); }); // introduce data and watch! diff --git a/timely/examples/logging-recv.rs b/timely/examples/logging-recv.rs index ed4d775b5..ffad3f1bb 100644 --- a/timely/examples/logging-recv.rs +++ b/timely/examples/logging-recv.rs @@ -18,7 +18,7 @@ fn main() { .collect::>() .into_iter() .map(|l| l.incoming().next().unwrap().unwrap()) - .map(|r| EventReader::,_>::new(r)) + .map(EventReader::,_>::new) .collect::>(); worker.dataflow(|scope| { diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 08a16daad..4d9c2bb00 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -13,7 +13,7 @@ fn main() { let batch = std::env::args().nth(1).unwrap().parse::().unwrap(); let rounds = std::env::args().nth(2).unwrap().parse::().unwrap(); let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // Register timely worker logging. worker.log_register().insert::("timely", |time, data| @@ -77,7 +77,7 @@ fn main() { scope .input_from(&mut input) .exchange(|&x| x as u64) - .probe_with(&mut probe); + .probe_with(&probe); }); // Register timely worker logging. @@ -95,7 +95,7 @@ fn main() { scope .input_from(&mut input) .exchange(|&x| x as u64) - .probe_with(&mut probe); + .probe_with(&probe); }); // Register user-level logging. diff --git a/timely/examples/loopdemo.rs b/timely/examples/loopdemo.rs index df7dc3fe8..132f66a6c 100644 --- a/timely/examples/loopdemo.rs +++ b/timely/examples/loopdemo.rs @@ -16,7 +16,7 @@ fn main() { let timer = std::time::Instant::now(); let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // Create a dataflow that discards input data (just synchronizes). worker.dataflow(|scope| { @@ -32,7 +32,7 @@ fn main() { .filter(|x| x > &1); step.connect_loop(loop_handle); - step.probe_with(&mut probe); + step.probe_with(&probe); }); let ns_per_request = 1_000_000_000 / rate; diff --git a/timely/examples/openloop.rs b/timely/examples/openloop.rs index 52604f9b2..4174667cc 100644 --- a/timely/examples/openloop.rs +++ b/timely/examples/openloop.rs @@ -19,14 +19,14 @@ fn main() { let timer = std::time::Instant::now(); let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // Create a dataflow that discards input data (just synchronizes). worker.dataflow(|scope| { scope .input_from(&mut input) // read input. .filter(|_| false) // do nothing. - .probe_with(&mut probe); // observe output. + .probe_with(&probe); // observe output. }); let ns_per_request = 1_000_000_000 / rate; diff --git a/timely/examples/pagerank.rs b/timely/examples/pagerank.rs index 8c9cdb08f..4716182c8 100644 --- a/timely/examples/pagerank.rs +++ b/timely/examples/pagerank.rs @@ -11,7 +11,7 @@ fn main() { timely::execute_from_args(std::env::args().skip(3), move |worker| { let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); worker.dataflow::(|scope| { @@ -30,8 +30,8 @@ fn main() { |_capability, _info| { // where we stash out-of-order data. - let mut edge_stash = HashMap::new(); - let mut rank_stash = HashMap::new(); + let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new(); + let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new(); // lists of edges, ranks, and changes. let mut edges = Vec::new(); @@ -45,12 +45,12 @@ fn main() { // hold on to edge changes until it is time. input1.for_each(|time, data| { - edge_stash.entry(time.retain()).or_insert(Vec::new()).extend(data.drain(..)); + edge_stash.entry(time.retain()).or_default().append(data); }); // hold on to rank changes until it is time. input2.for_each(|time, data| { - rank_stash.entry(time.retain()).or_insert(Vec::new()).extend(data.drain(..)); + rank_stash.entry(time.retain()).or_default().append(data); }); let frontiers = &[input1.frontier(), input2.frontier()]; @@ -145,7 +145,7 @@ fn main() { ); changes - .probe_with(&mut probe) + .probe_with(&probe) .connect_loop(handle); }); diff --git a/timely/examples/rc.rs b/timely/examples/rc.rs index c8ed8f9e6..d481675b7 100644 --- a/timely/examples/rc.rs +++ b/timely/examples/rc.rs @@ -13,12 +13,12 @@ fn main() { // create a new input, exchange data, and inspect its output let index = worker.index(); let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); worker.dataflow(|scope| { scope.input_from(&mut input) //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send. .inspect(move |x| println!("worker {}:\thello {:?}", index, x)) - .probe_with(&mut probe); + .probe_with(&probe); }); // introduce data and watch! diff --git a/timely/examples/sequence.rs b/timely/examples/sequence.rs index d3cbe6888..597d2cd12 100644 --- a/timely/examples/sequence.rs +++ b/timely/examples/sequence.rs @@ -14,7 +14,7 @@ fn main() { std::thread::sleep(Duration::from_secs(1 + worker.index() as u64)); sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round)); // } - while let Some(element) = sequencer.next() { + for element in &mut sequencer { println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element); } worker.step(); diff --git a/timely/examples/threadless.rs b/timely/examples/threadless.rs index 24e32d871..4a7c124d5 100644 --- a/timely/examples/threadless.rs +++ b/timely/examples/threadless.rs @@ -10,14 +10,14 @@ fn main() { // create input and probe handles. let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // directly build a dataflow. worker.dataflow(|scope| { input .to_stream(scope) .inspect(|x| println!("{:?}", x)) - .probe_with(&mut probe); + .probe_with(&probe); }); // manage inputs. diff --git a/timely/examples/unionfind.rs b/timely/examples/unionfind.rs index 04641c174..23068466c 100644 --- a/timely/examples/unionfind.rs +++ b/timely/examples/unionfind.rs @@ -20,7 +20,7 @@ fn main() { let peers = worker.peers(); let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); worker.dataflow(|scope| { scope.input_from(&mut input) @@ -28,7 +28,7 @@ fn main() { .union_find() .exchange(|_| 0) .union_find() - .probe_with(&mut probe); + .probe_with(&probe); }); let mut rng: SmallRng = SeedableRng::seed_from_u64(index as u64); diff --git a/timely/examples/wordcount.rs b/timely/examples/wordcount.rs index bfb0b01ca..de8d3c89b 100644 --- a/timely/examples/wordcount.rs +++ b/timely/examples/wordcount.rs @@ -9,7 +9,7 @@ fn main() { timely::execute_from_args(std::env::args(), |worker| { let mut input = InputHandle::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // define a distribution function for strings. let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64); @@ -53,7 +53,7 @@ fn main() { }}) .container::>() .inspect(|x| println!("seen: {:?}", x)) - .probe_with(&mut probe); + .probe_with(&probe); }); // introduce data and watch! diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 42bdf254c..f0de36274 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -67,7 +67,7 @@ where let data = &mut message.data; // if the time isn't right, flush everything. - if self.current.as_ref().map_or(false, |x| x != time) { + if self.current.as_ref().is_some_and(|x| x != time) { for index in 0..self.pushers.len() { self.flush(index); } diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index e9251c24a..49d9e17d0 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -45,7 +45,7 @@ impl Tee { let shared = Rc::new(RefCell::new(Vec::new())); let port = Tee { buffer: Default::default(), - shared: shared.clone(), + shared: Rc::clone(&shared), }; (port, TeeHelper { shared }) @@ -56,7 +56,7 @@ impl Clone for Tee { fn clone(&self) -> Self { Self { buffer: Default::default(), - shared: self.shared.clone(), + shared: Rc::clone(&self.shared), } } } @@ -94,7 +94,7 @@ impl TeeHelper { impl Clone for TeeHelper { fn clone(&self) -> Self { TeeHelper { - shared: self.shared.clone(), + shared: Rc::clone(&self.shared), } } } diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index bc5bedc9a..9ca301ba2 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -118,7 +118,7 @@ impl Capability { /// Returns [`None`] `self.time` is not less or equal to `new_time`. pub fn try_delayed(&self, new_time: &T) -> Option> { if self.time.less_equal(new_time) { - Some(Self::new(new_time.clone(), self.internal.clone())) + Some(Self::new(new_time.clone(), Rc::clone(&self.internal))) } else { None } @@ -171,7 +171,7 @@ impl Drop for Capability { impl Clone for Capability { fn clone(&self) -> Capability { - Self::new(self.time.clone(), self.internal.clone()) + Self::new(self.time.clone(), Rc::clone(&self.internal)) } } @@ -282,7 +282,7 @@ impl InputCapability { pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability { use crate::progress::timestamp::PathSummary; if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { - Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone()) + Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port])) } else { panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time()); } @@ -306,7 +306,7 @@ impl InputCapability { use crate::progress::timestamp::PathSummary; let self_time = self.time().clone(); if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { - Capability::new(self_time, self.internal.borrow()[output_port].clone()) + Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port])) } else { panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time); @@ -365,8 +365,8 @@ impl ActivateCapability { pub fn delayed(&self, time: &T) -> Self { ActivateCapability { capability: self.capability.delayed(time), - address: self.address.clone(), - activations: self.activations.clone(), + address: Rc::clone(&self.address), + activations: Rc::clone(&self.activations), } } diff --git a/timely/src/dataflow/operators/core/capture/event.rs b/timely/src/dataflow/operators/core/capture/event.rs index a4d523f40..73bc31888 100644 --- a/timely/src/dataflow/operators/core/capture/event.rs +++ b/timely/src/dataflow/operators/core/capture/event.rs @@ -73,7 +73,7 @@ pub mod link { impl EventPusher for Rc> { fn push(&mut self, event: Event) { *self.next.borrow_mut() = Some(Rc::new(EventLink { event: Some(event), next: RefCell::new(None) })); - let next = self.next.borrow().as_ref().unwrap().clone(); + let next = Rc::clone(self.next.borrow().as_ref().unwrap()); *self = next; } } @@ -82,7 +82,7 @@ pub mod link { fn next(&mut self) -> Option>> { let is_some = self.next.borrow().is_some(); if is_some { - let next = self.next.borrow().as_ref().unwrap().clone(); + let next = Rc::clone(self.next.borrow().as_ref().unwrap()); *self = next; if let Some(this) = Rc::get_mut(self) { this.event.take().map(Cow::Owned) @@ -121,7 +121,7 @@ pub mod link { #[cfg(not(miri))] let limit = 1_000_000; let mut event1 = Rc::new(EventLink::<(),()>::new()); - let _event2 = event1.clone(); + let _event2 = Rc::clone(&event1); for _ in 0 .. limit { event1.push(Event::Progress(vec![])); } diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 16464a7f6..bc221c4a6 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -20,6 +20,7 @@ //! ``` use std::marker::PhantomData; +use std::rc::Rc; use crate::logging::{TimelyLogger, MessagesEvent}; use crate::progress::Timestamp; @@ -29,7 +30,6 @@ use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::channels::pushers::{Counter, Tee}; use crate::dataflow::channels::Message; - use crate::worker::AsWorker; use crate::dataflow::{StreamCore, Scope}; use crate::dataflow::scopes::Child; @@ -65,7 +65,7 @@ impl, C: Data+Container> Enter Input for G where ::Timestamp: TotalOrder { fn input_from(&mut self, handle: &mut Handle<::Timestamp, CB>) -> StreamCore { let (output, registrar) = Tee::<::Timestamp, CB::Container>::new(); let counter = Counter::new(output); - let produced = counter.produced().clone(); + let produced = Rc::clone(counter.produced()); let index = self.allocate_operator_index(); let address = self.addr_for_child(index); - handle.activate.push(self.activator_for(address.clone())); + handle.activate.push(self.activator_for(Rc::clone(&address))); let progress = Rc::new(RefCell::new(ChangeBatch::new())); - handle.register(counter, progress.clone()); + handle.register(counter, Rc::clone(&progress)); let copies = self.peers(); @@ -207,7 +207,7 @@ impl Operate for Operator { fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64); - (Vec::new(), self.shared_progress.clone()) + (Vec::new(), Rc::clone(&self.shared_progress)) } fn notify_me(&self) -> bool { false } diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index c7fdced33..cb3c96b34 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -169,7 +169,7 @@ impl Handle { impl Clone for Handle { fn clone(&self) -> Self { Handle { - frontier: self.frontier.clone() + frontier: Rc::clone(&self.frontier) } } } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index aacd89ca9..2c7b26df6 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -85,15 +85,15 @@ impl UnorderedInput for G { let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); // let produced = Rc::new(RefCell::new(ChangeBatch::new())); - let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); + let cap = Capability::new(G::Timestamp::minimum(), Rc::clone(&internal)); let counter = Counter::new(output); - let produced = counter.produced().clone(); + let produced = Rc::clone(counter.produced()); let peers = self.peers(); let index = self.allocate_operator_index(); let address = self.addr_for_child(index); - let cap = ActivateCapability::new(cap, address.clone(), self.activations()); + let cap = ActivateCapability::new(cap, Rc::clone(&address), self.activations()); let helper = UnorderedHandle::new(counter); @@ -139,7 +139,7 @@ impl Operate for UnorderedOperator { for (time, count) in borrow.drain() { self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64)); } - (Vec::new(), self.shared_progress.clone()) + (Vec::new(), Rc::clone(&self.shared_progress)) } fn notify_me(&self) -> bool { false } @@ -161,7 +161,7 @@ impl UnorderedHandle { /// Allocates a new automatically flushing session based on the supplied capability. #[inline] pub fn session_with_builder(&mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { - ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) + ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), Rc::clone(&cap.address), Rc::clone(&cap.activations)) } } diff --git a/timely/src/dataflow/operators/flow_controlled.rs b/timely/src/dataflow/operators/flow_controlled.rs index 0d91ef7f9..c9e30d334 100644 --- a/timely/src/dataflow/operators/flow_controlled.rs +++ b/timely/src/dataflow/operators/flow_controlled.rs @@ -39,37 +39,35 @@ pub struct IteratorSourceInput, I: I /// use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput}; /// use timely::dataflow::operators::{probe, Probe, Inspect}; /// -/// fn main() { -/// timely::execute_from_args(std::env::args(), |worker| { -/// let mut input = (0u64..100000).peekable(); -/// worker.dataflow(|scope| { -/// let mut probe_handle = probe::Handle::new(); -/// let probe_handle_2 = probe_handle.clone(); +/// timely::execute_from_args(std::env::args(), |worker| { +/// let mut input = (0u64..100000).peekable(); +/// worker.dataflow(|scope| { +/// let mut probe_handle = probe::Handle::new(); +/// let probe_handle_2 = probe_handle.clone(); /// -/// let mut next_t: u64 = 0; -/// iterator_source( -/// scope, -/// "Source", -/// move |prev_t| { -/// if let Some(first_x) = input.peek().cloned() { -/// next_t = first_x / 100 * 100; -/// Some(IteratorSourceInput { -/// lower_bound: Default::default(), -/// data: vec![ -/// (next_t, -/// input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::>())], -/// target: *prev_t, -/// }) -/// } else { -/// None -/// } -/// }, -/// probe_handle_2) -/// .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d)) -/// .probe_with(&mut probe_handle); -/// }); -/// }).unwrap(); -/// } +/// let mut next_t: u64 = 0; +/// iterator_source( +/// scope, +/// "Source", +/// move |prev_t| { +/// if let Some(first_x) = input.peek().cloned() { +/// next_t = first_x / 100 * 100; +/// Some(IteratorSourceInput { +/// lower_bound: Default::default(), +/// data: vec![ +/// (next_t, +/// input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::>())], +/// target: *prev_t, +/// }) +/// } else { +/// None +/// } +/// }, +/// probe_handle_2) +/// .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d)) +/// .probe_with(&mut probe_handle); +/// }); +/// }).unwrap(); /// ``` pub fn iterator_source< G: Scope, diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8ba131612..43266683e 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -118,7 +118,7 @@ impl OperatorBuilder { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); - let (sender, receiver) = pact.connect(&mut self.scope, channel_id, self.address.clone(), logging); + let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging); let target = Target::new(self.index, self.shape.inputs); stream.connect_to(target, sender, channel_id); @@ -174,7 +174,7 @@ impl OperatorBuilder { /// Information describing the operator. pub fn operator_info(&self) -> OperatorInfo { - OperatorInfo::new(self.index, self.global, self.address.clone()) + OperatorInfo::new(self.index, self.global, Rc::clone(&self.address)) } } @@ -225,7 +225,7 @@ where .iter_mut() .for_each(|output| output.update(T::minimum(), self.shape.peers as i64)); - (self.summary.clone(), self.shared_progress.clone()) + (self.summary.clone(), Rc::clone(&self.shared_progress)) } // initialize self.frontier antichains as indicated by hosting scope. diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 752b3be2e..1df5dc0cc 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -84,12 +84,12 @@ impl OperatorBuilder { let input = PullCounter::new(puller); self.frontier.push(MutableAntichain::new()); - self.consumed.push(input.consumed().clone()); + self.consumed.push(Rc::clone(input.consumed())); let shared_summary = Rc::new(RefCell::new(connection)); - self.summaries.push(shared_summary.clone()); + self.summaries.push(Rc::clone(&shared_summary)); - new_input_handle(input, self.internal.clone(), shared_summary, self.logging.clone()) + new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone()) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. @@ -117,10 +117,10 @@ impl OperatorBuilder { let (tee, stream) = self.builder.new_output_connection(connection.clone()); let internal = Rc::new(RefCell::new(ChangeBatch::new())); - self.internal.borrow_mut().push(internal.clone()); + self.internal.borrow_mut().push(Rc::clone(&internal)); let mut buffer = PushBuffer::new(PushCounter::new(tee)); - self.produced.push(buffer.inner().produced().clone()); + self.produced.push(Rc::clone(buffer.inner().produced())); for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) { summary.borrow_mut().push(connection.clone()); @@ -155,7 +155,7 @@ impl OperatorBuilder { // create capabilities, discard references to their creation. let mut capabilities = Vec::with_capacity(self.internal.borrow().len()); for batch in self.internal.borrow().iter() { - capabilities.push(Capability::new(G::Timestamp::minimum(), batch.clone())); + capabilities.push(Capability::new(G::Timestamp::minimum(), Rc::clone(batch))); // Discard evidence of creation, as we are assumed to start with one. batch.borrow_mut().clear(); } diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 47811a659..b8b220e39 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -58,7 +58,7 @@ impl>> InputHandleCore { /// use timely::dataflow::operators::generic::Operator; /// use timely::dataflow::channels::pact::Pipeline; /// - /// fn main() { - /// timely::example(|scope| { - /// (0u64..10).to_stream(scope) - /// .unary_frontier(Pipeline, "example", |default_cap, _info| { - /// let mut cap = Some(default_cap.delayed(&12)); - /// let mut notificator = FrontierNotificator::default(); - /// let mut stash = HashMap::new(); - /// move |input, output| { - /// if let Some(ref c) = cap.take() { - /// output.session(&c).give(12); - /// } - /// while let Some((time, data)) = input.next() { - /// stash.entry(time.time().clone()) - /// .or_insert(Vec::new()) - /// .extend(data.drain(..)); - /// } - /// notificator.for_each(&[input.frontier()], |time, _not| { - /// if let Some(mut vec) = stash.remove(time.time()) { - /// output.session(&time).give_iterator(vec.drain(..)); - /// } - /// }); + /// timely::example(|scope| { + /// (0u64..10).to_stream(scope) + /// .unary_frontier(Pipeline, "example", |default_cap, _info| { + /// let mut cap = Some(default_cap.delayed(&12)); + /// let mut notificator = FrontierNotificator::default(); + /// let mut stash = HashMap::new(); + /// move |input, output| { + /// if let Some(ref c) = cap.take() { + /// output.session(&c).give(12); + /// } + /// while let Some((time, data)) = input.next() { + /// stash.entry(time.time().clone()) + /// .or_insert(Vec::new()) + /// .extend(data.drain(..)); /// } - /// }) - /// .container::>(); - /// }); - /// } + /// notificator.for_each(&[input.frontier()], |time, _not| { + /// if let Some(mut vec) = stash.remove(time.time()) { + /// output.session(&time).give_iterator(vec.drain(..)); + /// } + /// }); + /// } + /// }) + /// .container::>(); + /// }); /// ``` fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where @@ -74,21 +72,19 @@ pub trait Operator { /// use timely::dataflow::operators::generic::Operator; /// use timely::dataflow::channels::pact::Pipeline; /// - /// fn main() { - /// timely::example(|scope| { - /// (0u64..10) - /// .to_stream(scope) - /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| { - /// input.for_each(|time, data| { - /// output.session(&time).give_container(data); - /// notificator.notify_at(time.retain()); - /// }); - /// notificator.for_each(|time, _cnt, _not| { - /// println!("notified at {:?}", time); - /// }); + /// timely::example(|scope| { + /// (0u64..10) + /// .to_stream(scope) + /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| { + /// input.for_each(|time, data| { + /// output.session(&time).give_container(data); + /// notificator.notify_at(time.retain()); + /// }); + /// notificator.for_each(|time, _cnt, _not| { + /// println!("notified at {:?}", time); /// }); - /// }); - /// } + /// }); + /// }); /// ``` fn unary_notify, diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index d071cac45..272ff3c6a 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -31,7 +31,7 @@ pub struct StreamCore { impl Clone for StreamCore { fn clone(&self) -> Self { Self { - name: self.name.clone(), + name: self.name, scope: self.scope.clone(), ports: self.ports.clone(), } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 36de954eb..e3082087e 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -188,7 +188,7 @@ where let progress_logging = worker.log_register().get::>(&format!("timely/progress/{type_name}")); let (tracker, scope_summary) = builder.build(reachability_logging); - let progcaster = Progcaster::new(worker, self.path.clone(), self.identifier, self.logging.clone(), progress_logging); + let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging); let mut incomplete = vec![true; self.children.len()]; incomplete[0] = false; @@ -584,7 +584,7 @@ where self.propagate_pointstamps(); // Propagate expressed capabilities to output frontiers. // Return summaries and shared progress information. - (internal_summary, self.shared_progress.clone()) + (internal_summary, Rc::clone(&self.shared_progress)) } fn set_external_summary(&mut self) { @@ -744,7 +744,7 @@ impl PerOperatorState { } /// Extracts shared progress information and converts to pointstamp changes. - fn extract_progress(&mut self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap>) { + fn extract_progress(&self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap>) { let shared_progress = &mut *self.shared_progress.borrow_mut(); @@ -776,7 +776,7 @@ impl PerOperatorState { /// The validity of shared progress information depends on both the external frontiers and the /// internal capabilities, as events can occur that cannot be explained locally otherwise. #[allow(dead_code)] - fn validate_progress(&mut self, child_state: &reachability::PerOperator) { + fn validate_progress(&self, child_state: &reachability::PerOperator) { let shared_progress = &mut *self.shared_progress.borrow_mut(); diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 37c61c3f9..6cfc140c1 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -106,8 +106,8 @@ impl Sequencer { // only initialize the activator once we obtain the operator // address. let activator = Rc::new(RefCell::new(None)); - let activator_source = activator.clone(); - let activator_sink = activator.clone(); + let activator_source = Rc::clone(&activator); + let activator_sink = Rc::clone(&activator); // build a dataflow used to serialize and circulate commands worker.dataflow::(move |dataflow| { diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 416cc544c..155115f38 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -262,7 +262,7 @@ impl AsWorker for Worker { impl Scheduler for Worker { fn activations(&self) -> Rc> { - self.activations.clone() + Rc::clone(&self.activations) } } @@ -342,7 +342,7 @@ impl Worker { { // Process channel events. Activate responders. let mut allocator = self.allocator.borrow_mut(); allocator.receive(); - let events = allocator.events().clone(); + let events = allocator.events(); let mut borrow = events.borrow_mut(); let paths = self.paths.borrow(); borrow.sort_unstable(); @@ -719,7 +719,7 @@ impl Worker { } // Acquire a new distinct dataflow identifier. - fn allocate_dataflow_index(&mut self) -> usize { + fn allocate_dataflow_index(&self) -> usize { *self.dataflow_counter.borrow_mut() += 1; *self.dataflow_counter.borrow() - 1 } @@ -730,15 +730,15 @@ impl Clone for Worker { Worker { config: self.config.clone(), timer: self.timer, - paths: self.paths.clone(), - allocator: self.allocator.clone(), - identifiers: self.identifiers.clone(), - dataflows: self.dataflows.clone(), - dataflow_counter: self.dataflow_counter.clone(), - logging: self.logging.clone(), - activations: self.activations.clone(), + paths: Rc::clone(&self.paths), + allocator: Rc::clone(&self.allocator), + identifiers: Rc::clone(&self.identifiers), + dataflows: Rc::clone(&self.dataflows), + dataflow_counter: Rc::clone(&self.dataflow_counter), + logging: Rc::clone(&self.logging), + activations: Rc::clone(&self.activations), active_dataflows: Vec::new(), - temp_channel_ids: self.temp_channel_ids.clone(), + temp_channel_ids: Rc::clone(&self.temp_channel_ids), } } }