Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
3ddd46d
Update rand to 0.8 (#436)
antiguru Dec 15, 2021
1f09f66
Update Rust to 1.56.1 for tests (#442)
antiguru Dec 16, 2021
9b09c6a
remove redundant `Error` impl (#443)
tisonkun Jan 12, 2022
84ff8bf
Container stream (without Allocation) (#426)
antiguru Jan 14, 2022
03ea035
correct columnation reference
frankmcsherry Jan 14, 2022
2cc6bea
Exchange operator can take FnMut (#445)
davidselassie Jan 21, 2022
3cf1540
Implement TotalOrder for totally ordered things (#449)
frankmcsherry Feb 4, 2022
a3da86f
RefOrMut::take (#448)
antiguru Feb 9, 2022
731b06e
Correct references in doc comments (#447)
antiguru Feb 9, 2022
ae3dd8c
enterleave should not depend on command line args (#450)
antiguru Feb 15, 2022
402ce63
Derive (De)serialize for EventCore (#451)
antiguru Feb 17, 2022
4db679a
Add From impls for MutableAntichain
frankmcsherry Feb 20, 2022
40abb6f
Fix typos
Mar 7, 2022
b578b67
implement Hash for Antichain<T> when T is Ord+Hash (#454)
danhhz Mar 7, 2022
dd6187c
Add a tuple timestamp (#455)
frankmcsherry Mar 12, 2022
7e313ee
exchange: remove one layer of boxing (#456)
petrosagg Mar 17, 2022
eb5dc63
Add option conversions for totally ordered antichains (#458)
frankmcsherry Mar 18, 2022
6f50573
Antichain `FromIterator` implementations (#459)
frankmcsherry Mar 18, 2022
c4145f0
Avoid spinning when workers have no dataflows (#463)
frankmcsherry Mar 31, 2022
7ad8962
fix a few broken links in docs (#465)
bddap Apr 27, 2022
be94fd8
Open the Inspect trait for more stream kinds (#472)
antiguru Jul 15, 2022
ee65cdc
Turn a stream of data into a stream of shared data (#471)
antiguru Jul 15, 2022
1f3a315
Update versions for Github CI (#478)
antiguru Aug 11, 2022
330f15f
Derive Clone for activators (#481)
antiguru Aug 29, 2022
0fce4ab
Container-invariant BranchWhen operator (#477)
antiguru Aug 29, 2022
8442e15
Container-invariant Reclock operator (#474)
antiguru Aug 29, 2022
204b444
Container-invariant Exchange (#476)
antiguru Aug 29, 2022
13a1415
Apply columnation changes on TimelyStack
antiguru Sep 6, 2022
9548bf9
updated consumed counts after capabilityrefs are dropped (#429)
petrosagg Sep 7, 2022
157b80d
Panic with consistent message on TCP failure (#486)
benesch Nov 4, 2022
faf5eb6
Make TCP communication fabric generic over network protocol (#489)
benesch Dec 4, 2022
1cf7b4a
Activate operators that may want to shut down (#488)
frankmcsherry Jan 9, 2023
01438ae
update rustc in CI to 1.64 (#495)
petrosagg Jan 15, 2023
51212fe
Add cease to output handles (#496)
antiguru Jan 24, 2023
6a73600
timely: unconstrained lifetime for `CapabilityRef` (#491)
petrosagg Jan 29, 2023
0c52513
Validate timestamp summary before forming capability (#497)
frankmcsherry Feb 10, 2023
592914b
Update mdbook to 0.4.26 (#500)
antiguru Feb 10, 2023
5b808ff
Back Mutableantichain by ChangeBatch (#505)
frankmcsherry Feb 21, 2023
f2ea960
Fix typo (#506)
nooberfsh Feb 23, 2023
9a355e6
Update mdbook to 0.4.27
antiguru Mar 3, 2023
8af9467
Inline give for Buffer (#511)
antiguru Mar 7, 2023
036b4da
container: implement Container::clear for reference counted containers
petrosagg Nov 6, 2022
134842a
Log messages over enter/leave channels (#507)
teskje Mar 28, 2023
432ef57
Implement TimelyStack heap_size and sum functions
antiguru Apr 13, 2023
5f313d2
Correct mismatched references to `maximum` from `max`.
arusahni May 26, 2023
18d353d
give_container: Only push if message non-empty
antiguru May 30, 2023
93025bb
input handles: give empty containers to operators
petrosagg May 30, 2023
b4bce96
order: add missing Refines implementation for tuples (#527)
petrosagg Jun 9, 2023
b990fab
Drop implementation for Tracker (#517)
antiguru Jun 10, 2023
8fc707d
Remove an unused dependency on clap (#534)
antiguru Oct 17, 2023
c27ed84
Implement Columnation for Product (#535)
antiguru Nov 1, 2023
98af39f
Insert an element by reference into an antichain (#536)
antiguru Nov 25, 2023
64be92b
Manual Antichain::default to avoid bounds (#537)
antiguru Nov 26, 2023
86b3d69
Probe only retains weak handle to Rc (#543)
antiguru Feb 5, 2024
f2245d0
Tidy warnings (#542)
frankmcsherry Feb 5, 2024
6df895d
Remove experiments (#544)
antiguru Feb 5, 2024
08b2087
Remove Kafkaesque (#545)
frankmcsherry Feb 5, 2024
b869dcb
Activate only by channel ID (#526)
antiguru Feb 5, 2024
ba4336b
Revert "async: relax requirements of supplied streams (#401)" (#546)
antiguru Feb 5, 2024
7012b6f
Add support for release-plz
antiguru Mar 13, 2024
2667399
dnm: run release-plz on test
antiguru Mar 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
# Check for updates every Monday
schedule:
interval: "weekly"

8 changes: 4 additions & 4 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ on:

jobs:
deploy:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- run: cargo install mdbook --version 0.4.6
- uses: actions/checkout@v3
- run: cargo install mdbook --version 0.4.27
- run: cd mdbook && mdbook build
- uses: JamesIves/github-pages-deploy-action@4.0.0
- uses: JamesIves/github-pages-deploy-action@v4
with:
branch: gh-pages
folder: mdbook/book
28 changes: 28 additions & 0 deletions .github/workflows/release-plz.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Release-plz

permissions:
pull-requests: write
contents: write

on:
push:
branches:
- master
pull_request:

jobs:
release-plz:
name: Release-plz
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Run release-plz
uses: MarcoIeni/[email protected]
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ on: [push, pull_request]

jobs:
test:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- run: rustup update 1.50.0 --no-self-update && rustup default 1.50.0
- uses: actions/checkout@v3
- run: rustup update 1.64 --no-self-update && rustup default 1.64
- run: cargo build
- name: test mdBook
# rustdoc doesn't build dependencies, so it needs to run after `cargo build`,
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Beneficial fallouts include our ability to remove `RootTimestamp`, as dataflows
- The `Root` type has been renamed `Worker` and is found in the `::worker` module. The methods of the `ScopeParent` trait are now in the `::worker::AsWorker` trait.
- The communication `Allocate` trait's main method `allocate` now takes a worker-unique identifier to use for the channel. The allocator may or may not use the information (most often for logging), but they are allowed to be incorrect if one allocates two channels with the same identifier.
- A `CapabilityRef<T>` now supports `retain_for(usize)` which indicates a specific output port the capability should be retain for use with. The `retain()` method still exists for now and is equivalent to `retain(0)`. This change also comes with the *inability* to use an arbitrary `Capability<T>` with any output; using a capability bound to the wrong output will result in a run-time error.
- The `unary` and `binary` operators now provide `data` as a `RefOrMut`, which does not implement `DerefMut`. More information on how to port methods can be found [here](https://github.com/frankmcsherry/timely-dataflow/pull/135#issuecomment-418355284).
- The `unary` and `binary` operators now provide `data` as a `RefOrMut`, which does not implement `DerefMut`. More information on how to port methods can be found [here](https://github.com/TimelyDataflow/timely-dataflow/pull/135#issuecomment-418355284).


### Removed
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
members = [
"bytes",
"communication",
"kafkaesque",
"container",
"logging",
"timely",
"experiments"
]

[profile.release]
Expand Down
22 changes: 10 additions & 12 deletions communication/src/allocator/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use crate::{Push, Pull};
use crate::allocator::Event;

/// The push half of an intra-thread channel.
pub struct Pusher<T, P: Push<T>> {
index: usize,
// count: usize,
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
pusher: P,
phantom: ::std::marker::PhantomData<T>,
}

impl<T, P: Push<T>> Pusher<T, P> {
/// Wraps a pusher with a message counter.
pub fn new(pusher: P, index: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>) -> Self {
pub fn new(pusher: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
Pusher {
index,
// count: 0,
Expand All @@ -36,7 +34,7 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
// if self.count != 0 {
// self.events
// .borrow_mut()
// .push_back((self.index, Event::Pushed(self.count)));
// .push_back(self.index);
// self.count = 0;
// }
// }
Expand All @@ -47,7 +45,7 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
// moving information along. Better, but needs cooperation.
self.events
.borrow_mut()
.push_back((self.index, Event::Pushed(1)));
.push(self.index);

self.pusher.push(element)
}
Expand All @@ -59,15 +57,15 @@ use crossbeam_channel::Sender;
pub struct ArcPusher<T, P: Push<T>> {
index: usize,
// count: usize,
events: Sender<(usize, Event)>,
events: Sender<usize>,
pusher: P,
phantom: ::std::marker::PhantomData<T>,
buzzer: crate::buzzer::Buzzer,
}

impl<T, P: Push<T>> ArcPusher<T, P> {
/// Wraps a pusher with a message counter.
pub fn new(pusher: P, index: usize, events: Sender<(usize, Event)>, buzzer: crate::buzzer::Buzzer) -> Self {
pub fn new(pusher: P, index: usize, events: Sender<usize>, buzzer: crate::buzzer::Buzzer) -> Self {
ArcPusher {
index,
// count: 0,
Expand Down Expand Up @@ -99,7 +97,7 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
// and finally awaken the thread. Other orders are defective when
// multiple threads are involved.
self.pusher.push(element);
let _ = self.events.send((self.index, Event::Pushed(1)));
let _ = self.events.send(self.index);
// TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
// .expect("Failed to send message count");
self.buzzer.buzz();
Expand All @@ -110,14 +108,14 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
pub struct Puller<T, P: Pull<T>> {
index: usize,
count: usize,
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
puller: P,
phantom: ::std::marker::PhantomData<T>,
}

impl<T, P: Pull<T>> Puller<T, P> {
/// Wraps a puller with a message counter.
pub fn new(puller: P, index: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>) -> Self {
pub fn new(puller: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
Puller {
index,
count: 0,
Expand All @@ -135,7 +133,7 @@ impl<T, P: Pull<T>> Pull<T> for Puller<T, P> {
if self.count != 0 {
self.events
.borrow_mut()
.push_back((self.index, Event::Pulled(self.count)));
.push(self.index);
self.count = 0;
}
}
Expand Down
7 changes: 3 additions & 4 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use crate::allocator::thread::ThreadBuilder;
use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread, Process};
use crate::allocator::{Allocate, AllocateBuilder, Thread, Process};
use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};

Expand Down Expand Up @@ -74,7 +73,7 @@ impl Generic {
Generic::ZeroCopy(z) => z.release(),
}
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
match self {
Generic::Thread(ref t) => t.events(),
Generic::Process(ref p) => p.events(),
Expand All @@ -93,7 +92,7 @@ impl Allocate for Generic {

fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
Generic::Thread(t) => t.await_events(_duration),
Expand Down
11 changes: 1 addition & 10 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

pub use self::thread::Thread;
pub use self::process::Process;
Expand Down Expand Up @@ -50,7 +49,7 @@ pub trait Allocate {
/// drain these events in order to drive their computation. If they
/// fail to do so the event queue may become quite large, and turn
/// into a performance problem.
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>>;
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;

/// Awaits communication events.
///
Expand Down Expand Up @@ -92,11 +91,3 @@ pub trait Allocate {
thread::Thread::new_from(identifier, self.events().clone())
}
}

/// A communication channel event.
pub enum Event {
/// A number of messages pushed into the channel.
Pushed(usize),
/// A number of messages pulled from the channel.
Pulled(usize),
}
18 changes: 9 additions & 9 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::any::Any;
use std::time::Duration;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap};
use crossbeam_channel::{Sender, Receiver};

use crate::allocator::thread::{ThreadBuilder};
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
use crate::allocator::{Allocate, AllocateBuilder, Thread};
use crate::{Push, Pull, Message};
use crate::buzzer::Buzzer;

Expand All @@ -25,8 +25,8 @@ pub struct ProcessBuilder {
buzzers_send: Vec<Sender<Buzzer>>,
buzzers_recv: Vec<Receiver<Buzzer>>,

counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
counters_send: Vec<Sender<usize>>,
counters_recv: Receiver<usize>,
}

impl AllocateBuilder for ProcessBuilder {
Expand Down Expand Up @@ -63,8 +63,8 @@ pub struct Process {
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap</* channel id */ usize, Box<dyn Any+Send>>>>,
buzzers: Vec<Buzzer>,
counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
counters_send: Vec<Sender<usize>>,
counters_recv: Receiver<usize>,
}

impl Process {
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Allocate for Process {
(sends, recv)
}

fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}

Expand All @@ -184,8 +184,8 @@ impl Allocate for Process {

fn receive(&mut self) {
let mut events = self.inner.events().borrow_mut();
while let Ok((index, event)) = self.counters_recv.try_recv() {
events.push_back((index, event));
while let Ok(index) = self.counters_recv.try_recv() {
events.push(index);
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

use crate::allocator::{Allocate, AllocateBuilder, Event};
use crate::allocator::{Allocate, AllocateBuilder};
use crate::allocator::counters::Pusher as CountPusher;
use crate::allocator::counters::Puller as CountPuller;
use crate::{Push, Pull, Message};
Expand All @@ -22,7 +22,7 @@ impl AllocateBuilder for ThreadBuilder {
/// An allocator for intra-thread communication.
pub struct Thread {
/// Shared counts of messages in channels.
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
}

impl Allocate for Thread {
Expand All @@ -32,7 +32,7 @@ impl Allocate for Thread {
let (pusher, puller) = Thread::new_from(identifier, self.events.clone());
(vec![Box::new(pusher)], Box::new(puller))
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<Duration>) {
Expand All @@ -56,12 +56,12 @@ impl Thread {
/// Allocates a new thread-local channel allocator.
pub fn new() -> Self {
Thread {
events: Rc::new(RefCell::new(VecDeque::new())),
events: Rc::new(RefCell::new(Default::default())),
}
}

/// Creates a new thread-local channel from an identifier and shared counts.
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>)
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
-> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>)
{
let shared = Rc::new(RefCell::new((VecDeque::<Message<T>>::new(), VecDeque::<Message<T>>::new())));
Expand Down
5 changes: 2 additions & 3 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::allocator::Event;
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -229,7 +228,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {

// Increment message count for channel.
// Safe to do this even if the channel has been dropped.
events.push_back((header.channel, Event::Pushed(1)));
events.push(header.channel);

// Ensure that a queue exists.
match self.to_local.entry(header.channel) {
Expand Down Expand Up @@ -269,7 +268,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
// }
// }
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
Expand Down
Loading