Skip to content

Commit 2398b79

Browse files
Introduce columnar and derive extensively (#608)
1 parent 08eca72 commit 2398b79

File tree

9 files changed

+34
-28
lines changed

9 files changed

+34
-28
lines changed

communication/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ license = "MIT"
1717
default = ["getopts"]
1818

1919
[dependencies]
20+
columnar = "0.1"
2021
getopts = { version = "0.2.21", optional = true }
2122
byteorder = "1.5"
2223
serde = { version = "1.0", features = ["derive"] }

communication/src/logging.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
//! Configuration and events for communication logging.
22
3+
use columnar::Columnar;
34
use serde::{Serialize, Deserialize};
45

56
/// Configuration information about a communication thread.
6-
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
7+
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
78
pub struct CommunicationSetup {
89
/// `true` when this is a send thread, `false` when it is a receive thread.
910
pub sender: bool,
@@ -14,7 +15,7 @@ pub struct CommunicationSetup {
1415
}
1516

1617
/// Various communication events.
17-
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
18+
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
1819
pub enum CommunicationEvent {
1920
/// An observed message.
2021
Message(MessageEvent),
@@ -23,7 +24,7 @@ pub enum CommunicationEvent {
2324
}
2425

2526
/// An observed message.
26-
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
27+
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
2728
pub struct MessageEvent {
2829
/// `true` for send event, `false` for receive event
2930
pub is_send: bool,
@@ -32,7 +33,7 @@ pub struct MessageEvent {
3233
}
3334

3435
/// Starting or stopping communication threads.
35-
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
36+
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
3637
pub struct StateEvent {
3738
/// Is the thread a send (vs a recv) thread.
3839
pub send: bool,

communication/src/networking.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::thread::sleep;
99
use std::time::Duration;
1010

1111
use byteorder::{ReadBytesExt, WriteBytesExt};
12+
use columnar::Columnar;
1213
use serde::{Deserialize, Serialize};
1314

1415
// This constant is sent along immediately after establishing a TCP stream, so
@@ -22,7 +23,7 @@ type ByteOrder = byteorder::BigEndian;
2223
/// Framing data for each `Vec<u8>` transmission, indicating a typed channel, the source and
2324
/// destination workers, and the length in bytes.
2425
// *Warning*: Adding, removing, or altering fields requires adjusting the implementation below!
25-
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
26+
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
2627
pub struct MessageHeader {
2728
/// index of channel.
2829
pub channel: usize,

timely/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ default = ["getopts"]
1919
getopts = ["getopts-dep", "timely_communication/getopts"]
2020

2121
[dependencies]
22+
columnar = "0.1"
2223
getopts-dep = { package = "getopts", version = "0.2.21", optional = true }
2324
bincode = { version = "1.0" }
2425
byteorder = "1.5"
@@ -33,5 +34,3 @@ smallvec = { version = "1.13.2", features = ["serde", "const_generics"] }
3334
[dev-dependencies]
3435
bytemuck = "1.18.0"
3536
rand = { version = "0.8", features = ["small_rng"] }
36-
columnar = "0.1"
37-
# columnar = { git = "https://github.com/frankmcsherry/columnar" }

timely/src/dataflow/channels/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub mod pullers;
1212
pub mod pact;
1313

1414
/// A serializable representation of timestamped data.
15-
#[derive(Clone, Serialize, Deserialize)]
15+
#[derive(Clone)]
1616
pub struct Message<T, C> {
1717
/// The timestamp associated with the message.
1818
pub time: T,

timely/src/dataflow/operators/core/capture/event.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
//! stream. There are two types of events, (i) the receipt of data and (ii) reports of progress
55
//! of timestamps.
66
7+
use columnar::Columnar;
78
use serde::{Deserialize, Serialize};
89

910
/// Data and progress events of the captured stream.
10-
#[derive(Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)]
11+
#[derive(Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize, Columnar)]
1112
pub enum Event<T, C> {
1213
/// Progress received via `push_external_progress`.
1314
Progress(Vec<(T, i64)>),

timely/src/logging.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub type TimelyLogger = Logger<TimelyEvent>;
1010
pub type TimelyProgressLogger = Logger<TimelyProgressEvent>;
1111

1212
use std::time::Duration;
13+
use columnar::Columnar;
1314
use serde::{Deserialize, Serialize};
1415
use crate::dataflow::operators::capture::{Event, EventPusher};
1516

@@ -49,7 +50,7 @@ impl<T, E, P> Drop for BatchLogger<T, E, P> where P: EventPusher<Duration, Vec<(
4950
}
5051
}
5152

52-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
53+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
5354
/// The creation of an `Operate` implementor.
5455
pub struct OperatesEvent {
5556
/// Worker-unique identifier for the operator.
@@ -60,7 +61,7 @@ pub struct OperatesEvent {
6061
pub name: String,
6162
}
6263

63-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
64+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
6465
/// The creation of a channel between operators.
6566
pub struct ChannelsEvent {
6667
/// Worker-unique identifier for the channel
@@ -146,14 +147,14 @@ pub struct TimelyProgressEvent {
146147
pub internal: Box<dyn ProgressEventTimestampVec>,
147148
}
148149

149-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
150+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
150151
/// External progress pushed onto an operator
151152
pub struct PushProgressEvent {
152153
/// Worker-unique operator identifier
153154
pub op_id: usize,
154155
}
155156

156-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
157+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
157158
/// Message send or receive event
158159
pub struct MessagesEvent {
159160
/// `true` if send event, `false` if receive event.
@@ -171,15 +172,15 @@ pub struct MessagesEvent {
171172
}
172173

173174
/// Records the starting and stopping of an operator.
174-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
175+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
175176
pub enum StartStop {
176177
/// Operator starts.
177178
Start,
178179
/// Operator stops.
179180
Stop,
180181
}
181182

182-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
183+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
183184
/// Operator start or stop.
184185
pub struct ScheduleEvent {
185186
/// Worker-unique identifier for the operator, linkable to the identifiers in [`OperatesEvent`].
@@ -197,14 +198,14 @@ impl ScheduleEvent {
197198
pub fn stop(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Stop } }
198199
}
199200

200-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
201+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
201202
/// Operator shutdown.
202203
pub struct ShutdownEvent {
203204
/// Worker-unique identifier for the operator, linkable to the identifiers in [`OperatesEvent`].
204205
pub id: usize,
205206
}
206207

207-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
208+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
208209
/// Application-defined code start or stop
209210
pub struct ApplicationEvent {
210211
/// Unique event type identifier
@@ -213,28 +214,28 @@ pub struct ApplicationEvent {
213214
pub is_start: bool,
214215
}
215216

216-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
217+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
217218
/// Guarded message activity start or stop
218219
pub struct GuardedMessageEvent {
219220
/// `true` when activity begins, `false` when it stops
220221
pub is_start: bool,
221222
}
222223

223-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
224+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
224225
/// Guarded progress activity start or stop
225226
pub struct GuardedProgressEvent {
226227
/// `true` when activity begins, `false` when it stops
227228
pub is_start: bool,
228229
}
229230

230-
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone, Copy)]
231+
#[derive(Serialize, Deserialize, Columnar, Debug, PartialEq, Eq, Hash, Clone, Copy)]
231232
/// Identifier of the worker that generated a log line
232233
pub struct TimelySetup {
233234
/// Worker index
234235
pub index: usize,
235236
}
236237

237-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
238+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
238239
/// Kind of communication channel
239240
pub enum CommChannelKind {
240241
/// Communication channel carrying progress information
@@ -243,7 +244,7 @@ pub enum CommChannelKind {
243244
Data,
244245
}
245246

246-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
247+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
247248
/// Event on a communication channel
248249
pub struct CommChannelsEvent {
249250
/// Communication channel identifier
@@ -252,15 +253,15 @@ pub struct CommChannelsEvent {
252253
pub kind: CommChannelKind,
253254
}
254255

255-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
256+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
256257
/// Input logic start/stop
257258
pub struct InputEvent {
258259
/// Whether the input activity starts (`StartStop::Start`) or stops (`StartStop::Stop`)
259260
pub start_stop: StartStop,
260261
}
261262

262263
/// Records the parking and unparking of a worker.
263-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
264+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
264265
pub enum ParkEvent {
265266
/// Worker parks.
266267
Park(Option<Duration>),
@@ -275,7 +276,7 @@ impl ParkEvent {
275276
pub fn unpark() -> Self { ParkEvent::Unpark }
276277
}
277278

278-
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
279+
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
279280
/// An event in a timely worker
280281
pub enum TimelyEvent {
281282
/// Operator creation.

timely/src/order.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub use product::flatcontainer::ProductRegion as FlatProductRegion;
6161
/// A pair of timestamps, partially ordered by the product order.
6262
mod product {
6363
use std::fmt::{Formatter, Error, Debug};
64+
use columnar::Columnar;
6465
use serde::{Deserialize, Serialize};
6566

6667
use crate::container::columnation::{Columnation, Region};
@@ -73,7 +74,7 @@ mod product {
7374
///
7475
/// We use `Product` rather than `(TOuter, TInner)` so that we can derive our own `PartialOrder`,
7576
/// because Rust just uses the lexicographic total order.
76-
#[derive(Copy, Clone, Hash, Eq, PartialEq, Default, Ord, PartialOrd, Serialize, Deserialize)]
77+
#[derive(Copy, Clone, Hash, Eq, PartialEq, Default, Ord, PartialOrd, Serialize, Deserialize, Columnar)]
7778
pub struct Product<TOuter, TInner> {
7879
/// Outer timestamp.
7980
pub outer: TOuter,

timely/src/progress/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Progress tracking mechanisms to support notification in timely dataflow
22
3+
use columnar::Columnar;
34
use serde::{Deserialize, Serialize};
45
pub use self::operate::Operate;
56
pub use self::subgraph::{Subgraph, SubgraphBuilder};
@@ -16,7 +17,7 @@ pub mod reachability;
1617
pub mod subgraph;
1718

1819
/// A timely dataflow location.
19-
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize)]
20+
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize, Columnar)]
2021
pub struct Location {
2122
/// A scope-local operator identifier.
2223
pub node: usize,
@@ -58,7 +59,7 @@ impl From<Source> for Location {
5859
}
5960

6061
/// An operator port.
61-
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize)]
62+
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize, Columnar)]
6263
pub enum Port {
6364
/// An operator input.
6465
Target(usize),

0 commit comments

Comments
 (0)