Skip to content

Commit d31d3e3

Browse files
author
Charles Samuels
committed
remove the done flag, its function is now done by the fuse
1 parent 91cb49d commit d31d3e3

File tree

1 file changed

+12
-22
lines changed

1 file changed

+12
-22
lines changed

src/iter/par_bridge.rs

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1+
use std::sync::atomic::{AtomicUsize, Ordering};
22
use std::sync::Mutex;
33

44
use crate::current_num_threads;
@@ -77,13 +77,11 @@ where
7777
{
7878
let split_count = AtomicUsize::new(current_num_threads());
7979

80-
let done = AtomicBool::new(false);
8180
let iter = Mutex::new(self.iter.fuse());
8281

8382
bridge_unindexed(
8483
IterParallelProducer {
8584
split_count: &split_count,
86-
done: &done,
8785
iter: &iter,
8886
},
8987
consumer,
@@ -93,7 +91,6 @@ where
9391

9492
struct IterParallelProducer<'a, Iter: Iterator> {
9593
split_count: &'a AtomicUsize,
96-
done: &'a AtomicBool,
9794
iter: &'a Mutex<std::iter::Fuse<Iter>>,
9895
}
9996

@@ -102,7 +99,6 @@ impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> {
10299
fn clone(&self) -> Self {
103100
IterParallelProducer {
104101
split_count: self.split_count,
105-
done: self.done,
106102
iter: self.iter,
107103
}
108104
}
@@ -119,22 +115,18 @@ where
119115

120116
loop {
121117
// Check if the iterator is exhausted
122-
let done = self.done.load(Ordering::SeqCst);
123-
match count.checked_sub(1) {
124-
Some(new_count) if !done => {
125-
match self.split_count.compare_exchange_weak(
126-
count,
127-
new_count,
128-
Ordering::SeqCst,
129-
Ordering::SeqCst,
130-
) {
131-
Ok(_) => return (self.clone(), Some(self)),
132-
Err(last_count) => count = last_count,
133-
}
134-
}
135-
_ => {
136-
return (self, None);
118+
if let Some(new_count) = count.checked_sub(1) {
119+
match self.split_count.compare_exchange_weak(
120+
count,
121+
new_count,
122+
Ordering::SeqCst,
123+
Ordering::SeqCst,
124+
) {
125+
Ok(_) => return (self.clone(), Some(self)),
126+
Err(last_count) => count = last_count,
137127
}
128+
} else {
129+
return (self, None);
138130
}
139131
}
140132
}
@@ -152,13 +144,11 @@ where
152144
return folder;
153145
}
154146
} else {
155-
self.done.store(true, Ordering::SeqCst);
156147
return folder;
157148
}
158149
} else {
159150
// any panics from other threads will have been caught by the pool,
160151
// and will be re-thrown when joined - just exit
161-
self.done.store(true, Ordering::SeqCst);
162152
return folder;
163153
}
164154
}

0 commit comments

Comments
 (0)