Skip to content

Commit 51a4144

Browse files
committed
chore(iori): add test for concat merge
1 parent c9d57d9 commit 51a4144

File tree

3 files changed

+140
-19
lines changed

3 files changed

+140
-19
lines changed

crates/iori/src/merge/auto.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ where
126126
let mut namer = DuplicateOutputFileNamer::new(self.output_file.clone());
127127
let mut final_outputs = Vec::new();
128128

129-
for (i, part_index) in all_part_indexes.into_iter().enumerate() {
129+
for part_index in all_part_indexes {
130130
let mut tracks = Vec::new();
131131

132132
for stream_id in &streams {
@@ -170,11 +170,7 @@ where
170170
tokio::fs::create_dir_all(parent).await?;
171171
}
172172

173-
let part_output_path = if i == 0 {
174-
self.output_file.clone()
175-
} else {
176-
namer.next_path()
177-
};
173+
let part_output_path = namer.next_path();
178174

179175
let output_path = if tracks.len() == 1 {
180176
let track_format = tracks[0].extension().and_then(|e| e.to_str());

crates/iori/src/merge/concat.rs

Lines changed: 137 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ async fn concat_merge(
9797
});
9898

9999
let mut namer = DuplicateOutputFileNamer::new(output_path.clone());
100-
let mut current_part_index: Option<u64> = None;
101-
let mut output: Option<File> = None;
102100

103101
// We don't use trim_end here because we want to handle parts individually.
104102
// However, we should still skip trailing failed segments in each part.
@@ -115,22 +113,18 @@ async fn concat_merge(
115113
let trimmed_part_segments = trim_end(part_segments, |s| !s.success);
116114

117115
if !trimmed_part_segments.is_empty() {
118-
let path = if current_part_index.is_none() {
119-
output_path.clone()
120-
} else {
121-
namer.next_path()
122-
};
116+
let path = namer.next_path();
123117

124118
let mut out = File::create(path).await?;
125119
for segment in trimmed_part_segments {
126120
if !segment.success {
127121
out = File::create(namer.next_path()).await?;
122+
continue;
128123
}
129124

130125
let mut reader = cache.open_reader(&segment.segment).await?;
131126
tokio::io::copy(&mut reader, &mut out).await?;
132127
}
133-
current_part_index = Some(part_index);
134128
}
135129

136130
part_start = part_end;
@@ -141,6 +135,10 @@ async fn concat_merge(
141135

142136
#[cfg(test)]
143137
mod tests {
138+
use super::*;
139+
use crate::cache::memory::MemoryCacheSource;
140+
use tokio::io::AsyncWriteExt;
141+
144142
#[test]
145143
fn test_trim_end() {
146144
let input = [1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0];
@@ -155,4 +153,135 @@ mod tests {
155153
let output = super::trim_end(&input, |&x| x == 0);
156154
assert_eq!(output, [1, 2, 3, 0, 0, 3]);
157155
}
156+
157+
async fn create_segment(
158+
cache: &MemoryCacheSource,
159+
sequence: u64,
160+
part_index: u64,
161+
data: &[u8],
162+
) -> ConcatSegment {
163+
let segment = SegmentInfo {
164+
sequence,
165+
part_index,
166+
..Default::default()
167+
};
168+
let mut writer = cache.open_writer(&segment).await.unwrap().unwrap();
169+
writer.write_all(data).await.unwrap();
170+
writer.shutdown().await.unwrap();
171+
drop(writer);
172+
ConcatSegment {
173+
segment,
174+
success: true,
175+
}
176+
}
177+
178+
#[tokio::test]
179+
async fn test_concat_merge_basic() {
180+
let cache = MemoryCacheSource::new();
181+
let temp_dir = tempfile::tempdir().unwrap();
182+
let output_path = temp_dir.path().join("output.ts");
183+
184+
let mut segments = vec![
185+
create_segment(&cache, 0, 0, b"part0_seq0").await,
186+
create_segment(&cache, 1, 0, b"part0_seq1").await,
187+
];
188+
189+
concat_merge(&mut segments, &cache, output_path.clone())
190+
.await
191+
.unwrap();
192+
193+
// No wait is needed before reading the output: the namer's Drop is
194+
// synchronous, so it should already have run when concat_merge returned.
195+
let content = tokio::fs::read(&output_path).await.unwrap();
196+
assert_eq!(content, b"part0_seq0part0_seq1");
197+
}
198+
199+
#[tokio::test]
200+
async fn test_concat_merge_discontinuity() {
201+
let cache = MemoryCacheSource::new();
202+
let temp_dir = tempfile::tempdir().unwrap();
203+
let output_path = temp_dir.path().join("output.ts");
204+
205+
let mut segments = vec![
206+
create_segment(&cache, 0, 0, b"part0_seq0").await,
207+
create_segment(&cache, 1, 0, b"part0_seq1").await,
208+
create_segment(&cache, 2, 1, b"part1_seq2").await,
209+
create_segment(&cache, 3, 1, b"part1_seq3").await,
210+
];
211+
212+
concat_merge(&mut segments, &cache, output_path.clone())
213+
.await
214+
.unwrap();
215+
216+
// Check first part
217+
let output_path1 = temp_dir.path().join("output.1.ts");
218+
let content1 = tokio::fs::read(&output_path1).await.unwrap();
219+
assert_eq!(content1, b"part0_seq0part0_seq1");
220+
221+
// Check second part
222+
let output_path2 = temp_dir.path().join("output.2.ts");
223+
let content2 = tokio::fs::read(&output_path2).await.unwrap();
224+
assert_eq!(content2, b"part1_seq2part1_seq3");
225+
}
226+
227+
#[tokio::test]
228+
async fn test_concat_merge_failure() {
229+
let cache = MemoryCacheSource::new();
230+
let temp_dir = tempfile::tempdir().unwrap();
231+
let output_path = temp_dir.path().join("output.ts");
232+
233+
let mut segments = vec![
234+
create_segment(&cache, 0, 0, b"part0_seq0").await,
235+
ConcatSegment {
236+
segment: SegmentInfo {
237+
sequence: 1,
238+
part_index: 0,
239+
..Default::default()
240+
},
241+
success: false,
242+
},
243+
create_segment(&cache, 2, 0, b"part0_seq2").await,
244+
];
245+
246+
concat_merge(&mut segments, &cache, output_path.clone())
247+
.await
248+
.unwrap();
249+
250+
// First part before failure
251+
let output_path1 = temp_dir.path().join("output.1.ts");
252+
let content1 = tokio::fs::read(&output_path1).await.unwrap();
253+
assert_eq!(content1, b"part0_seq0");
254+
255+
// Second part after failure
256+
let output_path2 = temp_dir.path().join("output.2.ts");
257+
let content2 = tokio::fs::read(&output_path2).await.unwrap();
258+
assert_eq!(content2, b"part0_seq2");
259+
}
260+
261+
#[tokio::test]
262+
async fn test_concat_merge_sorting() {
263+
let cache = MemoryCacheSource::new();
264+
let temp_dir = tempfile::tempdir().unwrap();
265+
let output_path = temp_dir.path().join("output.ts");
266+
267+
// Out of order segments
268+
let mut segments = vec![
269+
create_segment(&cache, 1, 0, b"part0_seq1").await,
270+
create_segment(&cache, 0, 0, b"part0_seq0").await,
271+
create_segment(&cache, 3, 1, b"part1_seq3").await,
272+
create_segment(&cache, 2, 1, b"part1_seq2").await,
273+
];
274+
275+
concat_merge(&mut segments, &cache, output_path.clone())
276+
.await
277+
.unwrap();
278+
279+
let output_path1 = temp_dir.path().join("output.1.ts");
280+
let content1 = tokio::fs::read(&output_path1).await.unwrap();
281+
assert_eq!(content1, b"part0_seq0part0_seq1");
282+
283+
let output_path2 = temp_dir.path().join("output.2.ts");
284+
let content2 = tokio::fs::read(&output_path2).await.unwrap();
285+
assert_eq!(content2, b"part1_seq2part1_seq3");
286+
}
158287
}

crates/iori/src/merge/pipe.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,7 @@ impl PipeMerger {
6666
let mut stream: OrderedStream<(u64, Option<SendSegment>)> = OrderedStream::new(rx);
6767
let future = tokio::spawn(async move {
6868
let mut namer = DuplicateOutputFileNamer::new(target_path.clone());
69-
let mut target = Some(
70-
tokio::fs::File::create(&target_path)
71-
.await
72-
.expect("Failed to create file"),
73-
);
69+
let mut target: Option<tokio::fs::File> = None;
7470
let mut current_part_index: Option<u64> = None;
7571

7672
while let Some((_, (part_index, segment))) = stream.next().await {

0 commit comments

Comments
 (0)