Skip to content

Commit 91209b6

Browse files
committed
Merge branch 'master' into miri_test
2 parents f717abd + d361f41 commit 91209b6

File tree

19 files changed

+130
-221
lines changed

19 files changed

+130
-221
lines changed

communication/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ serde = { version = "1.0", features = ["derive"] }
2424
timely_bytes = { path = "../bytes", version = "0.13" }
2525
timely_container = { path = "../container", version = "0.15.0" }
2626
timely_logging = { path = "../logging", version = "0.13" }
27-
crossbeam-channel = "0.5"
2827

2928
# Lgalloc only supports linux and macos, don't depend on any other OS.
3029
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]

communication/src/allocator/counters.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5+
use std::sync::mpsc::Sender;
56

67
use crate::{Push, Pull};
78

@@ -51,8 +52,6 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
5152
}
5253
}
5354

54-
use crossbeam_channel::Sender;
55-
5655
/// The push half of an intra-thread channel.
5756
pub struct ArcPusher<T, P: Push<T>> {
5857
index: usize,

communication/src/allocator/process.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
66
use std::any::Any;
77
use std::time::Duration;
88
use std::collections::{HashMap};
9-
use crossbeam_channel::{Sender, Receiver};
9+
use std::sync::mpsc::{Sender, Receiver};
1010

1111
use crate::allocator::thread::{ThreadBuilder};
1212
use crate::allocator::{Allocate, AllocateBuilder, PeerBuilder, Thread};
@@ -80,7 +80,7 @@ impl PeerBuilder for Process {
8080
let mut counters_send = Vec::with_capacity(peers);
8181
let mut counters_recv = Vec::with_capacity(peers);
8282
for _ in 0 .. peers {
83-
let (send, recv) = crossbeam_channel::unbounded();
83+
let (send, recv) = std::sync::mpsc::channel();
8484
counters_send.push(send);
8585
counters_recv.push(recv);
8686
}
@@ -130,7 +130,7 @@ impl Allocate for Process {
130130
let mut pushers = Vec::with_capacity(self.peers);
131131
let mut pullers = Vec::with_capacity(self.peers);
132132
for buzzer in self.buzzers.iter() {
133-
let (s, r): (Sender<T>, Receiver<T>) = crossbeam_channel::unbounded();
133+
let (s, r): (Sender<T>, Receiver<T>) = std::sync::mpsc::channel();
134134
// TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter.
135135
pushers.push((Pusher { target: s }, buzzer.clone()));
136136
pullers.push(Puller { source: r, current: None });

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use std::rc::Rc;
33
use std::cell::RefCell;
44
use std::collections::{VecDeque, HashMap, hash_map::Entry};
5-
use crossbeam_channel::{Sender, Receiver};
5+
use std::sync::mpsc::{Sender, Receiver};
66

77
use timely_bytes::arc::Bytes;
88

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::collections::{VecDeque, HashMap, hash_map::Entry};
6-
use crossbeam_channel::{Sender, Receiver};
6+
use std::sync::mpsc::{Sender, Receiver};
77

88
use timely_bytes::arc::Bytes;
99

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Methods related to reading from and writing to TCP connections
22
33
use std::io::{self, Write};
4-
use crossbeam_channel::{Sender, Receiver};
4+
use std::sync::mpsc::{Sender, Receiver};
55

66
use crate::networking::MessageHeader;
77

communication/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ pub use allocator::Generic as Allocator;
107107
pub use allocator::{Allocate, Exchangeable};
108108
pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
109109

110+
use std::sync::mpsc::{Sender, Receiver};
111+
110112
use timely_bytes::arc::Bytes;
111113

112114
/// A type that can be serialized and deserialized through `Bytes`.
@@ -169,8 +171,6 @@ impl<T, P: ?Sized + Pull<T>> Pull<T> for Box<P> {
169171
}
170172

171173

172-
use crossbeam_channel::{Sender, Receiver};
173-
174174
/// Allocate a matrix of send and receive changes to exchange items.
175175
///
176176
/// This method constructs channels for `sends` threads to create and send
@@ -183,7 +183,7 @@ fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<V
183183

184184
for sender in senders.iter_mut() {
185185
for recver in recvers.iter_mut() {
186-
let (send, recv) = crossbeam_channel::unbounded();
186+
let (send, recv) = std::sync::mpsc::channel();
187187
sender.push(send);
188188
recver.push(recv);
189189
}

timely/src/dataflow/operators/capability.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use std::cell::RefCell;
2727
use std::fmt::{self, Debug};
2828

2929
use crate::order::PartialOrder;
30+
use crate::progress::Antichain;
3031
use crate::progress::Timestamp;
3132
use crate::progress::ChangeBatch;
32-
use crate::progress::operate::PortConnectivity;
3333
use crate::scheduling::Activations;
3434
use crate::dataflow::channels::pullers::counter::ConsumedGuard;
3535

@@ -238,26 +238,26 @@ pub struct InputCapability<T: Timestamp> {
238238
/// Output capability buffers, for use in minting capabilities.
239239
internal: CapabilityUpdates<T>,
240240
/// Timestamp summaries for each output.
241-
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
241+
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
242242
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
243243
consumed_guard: ConsumedGuard<T>,
244244
}
245245

246246
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247247
fn time(&self) -> &T { self.time() }
248248
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
249-
let internal_borrow = self.internal.borrow();
250-
self.summaries.borrow().iter().any(|(output, summary)| {
249+
let borrow = self.summaries.borrow();
250+
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
251251
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252-
Rc::ptr_eq(&internal_borrow[*output], query_buffer) && summary.len() == 1 && summary[0] == Default::default()
252+
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
253253
})
254254
}
255255
}
256256

257257
impl<T: Timestamp> InputCapability<T> {
258258
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
259259
/// the provided [`ChangeBatch`].
260-
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
260+
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
261261
InputCapability {
262262
internal,
263263
summaries,
@@ -281,10 +281,10 @@ impl<T: Timestamp> InputCapability<T> {
281281
/// Delays capability for a specific output port.
282282
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
283283
use crate::progress::timestamp::PathSummary;
284-
if self.summaries.borrow()[&output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
284+
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
285285
Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone())
286286
} else {
287-
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[&output_port], self.time());
287+
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
288288
}
289289
}
290290

@@ -305,11 +305,11 @@ impl<T: Timestamp> InputCapability<T> {
305305
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
306306
use crate::progress::timestamp::PathSummary;
307307
let self_time = self.time().clone();
308-
if self.summaries.borrow()[&output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
308+
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
309309
Capability::new(self_time, self.internal.borrow()[output_port].clone())
310310
}
311311
else {
312-
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[&output_port], self_time);
312+
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
313313
}
314314
}
315315
}

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,7 @@ impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
113113
let summary = handle.summary;
114114
let mut output = handle.output;
115115

116-
let connection = Some((0, Antichain::from_elem(summary.clone()))).into_iter().collect();
117-
let mut input = builder.new_input_connection(self, Pipeline, connection);
116+
let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
118117

119118
builder.build(move |_capability| move |_frontier| {
120119
let mut output = output.activate();

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
77

88
use crate::scheduling::{Schedule, Activator};
99

10+
use crate::progress::frontier::Antichain;
1011
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
1112
use crate::progress::Source;
12-
use crate::progress::operate::Connectivity;
1313

1414
use crate::{Container, Data};
1515
use crate::communication::Push;
@@ -205,7 +205,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
205205
fn inputs(&self) -> usize { 0 }
206206
fn outputs(&self) -> usize { 1 }
207207

208-
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
208+
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
209209
self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
210210
(Vec::new(), self.shared_progress.clone())
211211
}

0 commit comments

Comments
 (0)