Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.

Commit b689142

Browse files
committed
Update trait bounds in rest of dbsp to work with persistence.
Signed-off-by: Gerd Zellweger <[email protected]>
1 parent 088f0a5 commit b689142

File tree

16 files changed

+345
-405
lines changed

16 files changed

+345
-405
lines changed

Cargo.lock

Lines changed: 102 additions & 233 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,41 +7,48 @@ edition = "2021"
77
default = ["with-serde"]
88
with-serde = ["serde"]
99
with-csv = ["csv"]
10-
with-nexmark = ["rand", "clap", "cached"]
11-
persistence = []
10+
with-nexmark = [
11+
"arcstr",
12+
"cached",
13+
"clap",
14+
"csv",
15+
"rand",
16+
"regex",
17+
"rust_decimal",
18+
"time",
19+
"size-of/rust_decimal",
20+
"size-of/arcstr",
21+
]
1222

1323
[dependencies]
14-
anyhow = "1.0.57"
1524
num = "0.4.0"
25+
anyhow = "1.0.57"
26+
size-of = "0.1.1"
1627
petgraph = "0.6.0"
1728
typedmap = { version = "0.3.0", features = ["dashmap"] }
1829
crossbeam-utils = "0.8.6"
19-
once_cell = "1.13.0"
30+
once_cell = "1.9.0"
2031
priority-queue = "1.2.1"
2132
hashbrown = "0.12.0"
2233
csv = { version = "1.1", optional = true }
2334
serde = { version = "1.0", optional = true }
2435
impl-trait-for-tuples = "0.2"
25-
deepsize = "0.2.0"
26-
deepsize_derive = "0.1.2"
2736
textwrap = "0.15.0"
2837
fxhash = "0.2"
2938
ordered-float = "3.0.0"
39+
arcstr = { version = "1.1.4", optional = true }
40+
rust_decimal = { version = "1.26.1", optional = true }
41+
regex = { version = "1.6.0", optional = true }
42+
time = { version = "0.3.14", features = ["formatting"], optional = true }
43+
rocksdb = { version = "0.18", default-features = false, features = ["multi-threaded-cf"] }
44+
bincode = { version = "2.0.0-rc.1" }
45+
uuid = { version = "1.1.2", features = ["v4"] }
3046

3147
# TODO: Remove these dependencies
3248
rand = { version = "0.8", optional = true }
3349
clap = { version = "3.2", optional = true, features = ["derive", "env"] }
3450
cached = { version = "0.38.0", optional = true }
3551
crossbeam = "0.8.2"
36-
rocksdb = { version = "0.18", default-features = false, features = ["multi-threaded-cf"] }
37-
bincode = { version = "2.0.0-rc.1" }
38-
uuid = { version = "1.1.2", features = ["v4"] }
39-
40-
# TODO: eliminate dependency on timely-dataflow by cloning relevant
41-
# parts.
42-
timely = "0.12.0"
43-
proptest = "1"
44-
proptest-derive = "0.3.0"
4552

4653
[dev-dependencies]
4754
zip = "0.6.2"
@@ -60,22 +67,13 @@ clap = { version = "3.2.8", features = ["derive", "env"] }
6067
reqwest = { version = "0.11.11", features = ["blocking"] }
6168
ascii_table = "4.0.2"
6269
num-format = "0.4.0"
63-
zipf = "7.0"
6470

6571
[target.'cfg(unix)'.dev-dependencies]
6672
libc = "0.2.127"
67-
# The latest release of pbr (1.0.4) depends on an old version of time (0.1.44) with
68-
# a vulnerability. The dependency has been removed but a new version is yet
69-
# to be released https://github.com/a8m/pb/issues/113
70-
pbr = { git = "https://github.com/a8m/pb", rev = "09a8e592c1bb0aa1d6215e35c5c8b49b7a5ad6bd"}
7173

7274
[profile.bench]
7375
debug = true
7476

75-
[[bench]]
76-
name = "persistence"
77-
harness = false
78-
7977
[[bench]]
8078
name = "galen"
8179
harness = false

src/operator/aggregate/average.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,11 @@ where
140140
#[track_caller]
141141
pub fn average<TS, A, F>(&self, f: F) -> Stream<Circuit<P>, OrdIndexedZSet<Z::Key, A, isize>>
142142
where
143-
TS: Timestamp + SizeOf,
143+
TS: Timestamp + SizeOf + bincode::Decode + bincode::Encode,
144144
Z: IndexedZSet,
145-
Z::Key: PartialEq + Ord + SizeOf + Hash + Clone + SizeOf + Send,
146-
Z::Val: Ord + SizeOf + Clone,
147-
Avg<A>: MulByRef<Z::R, Output = Avg<A>>,
145+
Z::Key: PartialEq + Ord + Hash + Clone + SizeOf + Send + bincode::Decode + bincode::Encode,
146+
Z::Val: Ord + SizeOf + Clone + bincode::Decode + bincode::Encode,
147+
Avg<A>: MulByRef<Z::R, Output = Avg<A>> + bincode::Decode + bincode::Encode,
148148
A: GroupValue + SizeOf + Ord + Send + Clone,
149149
A: Div<isize, Output = A>,
150150
isize: MulByRef<Z::R, Output = isize>,

src/operator/aggregate/mod.rs

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ use crate::{
2121
time::Timestamp,
2222
trace::{
2323
cursor::{Cursor, CursorGroup},
24-
Spine,
25-
Batch, BatchReader, Builder,
24+
Batch, BatchReader, Builder, Spine,
2625
},
2726
NumEntries, OrdIndexedZSet, OrdZSet,
2827
};
@@ -110,8 +109,9 @@ where
110109
) -> Stream<Circuit<P>, OrdIndexedZSet<Z::Key, A::Output, isize>>
111110
where
112111
Z: IndexedZSet + Send + 'static,
113-
Z::Key: Ord + SizeOf + Clone + Hash,
114-
Z::Val: Ord + SizeOf + Clone,
112+
Z::Key: Ord + SizeOf + Clone + Hash + bincode::Decode + bincode::Encode,
113+
Z::Val: Ord + SizeOf + Clone + bincode::Decode + bincode::Encode,
114+
Z::R: bincode::Decode + bincode::Encode,
115115
A: Aggregator<Z::Val, (), Z::R> + 'static,
116116
A::Output: Ord + SizeOf + Clone,
117117
{
@@ -122,11 +122,12 @@ where
122122
pub fn stream_aggregate_generic<A, O>(&self, aggregator: A) -> Stream<Circuit<P>, O>
123123
where
124124
Z: IndexedZSet + Send + 'static,
125-
Z::Key: Ord + Clone + Hash,
126-
Z::Val: Ord + Clone,
125+
Z::Key: Ord + Clone + Hash + bincode::Decode + bincode::Encode,
126+
Z::Val: Ord + Clone + bincode::Decode + bincode::Encode,
127+
Z::R: bincode::Decode + bincode::Encode,
127128
A: Aggregator<Z::Val, (), Z::R> + 'static,
128129
O: Clone + IndexedZSet<Key = Z::Key, Val = A::Output> + 'static,
129-
O::R: ZRingValue,
130+
O::R: ZRingValue + bincode::Decode + bincode::Encode,
130131
{
131132
self.circuit()
132133
.add_unary_operator(Aggregate::new(aggregator), &self.shard())
@@ -144,29 +145,29 @@ where
144145
aggregator: A,
145146
) -> Stream<Circuit<P>, OrdIndexedZSet<Z::Key, A::Output, isize>>
146147
where
147-
TS: Timestamp + SizeOf,
148+
TS: Timestamp + SizeOf + bincode::Decode + bincode::Encode,
148149
Z: IndexedZSet + SizeOf + NumEntries + Send, /* + std::fmt::Display */
149-
Z::Key: PartialEq + Ord + Hash + Clone + SizeOf, /* + std::fmt::Display */
150-
Z::Val: Ord + Clone + SizeOf, /* + std::fmt::Debug */
151-
Z::R: SizeOf, /* + std::fmt::Display */
150+
Z::Key: PartialEq + Ord + Hash + Clone + SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
151+
Z::Val: Ord + Clone + SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Debug */
152+
Z::R: SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
152153
A: Aggregator<Z::Val, TS, Z::R> + 'static,
153-
A::Output: Ord + Clone + SizeOf + 'static,
154+
A::Output: Ord + Clone + SizeOf + bincode::Decode + bincode::Encode + 'static,
154155
{
155156
self.aggregate_generic::<TS, A, OrdIndexedZSet<Z::Key, A::Output, isize>>(aggregator)
156157
}
157158

158159
/// Like [`Self::aggregate`], but can return any batch type.
159160
pub fn aggregate_generic<TS, A, O>(&self, aggregator: A) -> Stream<Circuit<P>, O>
160161
where
161-
TS: Timestamp + SizeOf,
162+
TS: Timestamp + SizeOf + bincode::Decode + bincode::Encode,
162163
Z: IndexedZSet + SizeOf + NumEntries + Send, /* + std::fmt::Display */
163-
Z::Key: PartialEq + Ord + Hash + Clone + SizeOf, /* + std::fmt::Display */
164-
Z::Val: Ord + Clone + SizeOf, /* + std::fmt::Debug */
165-
Z::R: SizeOf, /* + std::fmt::Display */
164+
Z::Key: PartialEq + Ord + Hash + Clone + SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
165+
Z::Val: Ord + Clone + SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Debug */
166+
Z::R: SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
166167
A: Aggregator<Z::Val, TS, Z::R> + 'static,
167-
A::Output: Ord + Clone + SizeOf, /* + std::fmt::Display */
168+
A::Output: Ord + Clone + SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
168169
O: Batch<Key = Z::Key, Val = A::Output, Time = ()> + Clone + 'static,
169-
O::R: ZRingValue + SizeOf, /* + std::fmt::Display */
170+
O::R: ZRingValue + SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
170171
{
171172
let circuit = self.circuit();
172173
let stream = self.shard();
@@ -205,11 +206,18 @@ where
205206
f: F,
206207
) -> Stream<Circuit<P>, OrdIndexedZSet<Z::Key, A, isize>>
207208
where
208-
TS: Timestamp + SizeOf,
209+
TS: Timestamp + SizeOf + bincode::Decode + bincode::Encode,
209210
Z: IndexedZSet,
210-
Z::Key: PartialEq + Ord + SizeOf + Hash + Clone + SizeOf + Send, /* + std::fmt::Display */
211-
Z::Val: Ord + SizeOf + Clone, /* + std::fmt::Display */
212-
A: MulByRef<Z::R, Output = A> + GroupValue + SizeOf + Ord + Send,
211+
Z::Key:
212+
PartialEq + Ord + Hash + Clone + SizeOf + Send + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
213+
Z::Val: Ord + Clone + SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
214+
A: MulByRef<Z::R, Output = A>
215+
+ GroupValue
216+
+ SizeOf
217+
+ Ord
218+
+ Send
219+
+ bincode::Decode
220+
+ bincode::Encode,
213221
F: Fn(&Z::Key, &Z::Val) -> A + Clone + 'static,
214222
{
215223
self.aggregate_linear_generic::<TS, _, _>(f)
@@ -218,14 +226,26 @@ where
218226
/// Like [`Self::aggregate_linear`], but can return any batch type.
219227
pub fn aggregate_linear_generic<TS, F, O>(&self, f: F) -> Stream<Circuit<P>, O>
220228
where
221-
TS: Timestamp + SizeOf,
222-
Z: IndexedZSet,
223-
Z::Key: PartialEq + Ord + SizeOf + Hash + Clone + SizeOf + Send, /* + std::fmt::Display */
224229
Z::Val: Ord + SizeOf + Clone, /* + std::fmt::Display */
225230
F: Fn(&Z::Key, &Z::Val) -> O::Val + Clone + 'static,
226231
O: Clone + Batch<Key = Z::Key, Time = ()> + 'static,
227232
O::R: ZRingValue + SizeOf, /* + std::fmt::Display */
228233
O::Val: MulByRef<Z::R, Output = O::Val> + GroupValue + SizeOf + Ord + Send, /* + std::fmt::Display */
234+
TS: Timestamp + SizeOf + bincode::Decode + bincode::Encode,
235+
Z: IndexedZSet,
236+
Z::Key:
237+
PartialEq + Ord + SizeOf + Hash + Clone + Send + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
238+
Z::Val: Ord + SizeOf + Clone + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
239+
F: Fn(&Z::Key, &Z::Val) -> O::Val + Clone + 'static,
240+
O: Clone + Batch<Key = Z::Key, Time = ()> + 'static,
241+
O::R: ZRingValue + SizeOf + bincode::Decode + bincode::Encode, /* + std::fmt::Display */
242+
O::Val: MulByRef<Z::R, Output = O::Val>
243+
+ GroupValue
244+
+ SizeOf
245+
+ Ord
246+
+ Send
247+
+ bincode::Decode
248+
+ bincode::Encode, /* + std::fmt::Display */
229249
{
230250
self.weigh(f).aggregate_generic::<TS, _, _>(WeightedCount)
231251
}
@@ -245,10 +265,10 @@ where
245265
pub fn weigh<F, T>(&self, f: F) -> Stream<Circuit<P>, OrdZSet<Z::Key, T>>
246266
where
247267
Z: IndexedZSet,
248-
Z::Key: Ord + SizeOf + Clone,
249-
Z::Val: Ord + SizeOf + Clone,
268+
Z::Key: Ord + SizeOf + Clone + bincode::Decode + bincode::Encode,
269+
Z::Val: Ord + SizeOf + Clone + bincode::Decode + bincode::Encode,
250270
F: Fn(&Z::Key, &Z::Val) -> T + 'static,
251-
T: MulByRef<Z::R, Output = T> + MonoidValue + SizeOf,
271+
T: MulByRef<Z::R, Output = T> + MonoidValue + SizeOf + bincode::Decode + bincode::Encode,
252272
{
253273
self.weigh_generic::<_, OrdZSet<_, _>>(f)
254274
}

src/operator/communication/shard.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use crate::{
88
circuit::GlobalNodeId,
99
circuit_cache_key,
10-
trace::{cursor::Cursor, Spine, Batch, BatchReader, Builder, Trace},
10+
trace::{cursor::Cursor, Batch, BatchReader, Builder, Spine, Trace},
1111
Circuit, Runtime, Stream,
1212
};
1313
use std::{hash::Hash, panic::Location};
@@ -28,8 +28,10 @@ impl<P, IB> Stream<Circuit<P>, IB>
2828
where
2929
P: Clone + 'static,
3030
IB: BatchReader<Time = ()> + Clone + 'static,
31-
IB::Key: Ord + Clone + Hash,
32-
IB::Val: Ord + Clone,
31+
IB::Key: Ord + Clone + Hash + bincode::Decode + bincode::Encode,
32+
IB::Val: Ord + Clone + bincode::Decode + bincode::Encode,
33+
IB::R: bincode::Decode + bincode::Encode,
34+
IB::Time: bincode::Decode + bincode::Encode,
3335
{
3436
/// Shard batches across multiple worker threads based on keys.
3537
///

src/operator/distinct.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ where
3535
pub fn distinct(&self) -> Stream<Circuit<P>, Z>
3636
where
3737
Z: IndexedZSet + Send,
38-
Z::R: ZRingValue,
39-
Z::Key: Ord + Clone + Hash,
40-
Z::Val: Ord + Clone + Hash,
38+
Z::Key: Ord + Clone + Hash + bincode::Decode + bincode::Encode,
39+
Z::R: ZRingValue + bincode::Decode + bincode::Encode,
40+
Z::Val: Ord + Clone + bincode::Decode + bincode::Encode,
4141
{
4242
self.circuit()
4343
.cache_get_or_insert_with(DistinctId::new(self.origin_node_id().clone()), || {
@@ -55,19 +55,19 @@ where
5555
pub fn distinct_incremental(&self) -> Stream<Circuit<P>, Z>
5656
where
5757
Z: SizeOf + NumEntries + IndexedZSet + Send,
58-
Z::Key: Clone + PartialEq + Ord + Hash,
59-
Z::Val: Clone + Ord + Hash,
60-
Z::R: ZRingValue,
58+
Z::Key: Clone + PartialEq + Ord + Hash + bincode::Decode + bincode::Encode,
59+
Z::Val: Clone + Ord + Hash + bincode::Decode + bincode::Encode,
60+
Z::R: ZRingValue + bincode::Decode + bincode::Encode,
6161
{
6262
self.shard().distinct_incremental_inner().mark_sharded()
6363
}
6464

6565
fn distinct_incremental_inner(&self) -> Stream<Circuit<P>, Z>
6666
where
6767
Z: SizeOf + NumEntries + IndexedZSet,
68-
Z::Key: Clone + PartialEq + Ord,
69-
Z::Val: Clone + Ord,
70-
Z::R: ZRingValue,
68+
Z::Key: Clone + PartialEq + Ord + bincode::Decode + bincode::Encode,
69+
Z::Val: Clone + Ord + bincode::Decode + bincode::Encode,
70+
Z::R: ZRingValue + bincode::Decode + bincode::Encode,
7171
{
7272
self.circuit()
7373
.cache_get_or_insert_with(
@@ -87,9 +87,10 @@ where
8787
// TODO: remove this method.
8888
pub fn distinct_incremental_nested(&self) -> Stream<Circuit<P>, Z>
8989
where
90+
9091
Z: SizeOf + NumEntries + Send + ZSet,
91-
Z::Key: Clone + PartialEq + Ord + Hash,
92-
Z::R: ZRingValue,
92+
Z::Key: Clone + PartialEq + Ord + Hash + bincode::Decode + bincode::Encode,
93+
Z::R: ZRingValue + bincode::Decode + bincode::Encode,
9394
{
9495
self.shard()
9596
.integrate_nested()
@@ -114,8 +115,8 @@ where
114115
pub fn distinct_trace(&self) -> Stream<Circuit<P>, Z>
115116
where
116117
Z: NumEntries + ZSet + SizeOf + Send,
117-
Z::Key: Clone + Ord + SizeOf + Hash,
118-
Z::R: ZRingValue + SizeOf,
118+
Z::Key: Clone + Ord + SizeOf + Hash + bincode::Decode + bincode::Encode,
119+
Z::R: ZRingValue + SizeOf + bincode::Decode + bincode::Encode,
119120
{
120121
self.circuit()
121122
.cache_get_or_insert_with(DistinctTraceId::new(self.origin_node_id().clone()), || {

src/operator/input.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,15 @@ impl Circuit<()> {
133133
upsert_func: F,
134134
) -> Stream<Self, B>
135135
where
136-
K: Clone + Ord + Send + SizeOf + Hash + 'static,
136+
K: Clone + Ord + Send + SizeOf + Hash + bincode::Decode + bincode::Encode + 'static,
137137
F: Fn(VI) -> Option<V> + 'static,
138138
B: Batch<Key = K, Val = V, Time = ()> + SizeOf + 'static,
139-
B::R: ZRingValue + Ord + SizeOf,
140-
V: Eq + Clone + Ord + SizeOf + 'static,
139+
B::R: ZRingValue
140+
+ Ord
141+
+ SizeOf
142+
+ bincode::Decode
143+
+ bincode::Encode,
144+
V: Eq + Clone + Ord + SizeOf + bincode::Decode + bincode::Encode + 'static,
141145
VI: Eq + Clone + Send + 'static,
142146
{
143147
let sorted = input_stream
@@ -231,8 +235,8 @@ impl Circuit<()> {
231235
// TODO: Add a version that takes a custom hash function.
232236
pub fn add_input_set<K, R>(&self) -> (ZSetStream<K, R>, UpsertHandle<K, bool>)
233237
where
234-
K: Clone + Ord + Send + SizeOf + Hash + 'static,
235-
R: ZRingValue + SizeOf + Ord,
238+
K: Clone + Ord + Send + SizeOf + Hash + bincode::Decode + bincode::Encode + 'static,
239+
R: ZRingValue + SizeOf + Ord + bincode::Decode + bincode::Encode,
236240
{
237241
self.region("input_set", || {
238242
let (input, input_handle) = Input::new(|tuples: Vec<(K, bool)>| tuples);
@@ -314,9 +318,9 @@ impl Circuit<()> {
314318
// TODO: Add a version that takes a custom hash function.
315319
pub fn add_input_map<K, V, R>(&self) -> (IndexedZSetStream<K, V, R>, UpsertHandle<K, Option<V>>)
316320
where
317-
K: Clone + Ord + Hash + Send + SizeOf + 'static,
318-
V: Ord + Clone + Send + SizeOf + 'static,
319-
R: ZRingValue + SizeOf + Ord,
321+
K: Clone + Ord + Hash + Send + SizeOf + bincode::Decode + bincode::Encode + 'static,
322+
V: Ord + Clone + Send + SizeOf + bincode::Decode + bincode::Encode + 'static,
323+
R: ZRingValue + Ord + SizeOf + bincode::Decode + bincode::Encode,
320324
{
321325
self.region("input_map", || {
322326
let (input, input_handle) = Input::new(|tuples: Vec<(K, Option<V>)>| tuples);

0 commit comments

Comments
 (0)