22mod live;
33mod non_live;
44
5+ use async_trait:: async_trait;
56use bytes:: Bytes ;
67
8+ #[ cfg( feature = "ffmpeg" ) ]
9+ use bytes:: BytesMut ;
10+
11+ #[ cfg( feature = "ffmpeg" ) ]
12+ use std:: { process:: Stdio , sync:: Arc } ;
13+
14+ #[ cfg( feature = "ffmpeg" ) ]
15+ use tokio:: {
16+ io:: { AsyncReadExt , AsyncWriteExt } ,
17+ process:: { Child , Command } ,
18+ sync:: {
19+ mpsc:: { channel, Receiver } ,
20+ Mutex , Notify ,
21+ } ,
22+ task:: JoinHandle ,
23+ } ;
24+
725#[ cfg( feature = "live" ) ]
826pub use live:: { LiveStream , LiveStreamOptions } ;
927pub use non_live:: { NonLiveStream , NonLiveStreamOptions } ;
1028
29+ #[ cfg( feature = "ffmpeg" ) ]
30+ use crate :: constants:: DEFAULT_HEADERS ;
1131use crate :: VideoError ;
12- use async_trait:: async_trait;
1332
1433#[ async_trait]
1534pub trait Stream {
@@ -25,3 +44,156 @@ pub trait Stream {
2544 0
2645 }
2746}
47+
48+ #[ cfg( feature = "ffmpeg" ) ]
49+ pub struct FFmpegStreamOptions {
50+ pub client : reqwest_middleware:: ClientWithMiddleware ,
51+ pub link : String ,
52+ pub content_length : u64 ,
53+ pub dl_chunk_size : u64 ,
54+ pub start : u64 ,
55+ pub end : u64 ,
56+ pub ffmpeg_args : Vec < String > ,
57+ }
58+
59+ #[ cfg( feature = "ffmpeg" ) ]
60+ pub ( crate ) struct FFmpegStream {
61+ pub refined_data_reciever : Option < Arc < Mutex < Receiver < Bytes > > > > ,
62+ download_notify : Arc < Notify > ,
63+ ffmpeg_child : Child ,
64+
65+ tasks : Vec < JoinHandle < Result < ( ) , VideoError > > > ,
66+ }
67+
68+ #[ cfg( feature = "ffmpeg" ) ]
69+ impl FFmpegStream {
70+ pub fn new ( options : FFmpegStreamOptions ) -> Result < Self , VideoError > {
71+ let ( tx, mut rx) = channel :: < Bytes > ( 16384 ) ;
72+ let ( refined_tx, refined_rx) = channel :: < Bytes > ( 16384 ) ;
73+
74+ // Spawn FFmpeg process
75+ let mut ffmpeg_child = Command :: new ( "ffmpeg" )
76+ . args ( & options. ffmpeg_args )
77+ . stdin ( Stdio :: piped ( ) )
78+ . stdout ( Stdio :: piped ( ) )
79+ . kill_on_drop ( true )
80+ . spawn ( )
81+ . map_err ( |x| VideoError :: FFmpeg ( x. to_string ( ) ) ) ?;
82+
83+ let mut stdin = ffmpeg_child. stdin . take ( ) . unwrap ( ) ;
84+ let mut stdout = ffmpeg_child. stdout . take ( ) . unwrap ( ) ;
85+
86+ let read_stdout_task = tokio:: spawn ( async move {
87+ let mut buffer = vec ! [ 0u8 ; 16384 ] ;
88+ while let Ok ( line) = stdout. read ( & mut buffer) . await {
89+ match line {
90+ 0 => {
91+ break ;
92+ }
93+ n => {
94+ if let Err ( _err) = refined_tx. send ( Bytes :: from ( buffer[ ..n] . to_vec ( ) ) ) . await
95+ {
96+ return Err ( VideoError :: FFmpeg ( "channel closed" . to_string ( ) ) ) ;
97+ // Error or channel closed
98+ } ;
99+ }
100+ }
101+ }
102+
103+ Ok ( ( ) )
104+ } ) ;
105+
106+ let write_stdin_task = tokio:: spawn ( async move {
107+ while let Some ( data) = rx. recv ( ) . await {
108+ if let Err ( err) = stdin. write_all ( & data) . await {
109+ return Err ( VideoError :: FFmpeg ( err. to_string ( ) ) ) ; // Error or channel closed
110+ }
111+ }
112+ Ok ( ( ) )
113+ } ) ;
114+
115+ let download_notify = Arc :: new ( Notify :: new ( ) ) ;
116+ let download_notify_task = download_notify. clone ( ) ;
117+
118+ let download_task = tokio:: spawn ( async move {
119+ let mut end = options. end ;
120+ let mut start = options. start ;
121+ let content_length = options. content_length ;
122+ let client = options. client ;
123+ let link = options. link ;
124+ let dl_chunk_size = options. dl_chunk_size ;
125+
126+ download_notify_task. notified ( ) . await ;
127+
128+ loop {
129+ // Nothing else remain send break to finish
130+ if end == 0 {
131+ break ;
132+ }
133+
134+ if end >= content_length {
135+ end = 0 ;
136+ }
137+
138+ let mut headers = DEFAULT_HEADERS . clone ( ) ;
139+
140+ let range_end = if end == 0 {
141+ "" . to_string ( )
142+ } else {
143+ end. to_string ( )
144+ } ;
145+
146+ headers. insert (
147+ reqwest:: header:: RANGE ,
148+ format ! ( "bytes={}-{}" , start, range_end) . parse ( ) . unwrap ( ) ,
149+ ) ;
150+
151+ let mut response = client
152+ . get ( & link)
153+ . headers ( headers)
154+ . send ( )
155+ . await
156+ . map_err ( VideoError :: ReqwestMiddleware ) ?;
157+
158+ let mut buf: BytesMut = BytesMut :: new ( ) ;
159+
160+ while let Some ( chunk) = response. chunk ( ) . await . map_err ( VideoError :: Reqwest ) ? {
161+ buf. extend ( chunk) ;
162+ }
163+
164+ if end != 0 {
165+ start = end + 1 ;
166+
167+ end += dl_chunk_size;
168+ }
169+
170+ tx. send ( buf. into ( ) )
171+ . await
172+ . map_err ( |x| VideoError :: FFmpeg ( x. to_string ( ) ) ) ?;
173+ }
174+
175+ Ok ( ( ) )
176+ } ) ;
177+
178+ Ok ( Self {
179+ refined_data_reciever : Some ( Arc :: new ( Mutex :: new ( refined_rx) ) ) ,
180+ download_notify,
181+ ffmpeg_child,
182+ tasks : vec ! [ download_task, write_stdin_task, read_stdout_task] ,
183+ } )
184+ }
185+
186+ pub fn start_download ( & self ) {
187+ self . download_notify . notify_one ( ) ;
188+ }
189+ }
190+
191+ #[ cfg( feature = "ffmpeg" ) ]
192+ impl Drop for FFmpegStream {
193+ fn drop ( & mut self ) {
194+ // kill tasks if they are still running
195+ for handle in & self . tasks {
196+ handle. abort ( ) ;
197+ }
198+ }
199+ }
0 commit comments