Skip to content

Commit dde3dd4

Browse files
aeitwoen and oritwoen authored
fix(source): avoid materializing entire range into memory (#65)
Co-authored-by: oritwoen <18102267+oritwoen@users.noreply.github.com>
1 parent 95e3f40 commit dde3dd4

File tree

2 files changed

+13
-10
lines changed

2 files changed

+13
-10
lines changed

src/source/range.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use crate::matcher::Matcher;
1010
use crate::output::Output;
1111
use crate::transform::{Input, Transform};
1212

13+
const BATCH_SIZE: u64 = 1000;
14+
1315
/// Generate keys from a numeric range
1416
pub struct RangeSource {
1517
pub start: u64,
@@ -34,14 +36,18 @@ impl Source for RangeSource {
3436
let pb = ProgressBar::new(count);
3537
pb.set_style(crate::default_progress_style());
3638

37-
let range: Vec<u64> = (self.start..=self.end).collect();
38-
39-
4039
let stats = std::sync::atomic::AtomicU64::new(0);
4140
let matches = std::sync::atomic::AtomicU64::new(0);
4241

43-
range.par_chunks(1000).for_each(|chunk| {
44-
let inputs: Vec<Input> = chunk.iter().map(|&v| Input::from_u64(v)).collect();
42+
let num_batches = count / BATCH_SIZE + u64::from(count % BATCH_SIZE != 0);
43+
44+
(0..num_batches).into_par_iter().for_each(|batch_idx| {
45+
let batch_start = self.start + batch_idx * BATCH_SIZE;
46+
let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(self.end);
47+
48+
let inputs: Vec<Input> = (batch_start..=batch_end)
49+
.map(Input::from_u64)
50+
.collect();
4551
let mut buffer = Vec::with_capacity(inputs.len() * 3);
4652

4753
for transform in transforms {
@@ -59,15 +65,14 @@ impl Source for RangeSource {
5965
matches.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
6066
}
6167
} else {
62-
// No matcher - output all keys
6368
output.key(source, transform.name(), &derived).ok();
6469
}
6570

6671
stats.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
6772
}
6873
}
6974

70-
pb.inc(chunk.len() as u64);
75+
pb.inc((batch_end - batch_start + 1) as u64);
7176
});
7277

7378
pb.finish_and_clear();

src/source/timestamps.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,10 @@ impl Source for TimestampSource {
5858
let pb = ProgressBar::new(total);
5959
pb.set_style(crate::default_progress_style());
6060

61-
let timestamps: Vec<u64> = (self.start..=self.end).collect();
62-
6361
let stats = std::sync::atomic::AtomicU64::new(0);
6462
let matches = std::sync::atomic::AtomicU64::new(0);
6563

66-
timestamps.par_iter().for_each(|&ts| {
64+
(self.start..=self.end).into_par_iter().for_each(|ts| {
6765
// Process base timestamp
6866
process_timestamp(ts, transforms, &deriver, matcher, output, &stats, &matches);
6967

0 commit comments

Comments
 (0)