Skip to content

Commit 5e227e8

Browse files
authored
fix(source): propagate output errors instead of silently discarding them (#78)
1 parent f83ad78 commit 5e227e8

File tree

6 files changed

+215 additions, -34 deletions

src/source/files.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::fs::{self, File};
77
use std::io::Read;
88
use std::path::{Path, PathBuf};
99

10-
use super::{ProcessStats, Source};
10+
use super::{OutputGuard, ProcessStats, Source};
1111
use crate::derive::KeyDeriver;
1212
use crate::matcher::Matcher;
1313
use crate::output::Output;
@@ -103,8 +103,13 @@ impl Source for FilesSource {
103103
let processed = std::sync::atomic::AtomicU64::new(0);
104104
let stats = std::sync::atomic::AtomicU64::new(0);
105105
let matches = std::sync::atomic::AtomicU64::new(0);
106+
let guard = OutputGuard::new();
106107

107108
self.files.par_iter().for_each(|path| {
109+
if guard.is_poisoned() {
110+
return;
111+
}
112+
108113
let contents = match read_file_contents(path) {
109114
Ok(c) => c,
110115
Err(e) => {
@@ -121,21 +126,32 @@ impl Source for FilesSource {
121126
let mut buffer = Vec::with_capacity(transforms.len() * 2);
122127

123128
for transform in transforms {
129+
if guard.is_poisoned() {
130+
break;
131+
}
132+
124133
buffer.clear();
125134
transform.apply_batch(&inputs, &mut buffer);
126135

127136
for (source, key) in &buffer {
137+
if guard.is_poisoned() {
138+
break;
139+
}
140+
128141
let derived = deriver.derive(key);
129142

130143
if let Some(m) = matcher {
131144
if let Some(match_info) = m.check(&derived) {
132-
output
133-
.hit(source, transform.name(), &derived, &match_info)
134-
.ok();
145+
guard.check(output.hit(
146+
source,
147+
transform.name(),
148+
&derived,
149+
&match_info,
150+
));
135151
matches.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
136152
}
137153
} else {
138-
output.key(source, transform.name(), &derived).ok();
154+
guard.check(output.key(source, transform.name(), &derived));
139155
}
140156

141157
stats.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@@ -146,6 +162,7 @@ impl Source for FilesSource {
146162
});
147163

148164
pb.finish_and_clear();
165+
guard.into_result()?;
149166

150167
Ok(ProcessStats {
151168
inputs_processed: processed.load(std::sync::atomic::Ordering::Relaxed),

src/source/mod.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,75 @@ pub enum SourceType {
4848
Timestamps,
4949
Stdin,
5050
}
51+
52+
/// Captures output errors inside Rayon closures where `?` can't be used.
53+
///
54+
/// Check `is_poisoned()` before each output call to skip work after failure.
55+
/// Call `into_result()` after the parallel section to propagate the first error.
56+
pub(crate) struct OutputGuard {
57+
poisoned: std::sync::atomic::AtomicBool,
58+
first_error: std::sync::Mutex<Option<String>>,
59+
}
60+
61+
impl OutputGuard {
62+
pub fn new() -> Self {
63+
Self {
64+
poisoned: std::sync::atomic::AtomicBool::new(false),
65+
first_error: std::sync::Mutex::new(None),
66+
}
67+
}
68+
69+
pub fn is_poisoned(&self) -> bool {
70+
self.poisoned.load(std::sync::atomic::Ordering::Relaxed)
71+
}
72+
73+
pub fn check(&self, result: Result<()>) {
74+
if let Err(e) = result {
75+
self.poisoned
76+
.store(true, std::sync::atomic::Ordering::Relaxed);
77+
if let Ok(mut first) = self.first_error.lock() {
78+
if first.is_none() {
79+
*first = Some(e.to_string());
80+
}
81+
}
82+
}
83+
}
84+
85+
pub fn into_result(self) -> Result<()> {
86+
if self.poisoned.load(std::sync::atomic::Ordering::Relaxed) {
87+
let msg = self
88+
.first_error
89+
.into_inner()
90+
.unwrap_or_else(|e| e.into_inner())
91+
.unwrap_or_else(|| "unknown output error".to_string());
92+
anyhow::bail!("Output failed: {}", msg)
93+
} else {
94+
Ok(())
95+
}
96+
}
97+
}
98+
99+
#[cfg(test)]
mod tests {
    use super::*;

    /// A guard that only ever sees successes stays clean and yields `Ok`.
    #[test]
    fn output_guard_ok_stays_clean() {
        let clean = OutputGuard::new();
        for _ in 0..2 {
            clean.check(Ok(()));
        }

        assert!(!clean.is_poisoned());
        assert!(clean.into_result().is_ok());
    }

    /// After several failures the reported message is that of the first one.
    #[test]
    fn output_guard_captures_first_error() {
        let tripped = OutputGuard::new();
        let outcomes = vec![
            Ok(()),
            Err(anyhow::anyhow!("disk full")),
            Err(anyhow::anyhow!("second error")),
        ];
        for outcome in outcomes {
            tripped.check(outcome);
        }

        assert!(tripped.is_poisoned());
        let message = tripped.into_result().unwrap_err().to_string();
        assert!(message.contains("disk full"));
    }
}

src/source/range.rs

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::Result;
44
use indicatif::ProgressBar;
55
use rayon::prelude::*;
66

7-
use super::{ProcessStats, Source};
7+
use super::{OutputGuard, ProcessStats, Source};
88
use crate::derive::KeyDeriver;
99
use crate::matcher::Matcher;
1010
use crate::output::Output;
@@ -52,32 +52,48 @@ impl Source for RangeSource {
5252

5353
let stats = std::sync::atomic::AtomicU64::new(0);
5454
let matches = std::sync::atomic::AtomicU64::new(0);
55+
let guard = OutputGuard::new();
5556

5657
let num_batches = count / BATCH_SIZE + u64::from(count % BATCH_SIZE != 0);
5758

5859
(0..num_batches).into_par_iter().for_each(|batch_idx| {
60+
if guard.is_poisoned() {
61+
return;
62+
}
63+
5964
let batch_start = self.start + batch_idx * BATCH_SIZE;
6065
let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(self.end);
6166

6267
let inputs: Vec<Input> = (batch_start..=batch_end).map(Input::from_u64).collect();
6368
let mut buffer = Vec::with_capacity(inputs.len() * 3);
6469

6570
for transform in transforms {
71+
if guard.is_poisoned() {
72+
break;
73+
}
74+
6675
buffer.clear();
6776
transform.apply_batch(&inputs, &mut buffer);
6877

6978
for (source, key) in &buffer {
79+
if guard.is_poisoned() {
80+
break;
81+
}
82+
7083
let derived = deriver.derive(key);
7184

7285
if let Some(m) = matcher {
7386
if let Some(match_info) = m.check(&derived) {
74-
output
75-
.hit(source, transform.name(), &derived, &match_info)
76-
.ok();
87+
guard.check(output.hit(
88+
source,
89+
transform.name(),
90+
&derived,
91+
&match_info,
92+
));
7793
matches.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7894
}
7995
} else {
80-
output.key(source, transform.name(), &derived).ok();
96+
guard.check(output.key(source, transform.name(), &derived));
8197
}
8298

8399
stats.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@@ -88,6 +104,7 @@ impl Source for RangeSource {
88104
});
89105

90106
pb.finish_and_clear();
107+
guard.into_result()?;
91108

92109
Ok(ProcessStats {
93110
inputs_processed: count,
@@ -101,7 +118,44 @@ impl Source for RangeSource {
101118
mod tests {
102119
use super::*;
103120
use crate::derive::KeyDeriver;
104-
use crate::output::ConsoleOutput;
121+
use crate::matcher::MatchInfo;
122+
use crate::output::{ConsoleOutput, Output};
123+
use crate::transform::TransformType;
124+
125+
struct FailingOutput;
126+
127+
impl Output for FailingOutput {
128+
fn key(&self, _: &str, _: &str, _: &crate::derive::DerivedKey) -> anyhow::Result<()> {
129+
anyhow::bail!("broken pipe")
130+
}
131+
fn hit(
132+
&self,
133+
_: &str,
134+
_: &str,
135+
_: &crate::derive::DerivedKey,
136+
_: &MatchInfo,
137+
) -> anyhow::Result<()> {
138+
anyhow::bail!("broken pipe")
139+
}
140+
fn flush(&self) -> anyhow::Result<()> {
141+
Ok(())
142+
}
143+
}
144+
145+
/// An output that fails on every write must make `process` return an error
/// that still carries the output's own message.
#[test]
fn process_propagates_output_error() {
    let range = RangeSource::new(1, 10);
    let deriver = KeyDeriver::new();
    let sink = FailingOutput;
    let transforms: Vec<Box<dyn Transform>> = vec![TransformType::Sha256.create()];

    let outcome = range.process(&transforms, &deriver, None, &sink);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("broken pipe"),
        "error message should contain the original output error"
    );
}
105159

106160
#[test]
107161
fn process_rejects_descending_range() {

src/source/stdin.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use anyhow::Result;
44
use std::io::{self, BufRead};
55

6+
67
use super::{ProcessStats, Source};
78
use crate::derive::KeyDeriver;
89
use crate::matcher::Matcher;
@@ -56,7 +57,7 @@ impl Source for StdinSource {
5657
// Process in batches
5758
if batch.len() >= 1000 {
5859
let (keys, found) =
59-
process_batch(&batch, transforms, &deriver, matcher, output, &mut buffer);
60+
process_batch(&batch, transforms, &deriver, matcher, output, &mut buffer)?;
6061
keys_generated += keys;
6162
matches_found += found;
6263
batch.clear();
@@ -66,7 +67,7 @@ impl Source for StdinSource {
6667
// Process remaining
6768
if !batch.is_empty() {
6869
let (keys, found) =
69-
process_batch(&batch, transforms, &deriver, matcher, output, &mut buffer);
70+
process_batch(&batch, transforms, &deriver, matcher, output, &mut buffer)?;
7071
keys_generated += keys;
7172
matches_found += found;
7273
}
@@ -86,7 +87,7 @@ fn process_batch(
8687
matcher: Option<&Matcher>,
8788
output: &dyn Output,
8889
buffer: &mut Vec<(String, [u8; 32])>,
89-
) -> (u64, u64) {
90+
) -> Result<(u64, u64)> {
9091
let mut keys_generated = 0u64;
9192
let mut matches_found = 0u64;
9293

@@ -99,18 +100,16 @@ fn process_batch(
99100

100101
if let Some(m) = matcher {
101102
if let Some(match_info) = m.check(&derived) {
102-
output
103-
.hit(source, transform.name(), &derived, &match_info)
104-
.ok();
103+
output.hit(source, transform.name(), &derived, &match_info)?;
105104
matches_found += 1;
106105
}
107106
} else {
108-
output.key(source, transform.name(), &derived).ok();
107+
output.key(source, transform.name(), &derived)?;
109108
}
110109

111110
keys_generated += 1;
112111
}
113112
}
114113

115-
(keys_generated, matches_found)
114+
Ok((keys_generated, matches_found))
116115
}

0 commit comments

Comments
 (0)