Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/miri.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: nightly
- name: Install miri
run: rustup component add miri
components: miri
- name: Cargo test
run: cargo miri test
15 changes: 15 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,24 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ matrix.toolchain }}
components: clippy
- name: Cargo test
run: cargo test --workspace --all-targets

# Check for clippy warnings
clippy:
name: Cargo clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: clippy
- name: Cargo clippy
run: cargo clippy --workspace --all-targets
env:
RUSTFLAGS: "" # Don't make test fail on clippy

# Check mdbook files for errors
mdbook:
name: test mdBook
Expand Down
64 changes: 64 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,70 @@ edition = "2021"
[workspace.dependencies]
columnar = "0.3"

[workspace.lints.clippy]
type_complexity = "allow"
option_map_unit_fn = "allow"
wrong_self_convention = "allow"
should_implement_trait = "allow"
module_inception = "allow"

#as_conversions = "warn"
bool_comparison = "warn"
borrow_interior_mutable_const = "warn"
borrowed_box = "warn"
builtin_type_shadow = "warn"
clone_on_ref_ptr = "warn"
crosspointer_transmute = "warn"
dbg_macro = "warn"
deref_addrof = "warn"
disallowed_macros = "warn"
disallowed_methods = "warn"
disallowed_types = "warn"
double_must_use = "warn"
double_neg = "warn"
double_parens = "warn"
duplicate_underscore_argument = "warn"
excessive_precision = "warn"
extra_unused_lifetimes = "warn"
from_over_into = "warn"
match_overlapping_arm = "warn"
must_use_unit = "warn"
mut_mutex_lock = "warn"
needless_borrow = "warn"
needless_pass_by_ref_mut = "warn"
needless_question_mark = "warn"
needless_return = "warn"
no_effect = "warn"
panicking_overflow_checks = "warn"
partialeq_ne_impl = "warn"
print_literal = "warn"
redundant_closure = "warn"
redundant_closure_call = "warn"
redundant_field_names = "warn"
redundant_pattern = "warn"
redundant_slicing = "warn"
redundant_static_lifetimes = "warn"
same_item_push = "warn"
shadow_unrelated = "warn"
single_component_path_imports = "warn"
suspicious_assignment_formatting = "warn"
suspicious_else_formatting = "warn"
suspicious_unary_op_formatting = "warn"
todo = "warn"
transmutes_expressible_as_ptr_casts = "warn"
unnecessary_cast = "warn"
unnecessary_lazy_evaluations = "warn"
unnecessary_mut_passed = "warn"
unnecessary_unwrap = "warn"
unused_async = "warn"
useless_asref = "warn"
useless_conversion = "warn"
useless_format = "warn"
wildcard_in_or_patterns = "warn"
write_literal = "warn"
zero_divided_by_zero = "warn"
zero_prefixed_literal = "warn"

[profile.release]
opt-level = 3
debug = true
Expand Down
3 changes: 3 additions & 0 deletions bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ homepage = "https://github.com/TimelyDataflow/timely-dataflow"
repository = "https://github.com/TimelyDataflow/timely-dataflow.git"
keywords = ["timely", "dataflow", "bytes"]
license = "MIT"

[lints]
workspace = true
4 changes: 2 additions & 2 deletions bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub mod arc {
let result = BytesMut {
ptr: self.ptr,
len: index,
sequestered: self.sequestered.clone(),
sequestered: Arc::clone(&self.sequestered),
};

self.ptr = self.ptr.wrapping_add(index);
Expand Down Expand Up @@ -204,7 +204,7 @@ pub mod arc {
let result = Bytes {
ptr: self.ptr,
len: index,
sequestered: self.sequestered.clone(),
sequestered: Arc::clone(&self.sequestered),
};

self.ptr = self.ptr.wrapping_add(index);
Expand Down
1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ignore-interior-mutability = ["timely::dataflow::operators::capability::Capability"]
3 changes: 3 additions & 0 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ repository = "https://github.com/TimelyDataflow/timely-dataflow.git"
keywords = ["timely", "dataflow"]
license = "MIT"

[lints]
workspace = true

[features]
default = ["getopts"]

Expand Down
6 changes: 3 additions & 3 deletions communication/examples/comm_hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ fn main() {
let (mut senders, mut receiver) = allocator.allocate(0);

// send typed data along each channel
for i in 0 .. allocator.peers() {
senders[i].send(Message { payload: format!("hello, {}", i)});
senders[i].done();
for (i, sender) in senders.iter_mut().enumerate() {
sender.send(Message { payload: format!("hello, {}", i)});
sender.done();
}

// no support for termination notification,
Expand Down
180 changes: 97 additions & 83 deletions communication/examples/lgalloc.rs
Original file line number Diff line number Diff line change
@@ -1,112 +1,126 @@
#![cfg(any(target_os = "linux", target_os = "macos"))]
//! Test using lgalloc as the allocator for zero-copy data transfer.
//! Note that this example only works on Linux and MacOS.

#[cfg(any(target_os = "linux", target_os = "macos"))]
mod example {
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use timely_communication::{Allocate, Bytesable};
use timely_communication::allocator::zero_copy::bytes_slab::BytesRefill;

/// A wrapper that indicates the serialization/deserialization strategy.
pub struct Message {
/// Text contents.
pub payload: String,
}

use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use timely_communication::{Allocate, Bytesable};
use timely_communication::allocator::zero_copy::bytes_slab::BytesRefill;
impl Bytesable for Message {
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
}

/// A wrapper that indicates the serialization/deserialization strategy.
pub struct Message {
/// Text contents.
pub payload: String,
}
fn length_in_bytes(&self) -> usize {
self.payload.len()
}

impl Bytesable for Message {
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
writer.write_all(self.payload.as_bytes()).unwrap();
}
}

fn length_in_bytes(&self) -> usize {
self.payload.len()
fn lgalloc_refill(size: usize) -> Box<LgallocHandle> {
let (pointer, capacity, handle) = lgalloc::allocate::<u8>(size).unwrap();
let handle = Some(handle);
Box::new(LgallocHandle { handle, pointer, capacity })
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
writer.write_all(self.payload.as_bytes()).unwrap();
struct LgallocHandle {
handle: Option<lgalloc::Handle>,
pointer: NonNull<u8>,
capacity: usize,
}
}

fn lgalloc_refill(size: usize) -> Box<LgallocHandle> {
let (pointer, capacity, handle) = lgalloc::allocate::<u8>(size).unwrap();
let handle = Some(handle);
Box::new(LgallocHandle { handle, pointer, capacity })
}

struct LgallocHandle {
handle: Option<lgalloc::Handle>,
pointer: NonNull<u8>,
capacity: usize,
}

impl Deref for LgallocHandle {
type Target = [u8];
#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
impl Deref for LgallocHandle {
type Target = [u8];
#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
}
}
}

impl DerefMut for LgallocHandle {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
impl DerefMut for LgallocHandle {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
}
}
}

impl Drop for LgallocHandle {
fn drop(&mut self) {
lgalloc::deallocate(self.handle.take().unwrap());
impl Drop for LgallocHandle {
fn drop(&mut self) {
lgalloc::deallocate(self.handle.take().unwrap());
}
}
}

fn main() {
let mut config = lgalloc::LgAlloc::new();
config.enable().with_path(std::env::temp_dir());
lgalloc::lgalloc_set_config(&config);
pub(crate) fn main() {
let mut lgconfig = lgalloc::LgAlloc::new();
lgconfig.enable().with_path(std::env::temp_dir());
lgalloc::lgalloc_set_config(&lgconfig);

let refill = BytesRefill {
logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>),
limit: None,
};
let refill = BytesRefill {
logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>),
limit: None,
};

// extract the configuration from user-supplied arguments, initialize the computation.
let config = timely_communication::Config::ProcessBinary(4);
let (allocators, others) = config.try_build_with(refill).unwrap();
let guards = timely_communication::initialize_from(allocators, others, |mut allocator| {
// extract the configuration from user-supplied arguments, initialize the computation.
let config = timely_communication::Config::ProcessBinary(4);
let (allocators, others) = config.try_build_with(refill).unwrap();
let guards = timely_communication::initialize_from(allocators, others, |mut allocator| {

println!("worker {} of {} started", allocator.index(), allocator.peers());
println!("worker {} of {} started", allocator.index(), allocator.peers());

// allocates a pair of senders list and one receiver.
let (mut senders, mut receiver) = allocator.allocate(0);
// allocates a pair of senders list and one receiver.
let (mut senders, mut receiver) = allocator.allocate(0);

// send typed data along each channel
for i in 0 .. allocator.peers() {
senders[i].send(Message { payload: format!("hello, {}", i)});
senders[i].done();
}
// send typed data along each channel
for (i, sender) in senders.iter_mut().enumerate() {
sender.send(Message { payload: format!("hello, {}", i)});
sender.done();
}

// no support for termination notification,
// we have to count down ourselves.
let mut received = 0;
while received < allocator.peers() {
// no support for termination notification,
// we have to count down ourselves.
let mut received = 0;
while received < allocator.peers() {

allocator.receive();
allocator.receive();

if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message.payload);
received += 1;
}
if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message.payload);
received += 1;
}

allocator.release();
}
allocator.release();
}

allocator.index()
});
allocator.index()
});

// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
for guard in guards.join() {
println!("result: {:?}", guard);
// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
for guard in guards.join() {
println!("result: {:?}", guard);
}
}
else { println!("error in computation"); }
}
else { println!("error in computation"); }
}

// Disable entirely on non-Linux and non-MacOS platforms.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
mod example {
pub fn main() { }
}

pub fn main() {
example::main();
}
2 changes: 1 addition & 1 deletion communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub trait Allocate {
(thread::ThreadPusher<T>,
thread::ThreadPuller<T>)
{
thread::Thread::new_from(identifier, self.events().clone())
thread::Thread::new_from(identifier, Rc::clone(self.events()))
}

/// Allocates a broadcast channel, where each pushed message is received by all.
Expand Down
4 changes: 2 additions & 2 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl PeerBuilder for Process {
peers,
buzzers_send: bsend,
buzzers_recv: brecv,
channels: channels.clone(),
channels: Arc::clone(&channels),
counters_send: counters_send.clone(),
counters_recv: recv,
}
Expand Down Expand Up @@ -173,7 +173,7 @@ impl Allocate for Process {
.map(|s| Box::new(s) as Box<dyn Push<T>>)
.collect::<Vec<_>>();

let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box<dyn Pull<T>>;
let recv = Box::new(CountPuller::new(recv, identifier, Rc::clone(self.inner.events()))) as Box<dyn Pull<T>>;

(sends, recv)
}
Expand Down
Loading
Loading