1+ use async_trait:: async_trait;
2+ use tokio:: sync:: RwLock ;
3+
14use crate :: constants:: DEFAULT_HEADERS ;
25use crate :: stream:: streams:: Stream ;
36use crate :: structs:: VideoError ;
4- use async_trait:: async_trait;
5- use tokio:: sync:: RwLock ;
7+
8+ #[ cfg( feature = "ffmpeg" ) ]
9+ use crate :: { structs:: FFmpegArgs , utils:: ffmpeg_cmd_run} ;
610
711pub struct NonLiveStreamOptions {
812 pub client : Option < reqwest_middleware:: ClientWithMiddleware > ,
@@ -11,6 +15,9 @@ pub struct NonLiveStreamOptions {
1115 pub dl_chunk_size : u64 ,
1216 pub start : u64 ,
1317 pub end : u64 ,
18+
19+ #[ cfg( feature = "ffmpeg" ) ]
20+ pub ffmpeg_args : Option < FFmpegArgs > ,
1421}
1522
1623pub struct NonLiveStream {
@@ -21,6 +28,13 @@ pub struct NonLiveStream {
2128 end : RwLock < u64 > ,
2229
2330 client : reqwest_middleware:: ClientWithMiddleware ,
31+
32+ #[ cfg( feature = "ffmpeg" ) ]
33+ ffmpeg_args : Option < FFmpegArgs > ,
34+ #[ cfg( feature = "ffmpeg" ) ]
35+ ffmpeg_start_byte : RwLock < Vec < u8 > > ,
36+ #[ cfg( feature = "ffmpeg" ) ]
37+ ffmpeg_end_byte : RwLock < usize > ,
2438}
2539
2640impl NonLiveStream {
@@ -52,6 +66,12 @@ impl NonLiveStream {
5266 dl_chunk_size : options. dl_chunk_size ,
5367 start : RwLock :: new ( options. start ) ,
5468 end : RwLock :: new ( options. end ) ,
69+ #[ cfg( feature = "ffmpeg" ) ]
70+ ffmpeg_args : options. ffmpeg_args ,
71+ #[ cfg( feature = "ffmpeg" ) ]
72+ ffmpeg_end_byte : RwLock :: new ( 0 ) ,
73+ #[ cfg( feature = "ffmpeg" ) ]
74+ ffmpeg_start_byte : RwLock :: new ( vec ! [ ] ) ,
5575 } )
5676 }
5777
@@ -66,6 +86,16 @@ impl NonLiveStream {
6686 async fn start_index ( & self ) -> u64 {
6787 * self . start . read ( ) . await
6888 }
89+
90+ #[ cfg( feature = "ffmpeg" ) ]
91+ async fn ffmpeg_end_byte_index ( & self ) -> usize {
92+ * self . ffmpeg_end_byte . read ( ) . await
93+ }
94+
95+ #[ cfg( feature = "ffmpeg" ) ]
96+ async fn ffmpeg_start_byte_index ( & self ) -> Vec < u8 > {
97+ self . ffmpeg_start_byte . read ( ) . await . to_vec ( )
98+ }
6999}
70100
71101#[ async_trait]
@@ -100,13 +130,13 @@ impl Stream for NonLiveStream {
100130 . unwrap ( ) ,
101131 ) ;
102132
103- let response = self . client . get ( & self . link ) . headers ( headers ) . send ( ) . await ;
104-
105- if response . is_err ( ) {
106- return Err ( VideoError :: ReqwestMiddleware ( response . err ( ) . unwrap ( ) ) ) ;
107- }
108-
109- let mut response = response . expect ( "IMPOSSIBLE" ) ;
133+ let mut response = self
134+ . client
135+ . get ( & self . link )
136+ . headers ( headers )
137+ . send ( )
138+ . await
139+ . map_err ( VideoError :: ReqwestMiddleware ) ? ;
110140
111141 let mut buf: Vec < u8 > = vec ! [ ] ;
112142
@@ -115,6 +145,39 @@ impl Stream for NonLiveStream {
115145 buf. extend ( chunk. iter ( ) ) ;
116146 }
117147
148+ #[ cfg( feature = "ffmpeg" ) ]
149+ {
150+ let ffmpeg_args = self
151+ . ffmpeg_args
152+ . clone ( )
153+ . map ( |x| x. build ( ) )
154+ . unwrap_or_default ( ) ;
155+
156+ if !ffmpeg_args. is_empty ( ) {
157+ let ffmpeg_start_byte_index = self . ffmpeg_start_byte_index ( ) . await ;
158+
159+ let cmd_output = ffmpeg_cmd_run (
160+ & ffmpeg_args,
161+ & [ & ffmpeg_start_byte_index, buf. as_slice ( ) ] . concat ( ) ,
162+ )
163+ . await ?;
164+
165+ let end_index = self . ffmpeg_end_byte_index ( ) . await ;
166+
167+ let mut first_buffer_trim = 1 ;
168+ if ffmpeg_start_byte_index. is_empty ( ) {
169+ let mut start_byte = self . ffmpeg_start_byte . write ( ) . await ;
170+ * start_byte = buf. clone ( ) ;
171+ let mut end_byte = self . ffmpeg_end_byte . write ( ) . await ;
172+ * end_byte = cmd_output. len ( ) ;
173+
174+ first_buffer_trim = 0 ;
175+ }
176+
177+ buf = ( cmd_output[ ( end_index + first_buffer_trim) ..] ) . to_vec ( ) ;
178+ }
179+ }
180+
118181 if end != 0 {
119182 let mut start = self . start . write ( ) . await ;
120183 * start = end + 1 ;
0 commit comments