Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub mod arc {
sequestered: self.sequestered.clone(),
};

unsafe { self.ptr = self.ptr.offset(index as isize); }
unsafe { self.ptr = self.ptr.add(index); }
self.len -= index;

result
Expand Down Expand Up @@ -161,7 +161,7 @@ pub mod arc {
/// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
/// ```
pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.offset(self.len as isize) }, other.ptr) {
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.add(self.len) }, other.ptr) {
self.len += other.len;
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl AllocateBuilder for ProcessBuilder {

// Initialize buzzers; send first, then recv.
for worker in self.buzzers_send.iter() {
let buzzer = Buzzer::new();
let buzzer = Buzzer::default();
worker.send(buzzer).expect("Failed to send buzzer");
}
let mut buzzers = Vec::with_capacity(self.buzzers_recv.len());
Expand Down Expand Up @@ -88,8 +88,8 @@ impl Process {

counters_recv
.into_iter()
.zip(buzzers_send.into_iter())
.zip(buzzers_recv.into_iter())
.zip(buzzers_send)
.zip(buzzers_recv)
.enumerate()
.map(|(index, ((recv, bsend), brecv))| {
ProcessBuilder {
Expand Down
10 changes: 2 additions & 8 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ pub struct ThreadBuilder;

impl AllocateBuilder for ThreadBuilder {
type Allocator = Thread;
fn build(self) -> Self::Allocator { Thread::new() }
fn build(self) -> Self::Allocator { Thread::default() }
}


/// An allocator for intra-thread communication.
#[derive(Default)]
pub struct Thread {
/// Shared counts of messages in channels.
events: Rc<RefCell<Vec<usize>>>,
Expand Down Expand Up @@ -53,13 +54,6 @@ pub type ThreadPusher<T> = CountPusher<T, Pusher<T>>;
pub type ThreadPuller<T> = CountPuller<T, Puller<T>>;

impl Thread {
/// Allocates a new thread-local channel allocator.
pub fn new() -> Self {
Thread {
events: Rc::new(RefCell::new(Default::default())),
}
}

/// Creates a new thread-local channel from an identifier and shared counts.
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
-> (ThreadPusher<T>, ThreadPuller<T>)
Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<A: AllocateBuilder> TcpBuilder<A> {
// Fulfill puller obligations.
let mut recvs = Vec::with_capacity(self.peers);
for promise in self.promises.into_iter() {
let buzzer = crate::buzzer::Buzzer::new();
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
promise.send(queue.clone()).expect("Failed to send MergeQueue");
recvs.push(queue.clone());
Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ProcessBuilder {
// Fulfill puller obligations.
let mut recvs = Vec::with_capacity(self.peers);
for puller in self.pullers.into_iter() {
let buzzer = crate::buzzer::Buzzer::new();
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
puller.send(queue.clone()).expect("Failed to send MergeQueue");
recvs.push(queue.clone());
Expand Down
6 changes: 2 additions & 4 deletions communication/src/allocator/zero_copy/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
// Sockets are expected to be blocking,
for socket in sockets.iter_mut() {
if let Some(socket) = socket {
socket.set_nonblocking(false).expect("failed to set socket to blocking");
}
for socket in sockets.iter_mut().flatten() {
socket.set_nonblocking(false).expect("failed to set socket to blocking");
}

let processes = sockets.len();
Expand Down
10 changes: 5 additions & 5 deletions communication/src/allocator/zero_copy/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//!
//! Methods related to reading from and writing to TCP connections

use std::io::{self, Write};
use crossbeam_channel::{Sender, Receiver};
Expand Down Expand Up @@ -67,9 +67,9 @@ where
assert!(!buffer.empty().is_empty());

// Attempt to read some more bytes into self.buffer.
let read = match reader.read(&mut buffer.empty()) {
let read = match reader.read(buffer.empty()) {
Err(x) => tcp_panic("reading data", x),
Ok(n) if n == 0 => {
Ok(0) => {
tcp_panic(
"reading data",
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
Expand Down Expand Up @@ -102,7 +102,7 @@ where
panic!("Clean shutdown followed by data.");
}
buffer.ensure_capacity(1);
if reader.read(&mut buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
panic!("Clean shutdown followed by data.");
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ pub fn send_loop<S: Stream>(
logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));

let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
let buzzer = crate::buzzer::Buzzer::new();
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
x.send(queue.clone()).expect("failed to send MergeQueue");
queue
Expand Down
10 changes: 4 additions & 6 deletions communication/src/buzzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ pub struct Buzzer {
thread: Thread,
}

impl Default for Buzzer {
fn default() -> Self { Self { thread: std::thread::current() } }
}

impl Buzzer {
/// Creates a new buzzer for the current thread.
pub fn new() -> Self {
Self {
thread: std::thread::current()
}
}
/// Unparks the target thread.
pub fn buzz(&self) {
self.thread.unpark()
Expand Down
6 changes: 3 additions & 3 deletions communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,15 @@ impl Config {
Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(())))
},
Config::Process(threads) => {
Ok((Process::new_vector(threads).into_iter().map(|x| GenericBuilder::Process(x)).collect(), Box::new(())))
Ok((Process::new_vector(threads).into_iter().map(GenericBuilder::Process).collect(), Box::new(())))
},
Config::ProcessBinary(threads) => {
Ok((ProcessBuilder::new_vector(threads).into_iter().map(|x| GenericBuilder::ProcessBinary(x)).collect(), Box::new(())))
Ok((ProcessBuilder::new_vector(threads).into_iter().map(GenericBuilder::ProcessBinary).collect(), Box::new(())))
},
Config::Cluster { threads, process, addresses, report, log_fn } => {
match initialize_networking(addresses, process, threads, report, log_fn) {
Ok((stuff, guard)) => {
Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard)))
Ok((stuff.into_iter().map(GenericBuilder::ZeroCopy).collect(), Box::new(guard)))
},
Err(err) => Err(format!("failed to initialize networking: {}", err))
}
Expand Down
8 changes: 4 additions & 4 deletions communication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<V
let mut senders: Vec<_> = (0 .. sends).map(|_| Vec::with_capacity(recvs)).collect();
let mut recvers: Vec<_> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect();

for sender in 0 .. sends {
for recver in 0 .. recvs {
for sender in senders.iter_mut() {
for recver in recvers.iter_mut() {
let (send, recv) = crossbeam_channel::unbounded();
senders[sender].push(send);
recvers[recver].push(recv);
sender.push(send);
recver.push(recv);
}
}

Expand Down
2 changes: 1 addition & 1 deletion communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> R
let mut results = start_task.join().unwrap()?;
results.push(None);
let to_extend = await_task.join().unwrap()?;
results.extend(to_extend.into_iter());
results.extend(to_extend);

if noisy { println!("worker {}:\tinitialization complete", my_index) }

Expand Down
4 changes: 2 additions & 2 deletions container/src/columnation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ impl<T: Columnation> Default for TimelyStack<T> {

impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
let mut iter = iter.into_iter();
let iter = iter.into_iter();
let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
while let Some(element) = iter.next() {
for element in iter {
c.copy(element);
}

Expand Down
4 changes: 2 additions & 2 deletions container/src/flatcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ impl<R: Region + Clone + 'static> Container for FlatStack<R> {

type Iter<'a> = <&'a Self as IntoIterator>::IntoIter;

fn iter<'a>(&'a self) -> Self::Iter<'a> {
fn iter(&self) -> Self::Iter<'_> {
IntoIterator::into_iter(self)
}

type DrainIter<'a> = Self::Iter<'a>;

fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
fn drain(&mut self) -> Self::DrainIter<'_> {
IntoIterator::into_iter(&*self)
}
}
Expand Down
2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ fn main() {

in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {

let mut notificator = FrontierNotificator::new();
let mut notificator = FrontierNotificator::default();
let mut stash = HashMap::new();

move |input1, input2, output| {
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/threadless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use timely::WorkerConfig;
fn main() {

// create a naked single-threaded worker.
let allocator = timely::communication::allocator::Thread::new();
let allocator = timely::communication::allocator::Thread::default();
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);

// create input and probe handles.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct ConsumedGuard<T: Ord + Clone + 'static> {

impl<T:Ord+Clone+'static> ConsumedGuard<T> {
pub(crate) fn time(&self) -> &T {
&self.time.as_ref().unwrap()
self.time.as_ref().unwrap()
}
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
let hash_func = &mut self.hash_func;

// if the number of pushers is a power of two, use a mask
if (self.pushers.len() & (self.pushers.len() - 1)) == 0 {
if self.pushers.len().is_power_of_two() {
let mask = (self.pushers.len() - 1) as u64;
let pushers = &mut self.pushers;
data.push_partitioned(
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/aggregation/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for
hash: H) -> Stream<S, R> where S::Timestamp: Eq {

let mut aggregates = HashMap::new();
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {

// read each input, fold into aggregates
input.for_each(|time, data| {
Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/operators/aggregation/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;

/// Provides the `state_machine` method.
///
/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
/// Events are applied in time-order, but no other promises are made. Each state transition can
/// produce output, which is sent.
Expand All @@ -15,8 +17,6 @@ use crate::dataflow::channels::pact::Exchange;
/// updates for the current time reflected in the notificator, though. In the case of partially
/// ordered times, the only guarantee is that updates are not applied out of order, not that there
/// is some total order on times respecting the total order (updates may be interleaved).

/// Provides the `state_machine` method.
pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
/// Tracks a state for each presented key, using user-supplied state transition logic.
///
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
let mut states = HashMap::new(); // keys -> state

self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {

// go through each time with data, process each (key, val) pair.
notificator.for_each(|time,_,_| {
Expand All @@ -88,7 +88,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f

// stash if not time yet
if notificator.frontier(0).less_than(time.time()) {
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data);
notificator.notify_at(time.retain());
}
else {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
let mut out1 = output1_handle.session(&time);
let mut out2 = output2_handle.session(&time);
for datum in data.drain(..) {
if condition(&time.time(), &datum) {
if condition(time.time(), &datum) {
out2.give(datum);
} else {
out1.give(datum);
Expand Down Expand Up @@ -107,7 +107,7 @@ impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
let mut output2_handle = output2.activate();

input.for_each(|time, data| {
let mut out = if condition(&time.time()) {
let mut out = if condition(time.time()) {
output2_handle.session(&time)
} else {
output1_handle.session(&time)
Expand Down
11 changes: 6 additions & 5 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ pub trait CapabilityTrait<T: Timestamp> {
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool;
}

impl<'a, T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &'a C {
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
fn time(&self) -> &T { (**self).time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
(**self).valid_for_output(query_buffer)
}
}
impl<'a, T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &'a mut C {
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
fn time(&self) -> &T { (**self).time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
(**self).valid_for_output(query_buffer)
Expand Down Expand Up @@ -227,9 +227,10 @@ impl Error for DowngradeError {}
/// A shared list of shared output capability buffers.
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;

/// An capability of an input port. Holding onto this capability will implicitly holds onto a
/// capability for all the outputs ports this input is connected to, after the connection summaries
/// have been applied.
/// An capability of an input port.
///
/// Holding onto this capability will implicitly holds onto a capability for all the outputs
/// ports this input is connected to, after the connection summaries have been applied.
///
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
/// capability and turns it into a [Capability] for a specific output port.
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub trait Leave<G: Scope, C: Container> {
fn leave(&self) -> StreamCore<G, C>;
}

impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'a, G, T>, C> {
impl<G: Scope, C: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C> {
fn leave(&self) -> StreamCore<G, C> {

let scope = self.scope();
Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/operators/core/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ pub trait Exchange<C: PushPartitioned> {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn exchange<F: 'static>(&self, route: F) -> Self
fn exchange<F>(&self, route: F) -> Self
where
for<'a> F: FnMut(&C::Item<'a>) -> u64;
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
}

impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
where
C: PushPartitioned + ExchangeData,
{
fn exchange<F: 'static>(&self, route: F) -> StreamCore<G, C>
fn exchange<F>(&self, route: F) -> StreamCore<G, C>
where
for<'a> F: FnMut(&C::Item<'a>) -> u64,
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
{
self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
move |input, output| {
Expand Down
Loading