Commit efed926

bors[bot], QuietMisdreavus, and nikomatsakis committed
550: add bridge from Iterator to ParallelIterator r=cuviper a=QuietMisdreavus

Half of rayon-rs#46

This started getting reviewed in QuietMisdreavus/polyester#6, but I decided to move my work to Rayon proper.

This PR adds a new trait, `AsParallel`, an implementation on `Iterator + Send`, and an iterator adapter `IterParallel` that implements `ParallelIterator` with a similar "cache items as you go" methodology as Polyester. I introduced a new trait because `ParallelIterator` was implemented on `Range`, which is itself an `Iterator`.

The basic idea is that you would start with a quick sequential `Iterator`, call `.as_parallel()` on it, and be able to use `ParallelIterator` adapters after that point, to do more expensive processing in multiple threads.

The design of `IterParallel` is like this:

* `IterParallel` defers background work to `IterParallelProducer`, which implements `UnindexedProducer`.
* `IterParallelProducer` will split as many times as there are threads in the current pool. (I've been told that rayon-rs#492 is a better way to organize this, but until that's in, this is how I wrote it. `>_>`)
* When folding items, `IterParallelProducer` keeps a `Stealer` from `crossbeam-deque` (added as a dependency, but using the same version as `rayon-core`) to access a deque of items that have already been loaded from the iterator.
* If the `Stealer` is empty, a worker will attempt to lock the Mutex to access the source `Iterator` and the `Deque`.
* If the Mutex is already locked, it will call `yield_now`. The implementation in polyester used a `synchronoise::SignalEvent`, but I've been told that worker threads should not block. In lieu of rayon-rs#548, a regular spin-loop was chosen instead.
* If the Mutex is available, the worker will load a number of items from the iterator (currently (number of threads * number of threads * 2)) before releasing the Mutex and continuing.
* (If the Mutex is poisoned, the worker will just... stop. Is there a recommended approach here? `>_>`)

This design is effectively a first brush, has [the same caveats as polyester](https://docs.rs/polyester/0.1.0/polyester/trait.Polyester.html#implementation-note), probably needs some extra features in rayon-core, and needs some higher-level docs before I'm willing to let it go. However, I'm putting it here because it was not in the right place when I talked to @cuviper about it last time.

Co-authored-by: QuietMisdreavus <[email protected]>
Co-authored-by: Niko Matsakis <[email protected]>
2 parents 11bd211 + d488733 commit efed926
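
The worker loop described in the commit message (lock the source iterator, pull a batch of items, release the lock, then do the expensive work) can be sketched with the standard library alone. This is only an illustration under stated assumptions, not the code added by this commit: `bridge_map` is a hypothetical name, plain `std::thread` workers stand in for rayon's pool, a blocking `Mutex::lock` stands in for the `yield_now` spin, and there is no `crossbeam-deque` `Stealer`. The batch size follows the "threads * threads * 2" figure from the message.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper (not part of rayon): drains `iter` from `workers`
/// threads, applies `f` to every item, and returns the results in
/// whatever order the workers produced them.
fn bridge_map<I, R, F>(iter: I, workers: usize, f: F) -> Vec<R>
where
    I: Iterator + Send,
    R: Send,
    F: Fn(I::Item) -> R + Sync,
{
    let workers = workers.max(1);
    // Batch size quoted in the commit message: threads * threads * 2.
    let batch = workers * workers * 2;
    let source = Mutex::new(iter);
    let results: Mutex<Vec<R>> = Mutex::new(Vec::new());

    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // Hold the lock only while pulling items off the iterator.
                let items: Vec<I::Item> = match source.lock() {
                    Ok(mut guard) => guard.by_ref().take(batch).collect(),
                    // Poisoned mutex: this worker simply stops.
                    Err(_) => return,
                };
                if items.is_empty() {
                    return; // the source iterator is exhausted
                }
                // The expensive per-item work runs outside the lock.
                let mapped: Vec<R> = items.into_iter().map(&f).collect();
                results.lock().unwrap().extend(mapped);
            });
        }
    });

    results.into_inner().unwrap()
}
```

Calling `bridge_map(1..=100u32, 4, |x| x * 2)` yields the hundred doubled values in an unspecified order, which mirrors the ordering caveat of bridging a sequential iterator: items are handed out as workers become free, so the original sequence order is not preserved.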

9 files changed: +341 -78 lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ exclude = ["ci"]

 [dependencies]
 rayon-core = { version = "1.4", path = "rayon-core" }
+crossbeam-deque = "0.2.0"

 # This is a public dependency!
 [dependencies.either]

rayon-demo/src/life/bench.rs

Lines changed: 5 additions & 0 deletions
@@ -9,3 +9,8 @@ fn generations(b: &mut ::test::Bencher) {
 fn parallel_generations(b: &mut ::test::Bencher) {
     b.iter(|| super::parallel_generations(Board::new(200, 200).random(), 100));
 }
+
+#[bench]
+fn as_parallel_generations(b: &mut ::test::Bencher) {
+    b.iter(|| super::par_bridge_generations(Board::new(200, 200).random(), 100));
+}

rayon-demo/src/life/mod.rs

Lines changed: 19 additions & 0 deletions
@@ -20,6 +20,7 @@ use time;

 use docopt::Docopt;
 use rayon::prelude::*;
+use rayon::iter::ParallelBridge;

 #[cfg(test)]
 mod bench;
@@ -93,6 +94,15 @@ impl Board {
         self.next_board(new_brd)
     }

+    pub fn par_bridge_next_generation(&self) -> Board {
+        let new_brd = (0..self.len())
+            .par_bridge()
+            .map(|cell| self.successor_cell(cell))
+            .collect();
+
+        self.next_board(new_brd)
+    }
+
     fn cell_live(&self, x: usize, y: usize) -> bool {
         !(x >= self.cols || y >= self.rows) && self.board[y * self.cols + x]
     }
@@ -145,6 +155,11 @@ fn parallel_generations(board: Board, gens: usize) {
     for _ in 0..gens { brd = brd.parallel_next_generation(); }
 }

+fn par_bridge_generations(board: Board, gens: usize) {
+    let mut brd = board;
+    for _ in 0..gens { brd = brd.par_bridge_next_generation(); }
+}
+
 fn measure(f: fn(Board, usize) -> (), args: &Args) -> u64 {
     let (n, gens) = (args.flag_size, args.flag_gens);
     let brd = Board::new(n, n).random();
@@ -168,5 +183,9 @@ pub fn main(args: &[String]) {
         let parallel = measure(parallel_generations, &args);
         println!("parallel: {:10} ns -> {:.2}x speedup", parallel,
                  serial as f64 / parallel as f64);
+
+        let par_bridge = measure(par_bridge_generations, &args);
+        println!("par_bridge: {:10} ns -> {:.2}x speedup", par_bridge,
+                 serial as f64 / par_bridge as f64);
     }
 }

rayon-demo/src/nbody/bench.rs

Lines changed: 5 additions & 0 deletions
@@ -30,6 +30,11 @@ fn nbody_par(b: &mut ::test::Bencher) {
     nbody_bench(b, |n| { n.tick_par(); });
 }

+#[bench]
+fn nbody_par_bridge(b: &mut ::test::Bencher) {
+    nbody_bench(b, |n| { n.tick_par_bridge(); });
+}
+
 #[bench]
 fn nbody_parreduce(b: &mut ::test::Bencher) {
     nbody_bench(b, |n| { n.tick_par_reduce(); });

rayon-demo/src/nbody/nbody.rs

Lines changed: 117 additions & 78 deletions
@@ -30,8 +30,10 @@
 // [1]: https://github.com/IntelLabs/RiverTrail/blob/master/examples/nbody-webgl/NBody.js

 use cgmath::{InnerSpace, Point3, Vector3, Zero};
-use rayon::prelude::*;
 use rand::{Rand, Rng};
+use rayon::prelude::*;
+#[cfg(test)]
+use rayon::iter::ParallelBridge;
 use std::f64::consts::PI;

 const INITIAL_VELOCITY: f64 = 8.0; // set to 0.0 to turn off.
@@ -50,8 +52,7 @@ pub struct Body {

 impl NBodyBenchmark {
     pub fn new<R: Rng>(num_bodies: usize, rng: &mut R) -> NBodyBenchmark {
-        let bodies0: Vec<_> =
-            (0..num_bodies)
+        let bodies0: Vec<_> = (0..num_bodies)
             .map(|_| {
                 let position = Point3 {
                     x: f64::rand(rng).floor() * 40_000.0,
@@ -71,7 +72,11 @@ impl NBodyBenchmark {
                     z: f64::rand(rng) * INITIAL_VELOCITY,
                 };

-                Body { position: position, velocity: velocity, velocity2: velocity2 }
+                Body {
+                    position: position,
+                    velocity: velocity,
+                    velocity2: velocity2,
+                }
             })
             .collect();

@@ -91,16 +96,44 @@ impl NBodyBenchmark {
         };

         let time = self.time;
-        out_bodies.par_iter_mut()
-                  .zip(&in_bodies[..])
-                  .for_each(|(out, prev)| {
-                      let (vel, vel2) = next_velocity(time, prev, in_bodies);
-                      out.velocity = vel;
-                      out.velocity2 = vel2;
+        out_bodies
+            .par_iter_mut()
+            .zip(&in_bodies[..])
+            .for_each(|(out, prev)| {
+                let (vel, vel2) = next_velocity(time, prev, in_bodies);
+                out.velocity = vel;
+                out.velocity2 = vel2;
+
+                let next_velocity = vel - vel2;
+                out.position = prev.position + next_velocity;
+            });

-                      let next_velocity = vel - vel2;
-                      out.position = prev.position + next_velocity;
-                  });
+        self.time += 1;
+
+        out_bodies
+    }
+
+    #[cfg(test)]
+    pub fn tick_par_bridge(&mut self) -> &[Body] {
+        let (in_bodies, out_bodies) = if (self.time & 1) == 0 {
+            (&self.bodies.0, &mut self.bodies.1)
+        } else {
+            (&self.bodies.1, &mut self.bodies.0)
+        };
+
+        let time = self.time;
+        out_bodies
+            .iter_mut()
+            .zip(&in_bodies[..])
+            .par_bridge()
+            .for_each(|(out, prev)| {
+                let (vel, vel2) = next_velocity(time, prev, in_bodies);
+                out.velocity = vel;
+                out.velocity2 = vel2;
+
+                let next_velocity = vel - vel2;
+                out.position = prev.position + next_velocity;
+            });

         self.time += 1;

@@ -115,16 +148,17 @@ impl NBodyBenchmark {
         };

         let time = self.time;
-        out_bodies.par_iter_mut()
-                  .zip(&in_bodies[..])
-                  .for_each(|(out, prev)| {
-                      let (vel, vel2) = next_velocity_par(time, prev, in_bodies);
-                      out.velocity = vel;
-                      out.velocity2 = vel2;
-
-                      let next_velocity = vel - vel2;
-                      out.position = prev.position + next_velocity;
-                  });
+        out_bodies
+            .par_iter_mut()
+            .zip(&in_bodies[..])
+            .for_each(|(out, prev)| {
+                let (vel, vel2) = next_velocity_par(time, prev, in_bodies);
+                out.velocity = vel;
+                out.velocity2 = vel2;
+
+                let next_velocity = vel - vel2;
+                out.position = prev.position + next_velocity;
+            });

         self.time += 1;

@@ -207,57 +241,57 @@ fn next_velocity(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3<f64>, Ve
     let zero: Vector3<f64> = Vector3::zero();
     let (diff, diff2) = bodies
         .iter()
-        .fold(
-            (zero, zero),
-            |(mut diff, mut diff2), body| {
-                let r = body.position - prev.position;
-
-                // make sure we are not testing the particle against its own position
-                let are_same = r == Vector3::zero();
-
-                let dist_sqrd = r.magnitude2();
-
-                if dist_sqrd < zone_sqrd && !are_same {
-                    let length = dist_sqrd.sqrt();
-                    let percent = dist_sqrd / zone_sqrd;
-
-                    if dist_sqrd < repel {
-                        let f = (repel / percent - 1.0) * 0.025;
-                        let normal = (r / length) * f;
-                        diff += normal;
-                        diff2 += normal;
-                    } else if dist_sqrd < align {
-                        let thresh_delta = align - repel;
-                        let adjusted_percent = (percent - repel) / thresh_delta;
-                        let q = (0.5 - (adjusted_percent * PI * 2.0).cos() * 0.5 + 0.5) * 100.9;
-
-                        // normalize vel2 and multiply by factor
-                        let vel2_length = body.velocity2.magnitude();
-                        let vel2 = (body.velocity2 / vel2_length) * q;
-
-                        // normalize own velocity
-                        let vel_length = prev.velocity.magnitude();
-                        let vel = (prev.velocity / vel_length) * q;
+        .fold((zero, zero), |(mut diff, mut diff2), body| {
+            let r = body.position - prev.position;
+
+            // make sure we are not testing the particle against its own position
+            let are_same = r == Vector3::zero();
+
+            let dist_sqrd = r.magnitude2();
+
+            if dist_sqrd < zone_sqrd && !are_same {
+                let length = dist_sqrd.sqrt();
+                let percent = dist_sqrd / zone_sqrd;
+
+                if dist_sqrd < repel {
+                    let f = (repel / percent - 1.0) * 0.025;
+                    let normal = (r / length) * f;
+                    diff += normal;
+                    diff2 += normal;
+                } else if dist_sqrd < align {
+                    let thresh_delta = align - repel;
+                    let adjusted_percent = (percent - repel) / thresh_delta;
+                    let q = (0.5 - (adjusted_percent * PI * 2.0).cos() * 0.5 + 0.5) * 100.9;
+
+                    // normalize vel2 and multiply by factor
+                    let vel2_length = body.velocity2.magnitude();
+                    let vel2 = (body.velocity2 / vel2_length) * q;
+
+                    // normalize own velocity
+                    let vel_length = prev.velocity.magnitude();
+                    let vel = (prev.velocity / vel_length) * q;
+
+                    diff += vel2;
+                    diff2 += vel;
+                }

-                        diff += vel2;
-                        diff2 += vel;
-                    }
+                if dist_sqrd > attract {
+                    // attract
+                    let thresh_delta2 = 1.0 - attract;
+                    let adjusted_percent2 = (percent - attract) / thresh_delta2;
+                    let c =
+                        (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power;

-                    if dist_sqrd > attract { // attract
-                        let thresh_delta2 = 1.0 - attract;
-                        let adjusted_percent2 = (percent - attract) / thresh_delta2;
-                        let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power;
+                    // normalize the distance vector
+                    let d = (r / length) * c;

-                        // normalize the distance vector
-                        let d = (r / length) * c;
-
-                        diff += d;
-                        diff2 -= d;
-                    }
+                    diff += d;
+                    diff2 -= d;
                 }
+            }

-                (diff, diff2)
-            });
+            (diff, diff2)
+        });

     acc += diff;
     acc2 += diff2;
@@ -294,8 +328,7 @@ fn next_velocity(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3<f64>, Ve
 }

 /// Compute next velocity of `prev`
-fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body])
-                     -> (Vector3<f64>, Vector3<f64>) {
+fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3<f64>, Vector3<f64>) {
     let time = time as f64;
     let center = Point3 {
         x: (time / 22.0).cos() * -4200.0,
@@ -344,8 +377,9 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body])

     let (diff, diff2) = bodies
         .par_iter()
-        .fold(|| (Vector3::zero(), Vector3::zero()),
-              |(mut diff, mut diff2), body| {
+        .fold(
+            || (Vector3::zero(), Vector3::zero()),
+            |(mut diff, mut diff2), body| {
                 let r = body.position - prev.position;

                 // make sure we are not testing the particle against its own position
@@ -379,10 +413,12 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body])
                         diff2 += vel;
                     }

-                    if dist_sqrd > attract { // attract
+                    if dist_sqrd > attract {
+                        // attract
                         let thresh_delta2 = 1.0 - attract;
                         let adjusted_percent2 = (percent - attract) / thresh_delta2;
-                        let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power;
+                        let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5))
+                            * attract_power;

                         // normalize the distance vector
                         let d = (r / length) * c;
@@ -393,9 +429,12 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body])
                 }

                 (diff, diff2)
-              })
-        .reduce(|| (Vector3::zero(), Vector3::zero()),
-                |(diffa, diff2a), (diffb, diff2b)| (diffa + diffb, diff2a + diff2b));
+            },
+        )
+        .reduce(
+            || (Vector3::zero(), Vector3::zero()),
+            |(diffa, diff2a), (diffb, diff2b)| (diffa + diffb, diff2a + diff2b),
+        );

     acc += diff;
     acc2 += diff2;

src/iter/mod.rs

Lines changed: 3 additions & 0 deletions
@@ -84,6 +84,9 @@ use self::plumbing::*;
 // e.g. `find::find()`, are always used **prefixed**, so that they
 // can be readily distinguished.

+mod par_bridge;
+pub use self::par_bridge::{ParallelBridge, IterBridge};
+
 mod find;
 mod find_first_last;
 mod chain;
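
The commit message explains that the bridge is a separate trait implemented for `Iterator + Send`, rather than an implementation of `ParallelIterator` itself, because `Range` already implements both. A minimal sketch of that extension-trait shape follows, using only the standard library. The names `Bridge`, `Bridged`, `bridge`, and `into_inner` are hypothetical stand-ins chosen for illustration; they are not the definitions in `par_bridge.rs`, which this page does not show.

```rust
/// Hypothetical adapter type; it plays the role an adapter such as
/// `IterBridge` would play, but only wraps the iterator here.
pub struct Bridged<I> {
    inner: I,
}

/// Hypothetical extension trait. Because it is its own trait, a blanket
/// impl over all iterators cannot overlap with other traits' impls.
pub trait Bridge: Sized {
    fn bridge(self) -> Bridged<Self>;
}

// Blanket impl: every sendable iterator with sendable items gets `.bridge()`.
impl<I> Bridge for I
where
    I: Iterator + Send,
    I::Item: Send,
{
    fn bridge(self) -> Bridged<Self> {
        Bridged { inner: self }
    }
}

impl<I: Iterator> Bridged<I> {
    /// Hands back the wrapped sequential iterator.
    pub fn into_inner(self) -> I {
        self.inner
    }
}
```

With this in scope, `(0..5).bridge()` compiles for a plain `Range` without touching any other trait the range implements, which is the property the separate trait is meant to provide.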
