diff --git a/crates/iori-hls/src/m3u8_rs.rs b/crates/iori-hls/src/m3u8_rs.rs index 3828f02..3df6f81 100644 --- a/crates/iori-hls/src/m3u8_rs.rs +++ b/crates/iori-hls/src/m3u8_rs.rs @@ -76,16 +76,29 @@ impl From for Resolution { impl From for MediaPlaylist { fn from(value: m3u8_rs::MediaPlaylist) -> Self { + let mut current_part_index = value.discontinuity_sequence; + let mut segments = Vec::with_capacity(value.segments.len()); + + for segment in value.segments { + if segment.discontinuity { + current_part_index += 1; + } + + let segment: crate::models::MediaSegment = (segment, current_part_index).into(); + segments.push(segment); + } + MediaPlaylist { media_sequence: value.media_sequence, - segments: value.segments.into_iter().map(MediaSegment::from).collect(), + segments, end_list: value.end_list, + discontinuity_sequence: value.discontinuity_sequence, } } } -impl From for MediaSegment { - fn from(value: m3u8_rs::MediaSegment) -> Self { +impl From<(m3u8_rs::MediaSegment, u64)> for MediaSegment { + fn from((value, part_index): (m3u8_rs::MediaSegment, u64)) -> Self { MediaSegment { uri: value.uri, duration: (value.duration as f64).into(), @@ -93,6 +106,7 @@ impl From for MediaSegment { byte_range: value.byte_range.map(ByteRange::from), key: value.key.map(Key::from), map: value.map.map(Map::from), + part_index, } } } diff --git a/crates/iori-hls/src/models.rs b/crates/iori-hls/src/models.rs index cea096b..de7f38e 100644 --- a/crates/iori-hls/src/models.rs +++ b/crates/iori-hls/src/models.rs @@ -104,6 +104,7 @@ impl From for AlternativeMediaType { #[derive(Debug, Clone, PartialEq, Comparable)] pub struct MediaPlaylist { pub media_sequence: u64, + pub discontinuity_sequence: u64, pub segments: Vec, pub end_list: bool, } @@ -116,6 +117,7 @@ pub struct MediaSegment { pub byte_range: Option, pub key: Option, pub map: Option, + pub part_index: u64, } impl<'a> @@ -125,15 +127,17 @@ impl<'a> Option, Option, Option, + u64, )> for MediaSegment { fn from( - (inf, uri, byte_range, key, map): ( + (inf, uri, byte_range, key, map, part_index): ( hls::Inf<'a>, Cow<'a, str>, Option, Option, Option, + u64, ), ) -> Self { Self { @@ -146,6 +150,7 @@ impl<'a> byte_range, key, map, + part_index, } } } diff --git a/crates/iori-hls/src/parse.rs b/crates/iori-hls/src/parse.rs index c71308a..beafc3e 100644 --- a/crates/iori-hls/src/parse.rs +++ b/crates/iori-hls/src/parse.rs @@ -13,18 +13,28 @@ pub fn parse_playlist_res(input: &[u8]) -> Result { let mut is_master = false; - // master playlist + // let mut variants = Vec::new(); let mut alternatives = Vec::new(); - // media playlist + // + // [RFC8216 Section 4.3.3.2](https://datatracker.ietf.org/doc/html/rfc8216#section-4.3.3.2) + // > If the Media Playlist file does not contain an EXT-X-MEDIA-SEQUENCE + // > tag, then the Media Sequence Number of the first Media Segment in the + // > Media Playlist SHALL be considered to be 0. let mut media_sequence = 0; + // [RFC8216 Section 4.3.3.3](https://datatracker.ietf.org/doc/html/rfc8216#section-4.3.3.3) + // > If the Media Playlist does not contain an EXT-X-DISCONTINUITY- + // > SEQUENCE tag, then the Discontinuity Sequence Number of the first + // > Media Segment in the Playlist SHALL be considered to be 0. + let mut discontinuity_sequence = 0; let mut segments: Vec = Vec::new(); let mut end_list = false; // Maps(initial segment information) and keys(encryption information) let mut current_key: Option = None; let mut current_map: Option = None; + let mut current_part_index = 0; // Pending tags, which should be cleared after the URI line is processed let mut pending_inf: Option = None; @@ -35,6 +45,13 @@ pub fn parse_playlist_res(input: &[u8]) -> Result { match line { HlsLine::KnownTag(KnownTag::Hls(tag)) => match tag { hls::Tag::MediaSequence(seq) => media_sequence = seq.media_sequence(), + hls::Tag::DiscontinuitySequence(seq) => { + discontinuity_sequence = seq.discontinuity_sequence(); + current_part_index = discontinuity_sequence; + } + hls::Tag::Discontinuity(_) => { + current_part_index += 1; + } hls::Tag::Inf(inf) => pending_inf = Some(inf), hls::Tag::Byterange(range) => pending_byterange = Some(range.into()), hls::Tag::Key(key) => current_key = Some(key.into()), @@ -63,6 +80,7 @@ pub fn parse_playlist_res(input: &[u8]) -> Result { pending_byterange.take(), current_key.clone(), current_map.clone(), + current_part_index, ) .into(), ); @@ -84,6 +102,7 @@ pub fn parse_playlist_res(input: &[u8]) -> Result { } else { Playlist::MediaPlaylist(MediaPlaylist { media_sequence, + discontinuity_sequence, segments, end_list, }) diff --git a/crates/iori-hls/tests/cases.rs b/crates/iori-hls/tests/cases.rs index ddbd289..7396b87 100644 --- a/crates/iori-hls/tests/cases.rs +++ b/crates/iori-hls/tests/cases.rs @@ -22,17 +22,6 @@ fn test_accuracy_archive_01() { assert_changes!(old_result, new_result, Changed::Unchanged); } -#[test] -fn test_accuracy_archive_02() { - let data = include_bytes!("./fixtures/archive_02.m3u8"); - let old_result = iori_hls::m3u8_rs::parse_playlist_res(data); - let new_result = iori_hls::parse::parse_playlist_res(data); - - let old_result = old_result.expect("Old parse engine should not error"); - let new_result = new_result.expect("New parse engine should not error"); - assert_changes!(old_result, new_result, Changed::Unchanged); -} - #[test] fn test_accuracy_archive_03() { let data = include_bytes!("./fixtures/archive_03.m3u8"); diff --git a/crates/iori-hls/tests/discontinuity.rs b/crates/iori-hls/tests/discontinuity.rs new file mode 100644 index 0000000..4fdb4df --- /dev/null +++ b/crates/iori-hls/tests/discontinuity.rs @@ -0,0 +1,57 @@ +use comparable::{Changed, assert_changes}; + +#[test] +fn test_accuracy_discontinuity() { + let data = include_bytes!("./fixtures/discontinuity/playlist.m3u8"); + let old_result = iori_hls::m3u8_rs::parse_playlist_res(data); + let new_result = iori_hls::parse::parse_playlist_res(data); + + let old_result = old_result.expect("Old parse engine should not error"); + let new_result = new_result.expect("New parse engine should not error"); + assert_changes!(old_result, new_result, Changed::Unchanged); + + let media_playlist = match new_result { + iori_hls::Playlist::MediaPlaylist(media_playlist) => media_playlist, + _ => panic!("Expected media playlist"), + }; + assert_eq!(media_playlist.discontinuity_sequence, 5); + + let segments = media_playlist.segments; + assert_eq!(segments.len(), 3); + assert_eq!(segments[0].part_index, 5); + assert_eq!(segments[1].part_index, 6); + assert_eq!(segments[2].part_index, 7); +} + +#[test] +fn test_accuracy_discontinuity_02() { + let data = include_bytes!("./fixtures/discontinuity/playlist2.m3u8"); + let old_result = iori_hls::m3u8_rs::parse_playlist_res(data); + let new_result = iori_hls::parse::parse_playlist_res(data); + + let old_result = old_result.expect("Old parse engine should not error"); + let new_result = new_result.expect("New parse engine should not error"); + assert_changes!(old_result, new_result, Changed::Unchanged); +} + +#[test] +fn test_accuracy_discontinuity_no_seq() { + let data = include_bytes!("./fixtures/discontinuity/no_seq.m3u8"); + let old_result = iori_hls::m3u8_rs::parse_playlist_res(data); + let new_result = iori_hls::parse::parse_playlist_res(data); + + let old_result = old_result.expect("Old parse engine should not error"); + let new_result = new_result.expect("New parse engine should not error"); + assert_changes!(old_result, new_result, Changed::Unchanged); + + let media_playlist = match new_result { + iori_hls::Playlist::MediaPlaylist(media_playlist) => media_playlist, + _ => panic!("Expected media playlist"), + }; + assert_eq!(media_playlist.discontinuity_sequence, 0); + + let segments = media_playlist.segments; + assert_eq!(segments.len(), 2); + assert_eq!(segments[0].part_index, 0); + assert_eq!(segments[1].part_index, 1); +} diff --git a/crates/iori-hls/tests/fixtures/discontinuity/no_seq.m3u8 b/crates/iori-hls/tests/fixtures/discontinuity/no_seq.m3u8 new file mode 100644 index 0000000..2dc12e8 --- /dev/null +++ b/crates/iori-hls/tests/fixtures/discontinuity/no_seq.m3u8 @@ -0,0 +1,11 @@ +#EXTM3U +#EXT-X-TARGETDURATION:10 +#EXT-X-VERSION:3 +#EXT-X-MEDIA-SEQUENCE:0 +#EXTINF:10.0, +segment1.ts +#EXT-X-DISCONTINUITY +#EXTINF:10.0, +segment2.ts +#EXT-X-ENDLIST + diff --git a/crates/iori-hls/tests/fixtures/discontinuity/playlist.m3u8 b/crates/iori-hls/tests/fixtures/discontinuity/playlist.m3u8 new file mode 100644 index 0000000..811dc6a --- /dev/null +++ b/crates/iori-hls/tests/fixtures/discontinuity/playlist.m3u8 @@ -0,0 +1,14 @@ +#EXTM3U +#EXT-X-TARGETDURATION:10 +#EXT-X-VERSION:3 +#EXT-X-MEDIA-SEQUENCE:0 +#EXT-X-DISCONTINUITY-SEQUENCE:5 +#EXTINF:10.0, +segment1.ts +#EXT-X-DISCONTINUITY +#EXTINF:10.0, +segment2.ts +#EXT-X-DISCONTINUITY +#EXTINF:10.0, +segment3.ts +#EXT-X-ENDLIST diff --git a/crates/iori-hls/tests/fixtures/archive_02.m3u8 b/crates/iori-hls/tests/fixtures/discontinuity/playlist2.m3u8 similarity index 100% rename from crates/iori-hls/tests/fixtures/archive_02.m3u8 rename to crates/iori-hls/tests/fixtures/discontinuity/playlist2.m3u8 diff --git a/crates/iori/src/hls/segment.rs b/crates/iori/src/hls/segment.rs index b68bb84..d0200c9 100644 --- a/crates/iori/src/hls/segment.rs +++ b/crates/iori/src/hls/segment.rs @@ -23,6 +23,8 @@ pub struct M3u8Segment { /// Media sequence id from the m3u8 file pub media_sequence: u64, + pub part_index: u64, + pub duration: f64, pub format: SegmentFormat, } @@ -59,6 +61,10 @@ impl StreamingSegment for M3u8Segment { fn format(&self) -> SegmentFormat { self.format.clone() } + + fn part_index(&self) -> u64 { + self.part_index + } } impl RemoteStreamingSegment for M3u8Segment { diff --git a/crates/iori/src/hls/source.rs b/crates/iori/src/hls/source.rs index 7d764d1..a99417f 100644 --- a/crates/iori/src/hls/source.rs +++ b/crates/iori/src/hls/source.rs @@ -166,6 +166,7 @@ impl HlsMediaPlaylistSource { initial_segment: initial_segment.clone(), sequence: self.sequence.fetch_add(1, Ordering::Relaxed), media_sequence, + part_index: segment.part_index, byte_range: segment.byte_range.as_ref().map(|r| crate::ByteRange { offset: r.offset.unwrap_or(next_range_start), length: Some(r.length), diff --git a/crates/iori/src/lib.rs b/crates/iori/src/lib.rs index b49bd4a..0bf35db 100644 --- a/crates/iori/src/lib.rs +++ b/crates/iori/src/lib.rs @@ -58,6 +58,7 @@ pub trait StreamingSource { ) -> impl Future>>>>; } +/// A segment of a streaming source pub trait StreamingSegment { /// Stream id fn stream_id(&self) -> u64; @@ -68,6 +69,12 @@ pub trait StreamingSegment { /// Sequence ID of the segment, starts from 0 fn sequence(&self) -> u64; + /// Part index of the segment. For segments without sub-parts or discontinuities, + /// this defaults to 0. + fn part_index(&self) -> u64 { + 0 + } + /// File name of the segment fn file_name(&self) -> &str; diff --git a/crates/iori/src/merge/auto.rs b/crates/iori/src/merge/auto.rs index 92a3264..65eb019 100644 --- a/crates/iori/src/merge/auto.rs +++ b/crates/iori/src/merge/auto.rs @@ -1,10 +1,10 @@ use super::{AutoMergerConcat, AutoMergerMerge, Merger, concat::ConcatSegment}; use crate::{ SegmentFormat, SegmentInfo, StreamType, cache::CacheSource, error::IoriResult, - util::path::IoriPathExt, + util::path::IoriPathExt, utils::DuplicateOutputFileNamer, }; use std::{ - collections::HashMap, + collections::{BTreeMap, BTreeSet, HashMap}, path::{Path, PathBuf}, }; use tokio::{fs::File, io::BufWriter}; @@ -24,7 +24,8 @@ pub use mkvmerge::MkvmergeMerger; /// If there are multiple tracks to merge, it will use mkvmerge to merge them. /// If there are any missing segments, the merge will be skipped. pub struct AutoMerger { - segments: HashMap>, + // stream_id -> part_index -> segments + segments: HashMap>>, /// Whether to recycle downloaded segments after merging. recycle: bool, @@ -73,6 +74,8 @@ where self.segments .entry(segment.stream_id) .or_default() + .entry(segment.part_index) + .or_default() .push(ConcatSegment { segment, success: true, @@ -85,6 +88,8 @@ where self.segments .entry(segment.stream_id) .or_default() + .entry(segment.part_index) + .or_default() .push(ConcatSegment { segment, success: false, @@ -104,68 +109,93 @@ where return Ok(()); } - let mut tracks = Vec::new(); - for (stream_id, segments) in self.segments.iter() { - let mut segments: Vec<_> = segments.iter().map(|s| &s.segment).collect(); - - let first_segment = segments[0]; - let mut output_path = self.output_file.to_owned(); - output_path.add_suffix(format!("{stream_id:02}")); - output_path.set_extension(first_segment.format.as_ext()); + // 1. Collect all parts for each stream + let mut streams: Vec = self.segments.keys().copied().collect(); + streams.sort(); + + // 2. Determine global parts. + // We use the unique set of part_indexes across all streams. + // If a stream doesn't have a part_index, it will be missing in that muxed part. + let mut all_part_indexes: BTreeSet = BTreeSet::new(); + for parts in self.segments.values() { + for part_index in parts.keys() { + all_part_indexes.insert(*part_index); + } + } - segments.sort_by(|a, b| a.sequence.cmp(&b.sequence)); + let mut namer = DuplicateOutputFileNamer::new(self.output_file.clone()); + let mut final_outputs = Vec::new(); + + for part_index in all_part_indexes { + let mut tracks = Vec::new(); + + for stream_id in &streams { + if let Some(segments) = self.segments[stream_id].get(&part_index) { + let mut segments: Vec<_> = segments.iter().map(|s| &s.segment).collect(); + let first_segment = segments[0]; + + // Temporary track file + let mut track_output_path = self.output_file.to_owned(); + track_output_path.add_suffix(format!("_part{part_index}_stream{stream_id}")); + track_output_path.set_extension(first_segment.format.as_ext()); + + segments.sort_by(|a, b| a.sequence.cmp(&b.sequence)); + + let can_concat = segments.iter().all(|s| { + matches!( + s.format, + SegmentFormat::Mpeg2TS | SegmentFormat::Aac | SegmentFormat::Raw(_) + ) || matches!(s.stream_type, StreamType::Subtitle) + }); + + if can_concat { + concat_merge(&segments, &cache, &track_output_path).await?; + } else { + track_output_path.set_extension(self.concat_merger.format().as_ext()); + self.concat_merger + .concat(&segments, &cache, &track_output_path) + .await?; + } + + tracks.push(track_output_path); + } + } - if output_path.exists() { - let timestamp = chrono::Utc::now().timestamp(); - output_path.add_suffix(format!("{timestamp}")); + if tracks.is_empty() { + continue; } - // TODO: if the file still exists, throw error - let can_concat = segments.iter().all(|s| { - matches!( - s.format, - SegmentFormat::Mpeg2TS | SegmentFormat::Aac | SegmentFormat::Raw(_) - ) || matches!(s.stream_type, StreamType::Subtitle) - }); - if can_concat { - concat_merge(&segments, &cache, &output_path).await?; - } else { - output_path.set_extension(self.concat_merger.format().as_ext()); - self.concat_merger - .concat(&segments, &cache, &output_path) - .await?; + tracing::info!("Merging streams for part {part_index}..."); + if let Some(parent) = self.output_file.parent() { + tokio::fs::create_dir_all(parent).await?; } - tracks.push(output_path); - } + let part_output_path = namer.next_path(); - tracing::info!("Merging streams..."); - if let Some(parent) = self.output_file.parent() { - tracing::info!("Creating directory: {}", parent.display()); - tokio::fs::create_dir_all(parent).await?; - } - let output_path = if tracks.len() == 1 { - let track_format = tracks[0].extension().and_then(|e| e.to_str()); - let output = match track_format { - Some(ext) => self - .output_file - .with_replaced_extension(ext, &self.allowed_extensions), - None => self.output_file.clone(), - } - .deduplicate()?; - tokio::fs::rename(&tracks[0], &output).await?; - output - } else { - let output = self - .output_file - .with_replaced_extension( - self.merge_merger.format().as_ext(), - &self.allowed_extensions, - ) + let output_path = if tracks.len() == 1 { + let track_format = tracks[0].extension().and_then(|e| e.to_str()); + let output = match track_format { + Some(ext) => { + part_output_path.with_replaced_extension(ext, &self.allowed_extensions) + } + None => part_output_path, + } .deduplicate()?; - self.merge_merger.merge(tracks, &output).await?; - output - }; + tokio::fs::rename(&tracks[0], &output).await?; + output + } else { + let output = part_output_path + .with_replaced_extension( + self.merge_merger.format().as_ext(), + &self.allowed_extensions, + ) + .deduplicate()?; + self.merge_merger.merge(tracks, &output).await?; + output + }; + + final_outputs.push(output_path); + } if self.recycle { tracing::info!("End of merging."); @@ -173,10 +203,12 @@ where cache.clear().await?; } - tracing::info!( - "All finished. Please checkout your files at {}", - output_path.display() - ); + for output_path in final_outputs { + tracing::info!( + "Part finished. Please checkout your files at {}", + output_path.display() + ); + } Ok(()) } } diff --git a/crates/iori/src/merge/concat.rs b/crates/iori/src/merge/concat.rs index 2360582..73d65a2 100644 --- a/crates/iori/src/merge/concat.rs +++ b/crates/iori/src/merge/concat.rs @@ -89,26 +89,56 @@ async fn concat_merge( cache: &impl CacheSource, output_path: PathBuf, ) -> IoriResult<()> { - segments.sort_by(|a, b| a.segment.sequence.cmp(&b.segment.sequence)); - let segments = trim_end(segments, |s| !s.success); + segments.sort_by(|a, b| { + a.segment + .part_index + .cmp(&b.segment.part_index) + .then(a.segment.sequence.cmp(&b.segment.sequence)) + }); let mut namer = DuplicateOutputFileNamer::new(output_path.clone()); - let mut output = File::create(output_path).await?; - for segment in segments { - let success = segment.success; - let segment = &segment.segment; - if !success { - output = File::create(namer.next_path()).await?; + + // We don't use trim_end here because we want to handle parts individually. + // However, we should still skip trailing failed segments in each part. + + let mut part_start = 0; + while part_start < segments.len() { + let part_index = segments[part_start].segment.part_index; + let mut part_end = part_start + 1; + while part_end < segments.len() && segments[part_end].segment.part_index == part_index { + part_end += 1; + } + + let part_segments = &mut segments[part_start..part_end]; + let trimmed_part_segments = trim_end(part_segments, |s| !s.success); + + if !trimmed_part_segments.is_empty() { + let path = namer.next_path(); + + let mut out = File::create(path).await?; + for segment in trimmed_part_segments { + if !segment.success { + out = File::create(namer.next_path()).await?; + continue; + } + + let mut reader = cache.open_reader(&segment.segment).await?; + tokio::io::copy(&mut reader, &mut out).await?; + } } - let mut reader = cache.open_reader(segment).await?; - tokio::io::copy(&mut reader, &mut output).await?; + part_start = part_end; } + Ok(()) } #[cfg(test)] mod tests { + use super::*; + use crate::cache::memory::MemoryCacheSource; + use tokio::io::AsyncWriteExt; + #[test] fn test_trim_end() { let input = [1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0]; @@ -123,4 +153,135 @@ mod tests { let output = super::trim_end(&input, |&x| x == 0); assert_eq!(output, [1, 2, 3, 0, 0, 3]); } + + async fn create_segment( + cache: &MemoryCacheSource, + sequence: u64, + part_index: u64, + data: &[u8], + ) -> ConcatSegment { + let segment = SegmentInfo { + sequence, + part_index, + ..Default::default() + }; + let mut writer = cache.open_writer(&segment).await.unwrap().unwrap(); + writer.write_all(data).await.unwrap(); + writer.shutdown().await.unwrap(); + drop(writer); + ConcatSegment { + segment, + success: true, + } + } + + #[tokio::test] + async fn test_concat_merge_basic() { + let cache = MemoryCacheSource::new(); + let temp_dir = tempfile::tempdir().unwrap(); + let output_path = temp_dir.path().join("output.ts"); + + let mut segments = vec![ + create_segment(&cache, 0, 0, b"part0_seq0").await, + create_segment(&cache, 1, 0, b"part0_seq1").await, + ]; + + concat_merge(&mut segments, &cache, output_path.clone()) + .await + .unwrap(); + + // Give some time for the namer Drop to run if needed, + // but here it's sync and should have run. + let content = tokio::fs::read(&output_path).await.unwrap(); + assert_eq!(content, b"part0_seq0part0_seq1"); + } + + #[tokio::test] + async fn test_concat_merge_discontinuity() { + let cache = MemoryCacheSource::new(); + let temp_dir = tempfile::tempdir().unwrap(); + let output_path = temp_dir.path().join("output.ts"); + + let mut segments = vec![ + create_segment(&cache, 0, 0, b"part0_seq0").await, + create_segment(&cache, 1, 0, b"part0_seq1").await, + create_segment(&cache, 2, 1, b"part1_seq2").await, + create_segment(&cache, 3, 1, b"part1_seq3").await, + ]; + + concat_merge(&mut segments, &cache, output_path.clone()) + .await + .unwrap(); + + // Check first part + let output_path1 = temp_dir.path().join("output.1.ts"); + let content1 = tokio::fs::read(&output_path1).await.unwrap(); + assert_eq!(content1, b"part0_seq0part0_seq1"); + + // Check second part + let output_path2 = temp_dir.path().join("output.2.ts"); + let content2 = tokio::fs::read(&output_path2).await.unwrap(); + assert_eq!(content2, b"part1_seq2part1_seq3"); + } + + #[tokio::test] + async fn test_concat_merge_failure() { + let cache = MemoryCacheSource::new(); + let temp_dir = tempfile::tempdir().unwrap(); + let output_path = temp_dir.path().join("output.ts"); + + let mut segments = vec![ + create_segment(&cache, 0, 0, b"part0_seq0").await, + ConcatSegment { + segment: SegmentInfo { + sequence: 1, + part_index: 0, + ..Default::default() + }, + success: false, + }, + create_segment(&cache, 2, 0, b"part0_seq2").await, + ]; + + concat_merge(&mut segments, &cache, output_path.clone()) + .await + .unwrap(); + + // First part before failure + let output_path1 = temp_dir.path().join("output.1.ts"); + let content1 = tokio::fs::read(&output_path1).await.unwrap(); + assert_eq!(content1, b"part0_seq0"); + + // Second part after failure + let output_path2 = temp_dir.path().join("output.2.ts"); + let content2 = tokio::fs::read(&output_path2).await.unwrap(); + assert_eq!(content2, b"part0_seq2"); + } + + #[tokio::test] + async fn test_concat_merge_sorting() { + let cache = MemoryCacheSource::new(); + let temp_dir = tempfile::tempdir().unwrap(); + let output_path = temp_dir.path().join("output.ts"); + + // Out of order segments + let mut segments = vec![ + create_segment(&cache, 1, 0, b"part0_seq1").await, + create_segment(&cache, 0, 0, b"part0_seq0").await, + create_segment(&cache, 3, 1, b"part1_seq3").await, + create_segment(&cache, 2, 1, b"part1_seq2").await, + ]; + + concat_merge(&mut segments, &cache, output_path.clone()) + .await + .unwrap(); + + let output_path1 = temp_dir.path().join("output.1.ts"); + let content1 = tokio::fs::read(&output_path1).await.unwrap(); + assert_eq!(content1, b"part0_seq0part0_seq1"); + + let output_path2 = temp_dir.path().join("output.2.ts"); + let content2 = tokio::fs::read(&output_path2).await.unwrap(); + assert_eq!(content2, b"part1_seq2part1_seq3"); + } } diff --git a/crates/iori/src/merge/pipe.rs b/crates/iori/src/merge/pipe.rs index 2fa9daf..739f601 100644 --- a/crates/iori/src/merge/pipe.rs +++ b/crates/iori/src/merge/pipe.rs @@ -25,7 +25,7 @@ type SendSegment = ( pub struct PipeMerger { recycle: bool, - sender: Option)>>, + sender: Option))>>, future: Option>, } @@ -40,9 +40,9 @@ impl PipeMerger { ) -> Self { let (tx, rx) = mpsc::unbounded_channel(); - let mut stream: OrderedStream> = OrderedStream::new(rx); + let mut stream: OrderedStream<(u64, Option)> = OrderedStream::new(rx); let future = tokio::spawn(async move { - while let Some((_, segment)) = stream.next().await { + while let Some((_, (_, segment))) = stream.next().await { if let Some((mut reader, _type, invalidate)) = segment { _ = tokio::io::copy(&mut reader, &mut writer).await; if recycle { @@ -63,16 +63,20 @@ impl PipeMerger { pub fn file(recycle: bool, target_path: PathBuf) -> Self { let (tx, rx) = mpsc::unbounded_channel(); - let mut stream: OrderedStream> = OrderedStream::new(rx); + let mut stream: OrderedStream<(u64, Option)> = OrderedStream::new(rx); let future = tokio::spawn(async move { let mut namer = DuplicateOutputFileNamer::new(target_path.clone()); - let mut target = Some( - tokio::fs::File::create(&target_path) - .await - .expect("Failed to create file"), - ); - while let Some((_, segment)) = stream.next().await { + let mut target: Option = None; + let mut current_part_index: Option = None; + + while let Some((_, (part_index, segment))) = stream.next().await { if let Some((mut reader, _type, invalidate)) = segment { + if let Some(prev) = current_part_index { + if part_index != prev { + target = None; + } + } + if target.is_none() { let file = tokio::fs::File::create(namer.next_path()) .await @@ -86,6 +90,7 @@ impl PipeMerger { if recycle { _ = invalidate.await; } + current_part_index = Some(part_index); } else { target = None; } @@ -103,7 +108,7 @@ impl PipeMerger { pub fn mux(recycle: bool, output: PathBuf, extra_command: Option) -> Self { let (tx, rx) = mpsc::unbounded_channel(); - let mut stream: OrderedStream> = OrderedStream::new(rx); + let mut stream: OrderedStream<(u64, Option)> = OrderedStream::new(rx); #[cfg(target_os = "windows")] let (mut audio_pipe, audio_receiver) = { @@ -207,7 +212,7 @@ impl PipeMerger { } }); - while let Some((_, segment)) = stream.next().await { + while let Some((_, (_, segment))) = stream.next().await { if let Some((reader, r#type, invalidate)) = segment { match r#type { StreamType::Video => { @@ -242,7 +247,7 @@ impl PipeMerger { } } - fn send(&self, message: (u64, u64, Option)) { + fn send(&self, message: (u64, u64, (u64, Option))) { if let Some(sender) = &self.sender { sender.send(message).expect("Failed to send segment"); } @@ -255,6 +260,7 @@ impl Merger for PipeMerger { async fn update(&mut self, segment: SegmentInfo, cache: impl CacheSource) -> IoriResult<()> { let stream_id = segment.stream_id; let sequence = segment.sequence; + let part_index = segment.part_index; let stream_type = segment.stream_type; let reader = cache.open_reader(&segment).await?; let invalidate = async move { cache.invalidate(&segment).await }; @@ -262,7 +268,10 @@ impl Merger for PipeMerger { self.send(( stream_id, sequence, - Some((Box::pin(reader), stream_type, Box::pin(invalidate))), + ( + part_index, + Some((Box::pin(reader), stream_type, Box::pin(invalidate))), + ), )); Ok(()) @@ -272,7 +281,7 @@ impl Merger for PipeMerger { let stream_id = segment.stream_id; cache.invalidate(&segment).await?; - self.send((stream_id, segment.sequence, None)); + self.send((stream_id, segment.sequence, (segment.part_index, None))); Ok(()) } diff --git a/crates/iori/src/merge/proxy.rs b/crates/iori/src/merge/proxy.rs index 46066f8..36271d8 100644 --- a/crates/iori/src/merge/proxy.rs +++ b/crates/iori/src/merge/proxy.rs @@ -183,30 +183,19 @@ fn generate_media_playlist( playlist.push_str("#EXT-X-TARGETDURATION:10\n"); // TODO: validate whether this is correct playlist.push_str("#EXT-X-MEDIA-SEQUENCE:0\n"); - // Calculate discontinuity sequence - // Discontinuity occurs when: - // 1. There's a gap in sequence numbers (missing segments) - // 2. The previous segment failed (marked in failed_segments) - let mut prev_sequence: Option = None; + let mut prev_part_index: Option = None; let mut discontinuity_sequence = 0u64; - let mut prev_failed = false; for (sequence, segment) in segments.iter() { let current_failed = failed_segments.contains(&(stream_id, *sequence)); // Mark discontinuity if: - // 1. There's a gap in sequences, OR - // 2. The previous segment failed (we're resuming after failure) - let should_add_discontinuity = if let Some(prev) = prev_sequence { - // Gap in sequence or previous segment failed - *sequence != prev + 1 || prev_failed - } else { - false - }; - - if should_add_discontinuity { - playlist.push_str("#EXT-X-DISCONTINUITY\n"); - discontinuity_sequence += 1; + // 1. part_index changed + if let Some(prev) = prev_part_index { + if segment.part_index != prev { + playlist.push_str("#EXT-X-DISCONTINUITY\n"); + discontinuity_sequence += 1; + } } // Skip failed segments in the playlist output @@ -218,8 +207,7 @@ fn generate_media_playlist( playlist.push_str(&format!("/segment/{}/{}\n", stream_id, sequence)); } - prev_sequence = Some(*sequence); - prev_failed = current_failed; + prev_part_index = Some(segment.part_index); } // Add discontinuity sequence at the beginning if there were any discontinuities diff --git a/crates/iori/src/segment.rs b/crates/iori/src/segment.rs index fd3dc7b..4b7dbbf 100644 --- a/crates/iori/src/segment.rs +++ b/crates/iori/src/segment.rs @@ -74,6 +74,7 @@ pub struct SegmentInfo { pub key: Option>, pub duration: Option, pub format: SegmentFormat, + pub part_index: u64, } impl From<&T> for SegmentInfo @@ -90,6 +91,7 @@ where duration: segment.duration(), stream_type: segment.stream_type(), format: segment.format(), + part_index: segment.part_index(), } } } @@ -126,6 +128,10 @@ impl StreamingSegment for Box { fn format(&self) -> SegmentFormat { self.as_ref().format() } + + fn part_index(&self) -> u64 { + self.as_ref().part_index() + } } impl StreamingSegment for &Box { @@ -160,6 +166,10 @@ impl StreamingSegment for &Box { fn format(&self) -> SegmentFormat { self.as_ref().format() } + + fn part_index(&self) -> u64 { + self.as_ref().part_index() + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]