@@ -37,13 +37,19 @@ pub enum Event {
3737 Error ,
3838}
3939
40+ pub enum SourceConfig {
41+ AudioVideo {
42+ video : gst:: Element ,
43+ audio : gst:: Element ,
44+ } ,
45+ Video ( gst:: Element ) ,
46+ Audio ( gst:: Element ) ,
47+ }
48+
4049#[ cfg( not( target_os = "android" ) ) ]
4150pub struct Pipeline {
4251 inner : gst:: Pipeline ,
43- tx_sink : Option < Box < dyn TransmissionSink > > ,
44- tee : gst:: Element ,
45- preview_queue : gst:: Element ,
46- preview_appsink : gst:: Element ,
52+ tx_sink : Box < dyn TransmissionSink > ,
4753}
4854
4955#[ cfg( target_os = "android" ) ]
@@ -172,54 +178,64 @@ impl Pipeline {
172178 }
173179
174180 #[ cfg( not( target_os = "android" ) ) ]
175- pub fn new < E , S > ( preview_appsink : gst :: Element , mut on_event : E , on_sources : S ) -> Result < Self >
181+ pub fn new_rtsp < E > ( mut on_event : E , source : SourceConfig ) -> Result < Self >
176182 where
177183 E : FnMut ( Event ) + Send + Clone + ' static ,
178- S : Fn ( & [ gst:: glib:: Value ] ) -> Option < gst:: glib:: Value > + Send + Sync + ' static ,
179184 {
180- let scapsrc = gst:: ElementFactory :: make ( "scapsrc" )
181- . property ( "perform-internal-preroll" , true )
182- . build ( ) ?;
183- let tee = gst:: ElementFactory :: make ( "tee" ) . build ( ) ?;
184- let preview_queue = gst:: ElementFactory :: make ( "queue" )
185- . name ( "preview_queue" )
186- . property ( "max-size-time" , 0u64 )
187- . property ( "max-size-buffers" , 0u32 )
188- . property ( "max-size-bytes" , 0u32 )
189- . property_from_str ( "leaky" , "downstream" )
190- . property ( "silent" , true ) // Don't emit signals, can give better perf.
191- . build ( ) ?;
185+ use std:: str:: FromStr ;
192186
193- let pipeline = gst :: Pipeline :: new ( ) ;
187+ use crate :: sender :: transmission :: rtsp :: RtspSink ;
194188
195- let tx_sink = None :: < Box < dyn TransmissionSink > > ;
189+ fn setup_video_source ( pipeline : & gst:: Pipeline , src : gst:: Element ) -> Result < gst:: Element > {
190+ let videorate = gst:: ElementFactory :: make ( "videorate" )
191+ . property ( "skip-to-first" , true )
192+ . build ( ) ?;
193+ let capsfilter = gst:: ElementFactory :: make ( "capsfilter" )
194+ . name ( "video_capsfilter" )
195+ . property ( "caps" , gst:: Caps :: from_str ( "video/x-raw,framerate=25/1" ) ?)
196+ . build ( ) ?;
196197
197- // https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/3993
198- scapsrc. static_pad ( "src" ) . unwrap ( ) . add_probe (
199- gst:: PadProbeType :: QUERY_UPSTREAM . union ( gst:: PadProbeType :: PUSH ) ,
200- |_pad, info| match info. query_mut ( ) . map ( |query| query. view_mut ( ) ) {
201- Some ( gst:: QueryViewMut :: Latency ( latency) ) => {
202- let ( _live, min, max) = latency. result ( ) ;
203- latency. set ( false , min, max) ;
204- gst:: PadProbeReturn :: Handled
205- }
206- _ => gst:: PadProbeReturn :: Pass ,
207- } ,
208- ) ;
198+ pipeline. add_many ( [ & src, & videorate, & capsfilter] ) ?;
199+ gst:: Element :: link_many ( [ & src, & videorate, & capsfilter] ) ?;
200+
201+ Ok ( capsfilter)
202+ }
209203
210- scapsrc. connect ( "select-source" , false , on_sources) ;
204+ fn setup_audio_source ( pipeline : & gst:: Pipeline , src : gst:: Element ) -> Result < gst:: Element > {
205+ let capsfilter = gst:: ElementFactory :: make ( "capsfilter" )
206+ . name ( "audio_capsfilter" )
207+ . property (
208+ "caps" ,
209+ gst:: Caps :: from_str ( "audio/x-raw,channels=2,rate=48000" ) ?,
210+ )
211+ . build ( ) ?;
211212
212- pipeline. add_many ( [ & scapsrc, & tee, & preview_queue, & preview_appsink] ) ?;
213- gst:: Element :: link_many ( [ & scapsrc, & tee] ) ?;
214- gst:: Element :: link_many ( [ & preview_queue, & preview_appsink] ) ?;
213+ pipeline. add_many ( [ & src, & capsfilter] ) ?;
214+ gst:: Element :: link_many ( [ & src, & capsfilter] ) ?;
215215
216- let tee_preview_pad = tee
217- . request_pad_simple ( "src_%u" )
218- . map_or_else ( || Err ( anyhow:: anyhow!( "`request_pad_simple()` failed" ) ) , Ok ) ?;
219- let queue_preview_pad = preview_queue
220- . static_pad ( "sink" )
221- . ok_or ( anyhow:: anyhow!( "preview_queue is missing static sink pad" ) ) ?;
222- tee_preview_pad. link ( & queue_preview_pad) ?;
216+ Ok ( capsfilter)
217+ }
218+
219+ let pipeline = gst:: Pipeline :: new ( ) ;
220+
221+ let source = match source {
222+ SourceConfig :: AudioVideo { video, audio } => SourceConfig :: AudioVideo {
223+ video : setup_video_source ( & pipeline, video) ?,
224+ audio : setup_audio_source ( & pipeline, audio) ?,
225+ } ,
226+ SourceConfig :: Video ( video) => {
227+ SourceConfig :: Video ( setup_video_source ( & pipeline, video) ?)
228+ }
229+ SourceConfig :: Audio ( audio) => {
230+ SourceConfig :: Audio ( setup_audio_source ( & pipeline, audio) ?)
231+ }
232+ } ;
233+
234+ let rtsp = RtspSink :: new ( & pipeline, source, 3000 ) ?;
235+ let p = Self {
236+ inner : pipeline. clone ( ) ,
237+ tx_sink : Box :: new ( rtsp) ,
238+ } ;
223239
224240 // Start the pipeline in background thread because `scapsrc` initialization will block until
225241 // the user selects the input source.
@@ -236,9 +252,11 @@ impl Pipeline {
236252 debug ! ( "Failed to upgrade pipeline before starting" ) ;
237253 return ;
238254 } ;
239- debug ! ( "Starting pipeline" ) ;
255+ debug ! ( "Starting pipeline... " ) ;
240256 if let Err ( err) = pipeline. set_state ( gst:: State :: Playing ) {
241257 error ! ( "Failed to start pipeline: {err}" ) ;
258+ } else {
259+ debug ! ( "Pipeline started" ) ;
242260 }
243261 }
244262
@@ -278,20 +296,11 @@ impl Pipeline {
278296 }
279297 } ) ;
280298
281- Ok ( Self {
282- inner : pipeline,
283- tx_sink,
284- tee,
285- preview_queue,
286- preview_appsink,
287- } )
299+ Ok ( p)
288300 }
289301
290302 pub fn playing ( & mut self ) -> Result < ( ) > {
291- match & mut self . tx_sink {
292- Some ( sink) => sink. playing ( ) ,
293- None => Ok ( ( ) ) ,
294- }
303+ self . tx_sink . playing ( )
295304 }
296305
297306 #[ cfg( target_os = "android" ) ]
@@ -308,27 +317,7 @@ impl Pipeline {
308317 #[ cfg( not( target_os = "android" ) ) ]
309318 pub fn shutdown ( & mut self ) -> Result < ( ) > {
310319 self . inner . set_state ( gst:: State :: Null ) ?;
311-
312- self . preview_queue . unlink ( & self . preview_appsink ) ;
313- self . inner . remove ( & self . preview_appsink ) ?;
314-
315- if let Some ( sink) = & mut self . tx_sink {
316- sink. shutdown ( ) ;
317- }
318-
319- Ok ( ( ) )
320- }
321-
322- #[ cfg( not( target_os = "android" ) ) ]
323- pub fn add_hls_sink ( & mut self , port : u16 ) -> Result < ( ) > {
324- let tee_pad = self
325- . tee
326- . request_pad_simple ( "src_%u" )
327- . ok_or ( anyhow:: anyhow!( "`request_pad_simple()` failed" ) ) ?;
328- let hls = HlsSink :: new ( & self . inner , tee_pad, port) ?;
329- self . tx_sink = Some ( Box :: new ( hls) ) ;
330-
331- debug ! ( "Added HLS sink" ) ;
320+ self . tx_sink . shutdown ( ) ;
332321
333322 Ok ( ( ) )
334323 }
@@ -348,20 +337,6 @@ impl Pipeline {
348337 Ok ( ( ) )
349338 }
350339
351- #[ cfg( not( target_os = "android" ) ) ]
352- pub fn add_rtsp_sink ( & mut self , port : u16 ) -> Result < ( ) > {
353- let tee_pad = self
354- . tee
355- . request_pad_simple ( "src_%u" )
356- . ok_or ( anyhow:: anyhow!( "`request_pad_simple()` failed" ) ) ?;
357- let rtsp = transmission:: rtsp:: RtspSink :: new ( tee_pad, & self . inner , port) ?;
358- self . tx_sink = Some ( Box :: new ( rtsp) ) ;
359-
360- debug ! ( "Added RTSP sink" ) ;
361-
362- Ok ( ( ) )
363- }
364-
365340 #[ cfg( target_os = "android" ) ]
366341 pub fn add_rtp_sink ( & mut self , port : u16 , receiver_addr : IpAddr ) -> Result < ( ) > {
367342 let appsrc_pad = self
@@ -376,26 +351,9 @@ impl Pipeline {
376351 Ok ( ( ) )
377352 }
378353
379- pub fn remove_transmission_sink ( & mut self ) -> Result < ( ) > {
380- if let Some ( sink) = & mut self . tx_sink {
381- sink. shutdown ( ) ;
382- sink. unlink ( & self . inner ) ?;
383- }
384-
385- self . tx_sink = None ;
386-
387- debug ! ( "Removed transmission sink" ) ;
388-
389- Ok ( ( ) )
390- }
391-
392354 /// Get the message that should be sent to a receiver to consume the stream if a transmission
393355 /// sink is present
394356 pub fn get_play_msg ( & self , addr : IpAddr ) -> Option < PlayMessage > {
395- if let Some ( sink) = & self . tx_sink {
396- sink. get_play_msg ( addr)
397- } else {
398- None
399- }
357+ self . tx_sink . get_play_msg ( addr)
400358 }
401359}
0 commit comments