
Commit 78feb98

bors[bot] and Charles Samuels authored
Merge #996
996: par_bridge: use naive locking of the Iterator r=cuviper a=njaard

Fixes issue #795.

Instead of using crossbeam_deque, just lock the Iterator and get the next item, naively. The original code would spin until more data became available, which made par_bridge not very useful whenever the source Iterator did slow IO or was itself computationally costly. With this patch there is virtually no change in runtime in many cases, but with many CPUs and even slightly slow IO the runtime drops significantly, and in pretty much every case the total CPU time was much lower.

I ran the test program below on a 1.5 GiB file from a fast SSD on a machine with 64 cores, and mean runtime went from 44s to 30s. In fact, according to hyperfine, the average runtime with this patch was lower than the _minimum_ runtime without it (25s and 41s, respectively). More importantly, total CPU time was almost 10x lower with this patch. I'm not sure what realistic usage patterns could be worse with this patch, and therefore I'd love to hear other opinions.

I wrote a basic test program:

```rust
use std::io::{BufRead, BufReader};
use std::os::unix::io::FromRawFd;
use std::sync::atomic::{AtomicUsize, Ordering};

use rayon::prelude::*;

fn main() {
    let counter = AtomicUsize::new(0);
    // The unsafe is necessary to make the input stream Send.
    let stdin = BufReader::new(unsafe { std::fs::File::from_raw_fd(0) });
    stdin.lines().par_bridge().for_each(|row| {
        let row = row.unwrap();
        for _ in 0..1000 {
            let c = row.len();
            counter.fetch_add(c, Ordering::Relaxed);
        }
    });
    println!("{}", counter.load(Ordering::Relaxed));
}
```

The program deliberately wastes some CPU time (`for _ in 0..1000`); the unsafe is necessary to make the input stream Send. As part of the test I feed it a multigigabyte file, once over a relatively slow network and once off an SSD.

Co-authored-by: Charles Samuels <[email protected]>
2 parents 8e48eae + d31d3e3 commit 78feb98
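For context, the core of the change, stripped of rayon's internal plumbing, is to put the source iterator behind a plain `Mutex` and have each worker pull one item at a time while holding the lock, doing the actual work unlocked. Below is a minimal standalone sketch of that pattern using plain `std::thread` workers; the thread count, the `0..1_000` range, and the squaring "work" are made up for illustration, and this is not rayon's actual producer/folder machinery:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Shared, fused source iterator behind a plain Mutex, as in the patch.
    let iter = Arc::new(Mutex::new((0..1_000u64).fuse()));
    let mut handles = Vec::new();

    for _ in 0..4 {
        let iter = Arc::clone(&iter);
        handles.push(thread::spawn(move || {
            let mut local = 0u64;
            loop {
                // Hold the lock only long enough to pull the next item...
                let item = iter.lock().unwrap().next();
                match item {
                    // ...then do the (possibly expensive) work unlocked.
                    Some(x) => local += x * x,
                    None => break,
                }
            }
            local
        }));
    }

    let total: u64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    println!("{}", total);
}
```

The real implementation fuses the iterator (`self.iter.fuse()`) so that once one worker observes `None`, every later call also sees `None` rather than relying on the underlying iterator's behavior after exhaustion.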

File tree: 1 file changed (+26 −87 lines)

src/iter/par_bridge.rs

Lines changed: 26 additions & 87 deletions
```diff
@@ -1,8 +1,5 @@
-use crossbeam_deque::{Steal, Stealer, Worker};
-
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::{Mutex, TryLockError};
-use std::thread::yield_now;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Mutex;
 
 use crate::current_num_threads;
 use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
@@ -79,17 +76,13 @@ where
         C: UnindexedConsumer<Self::Item>,
     {
         let split_count = AtomicUsize::new(current_num_threads());
-        let worker = Worker::new_fifo();
-        let stealer = worker.stealer();
-        let done = AtomicBool::new(false);
-        let iter = Mutex::new((self.iter, worker));
+
+        let iter = Mutex::new(self.iter.fuse());
 
         bridge_unindexed(
             IterParallelProducer {
                 split_count: &split_count,
-                done: &done,
                 iter: &iter,
-                items: stealer,
             },
             consumer,
         )
@@ -98,19 +91,15 @@ where
 
 struct IterParallelProducer<'a, Iter: Iterator> {
     split_count: &'a AtomicUsize,
-    done: &'a AtomicBool,
-    iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
-    items: Stealer<Iter::Item>,
+    iter: &'a Mutex<std::iter::Fuse<Iter>>,
 }
 
 // manual clone because T doesn't need to be Clone, but the derive assumes it should be
 impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> {
     fn clone(&self) -> Self {
         IterParallelProducer {
             split_count: self.split_count,
-            done: self.done,
             iter: self.iter,
-            items: self.items.clone(),
         }
     }
 }
@@ -125,24 +114,19 @@ where
         let mut count = self.split_count.load(Ordering::SeqCst);
 
         loop {
-            // Check if the iterator is exhausted *and* we've consumed every item from it.
-            let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();
-
-            match count.checked_sub(1) {
-                Some(new_count) if !done => {
-                    match self.split_count.compare_exchange_weak(
-                        count,
-                        new_count,
-                        Ordering::SeqCst,
-                        Ordering::SeqCst,
-                    ) {
-                        Ok(_) => return (self.clone(), Some(self)),
-                        Err(last_count) => count = last_count,
-                    }
-                }
-                _ => {
-                    return (self, None);
+            // Check if the iterator is exhausted
+            if let Some(new_count) = count.checked_sub(1) {
+                match self.split_count.compare_exchange_weak(
+                    count,
+                    new_count,
+                    Ordering::SeqCst,
+                    Ordering::SeqCst,
+                ) {
+                    Ok(_) => return (self.clone(), Some(self)),
+                    Err(last_count) => count = last_count,
                 }
+            } else {
+                return (self, None);
             }
         }
     }
@@ -152,65 +136,20 @@ where
         F: Folder<Self::Item>,
     {
         loop {
-            match self.items.steal() {
-                Steal::Success(it) => {
+            if let Ok(mut iter) = self.iter.lock() {
+                if let Some(it) = iter.next() {
+                    drop(iter);
                     folder = folder.consume(it);
                     if folder.full() {
                         return folder;
                     }
+                } else {
+                    return folder;
                 }
-                Steal::Empty => {
-                    // Don't storm the mutex if we're already done.
-                    if self.done.load(Ordering::SeqCst) {
-                        // Someone might have pushed more between our `steal()` and `done.load()`
-                        if self.items.is_empty() {
-                            // The iterator is out of items, no use in continuing
-                            return folder;
-                        }
-                    } else {
-                        // our cache is out of items, time to load more from the iterator
-                        match self.iter.try_lock() {
-                            Ok(mut guard) => {
-                                // Check `done` again in case we raced with the previous lock
-                                // holder on its way out.
-                                if self.done.load(Ordering::SeqCst) {
-                                    if self.items.is_empty() {
-                                        return folder;
-                                    }
-                                    continue;
-                                }
-
-                                let count = current_num_threads();
-                                let count = (count * count) * 2;
-
-                                let (ref mut iter, ref worker) = *guard;
-
-                                // while worker.len() < count {
-                                // FIXME the new deque doesn't let us count items. We can just
-                                // push a number of items, but that doesn't consider active
-                                // stealers elsewhere.
-                                for _ in 0..count {
-                                    if let Some(it) = iter.next() {
-                                        worker.push(it);
-                                    } else {
-                                        self.done.store(true, Ordering::SeqCst);
-                                        break;
-                                    }
-                                }
-                            }
-                            Err(TryLockError::WouldBlock) => {
-                                // someone else has the mutex, just sit tight until it's ready
-                                yield_now(); //TODO: use a thread-pool-aware yield? (#548)
-                            }
-                            Err(TryLockError::Poisoned(_)) => {
-                                // any panics from other threads will have been caught by the pool,
-                                // and will be re-thrown when joined - just exit
-                                return folder;
-                            }
-                        }
-                    }
-                }
-                Steal::Retry => (),
+            } else {
+                // any panics from other threads will have been caught by the pool,
+                // and will be re-thrown when joined - just exit
+                return folder;
             }
         }
     }
```
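As a hedged illustration of the workload this change targets (a source iterator whose items arrive slowly, e.g. from IO), here is a hypothetical usage of `par_bridge` over a channel receiver; the channel, the `0..1_000` range, and the squaring work are made up for the example, and it only shows the calling side, not this commit's internals:

```rust
use std::sync::mpsc::channel;
use std::thread;

use rayon::prelude::*;

fn main() {
    let (send, recv) = channel();

    // Producer thread standing in for a slow IO source.
    thread::spawn(move || {
        for i in 0..1_000u64 {
            send.send(i).unwrap();
        }
    });

    // Each rayon worker locks the bridged iterator, takes one item,
    // then processes it unlocked.
    let sum: u64 = recv.into_iter().par_bridge().map(|x| x * x).sum();
    println!("{}", sum);
}
```

With the old deque-based code, workers could spin through `Steal::Empty`/`Steal::Retry` while the source was quiet; with this patch they simply block on the mutex, which is consistent with the much lower total CPU time reported in the commit message.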
