@@ -32,6 +32,8 @@ defmodule Membrane.HLS.Source do
3232 flow_control: :manual ,
3333 demand_unit: :buffers
3434
35+ # The boundary on how many samples of one stream will be requested
36+ # from Membrane.HLS.Source.ClientGenServer at once.
3537 @ requested_samples_boundary 5
3638
3739 @ variant_selection_policy_description """
@@ -68,6 +70,9 @@ defmodule Membrane.HLS.Source do
6870 Amount of time of stream, that will be buffered by #{ inspect ( __MODULE__ ) } .
6971
7072 Defaults to 1 second.
73+
74+ Due to implementation details, the amount of the buffered stream might
75+ be slightly different than specyfied value.
7176 """
7277 ] ,
7378 variant_selection_policy: [
@@ -82,7 +87,13 @@ defmodule Membrane.HLS.Source do
8287
8388 @ impl true
8489 def handle_init ( _ctx , opts ) do
85- initial_pad_state = % { requested: 0 , qex: Qex . new ( ) , qex_size: 0 , oldest_buffer_dts: nil }
90+ initial_pad_state = % {
91+ requested: 0 ,
92+ qex: Qex . new ( ) ,
93+ qex_size: 0 ,
94+ oldest_buffer_dts: nil ,
95+ eos_received?: false
96+ }
8697
8798 state =
8899 Map . from_struct ( opts )
@@ -101,7 +112,7 @@ defmodule Membrane.HLS.Source do
101112 ClientGenServer . start_link ( state . url , state . variant_selection_policy )
102113
103114 # todo: maybe we should call here `get_tracks_info/1` to start downloading segments
104- # or we should start buffering the frames?
115+ # or we should start buffering frames?
105116
106117 { [ ] , % { state | client_genserver: client_genserver } }
107118 end
@@ -133,19 +144,14 @@ defmodule Membrane.HLS.Source do
133144
134145 @ impl true
135146 def handle_demand ( pad_ref , demand , :buffers , _ctx , state ) do
136- { buffers , state } = pop_buffers ( pad_ref , demand , state )
147+ { actions , state } = pop_buffers ( pad_ref , demand , state )
137148 state = request_samples ( state )
138- { [ buffer: { pad_ref , buffers } ] , state }
149+ { actions , state }
139150 end
140151
141152 @ impl true
142- def handle_info ( { data_type , % ExHLS.Sample { } = sample } , _ctx , state )
143- when data_type in [ :audio_sample_ , :video_sample ] do
144- pad_ref =
145- case data_type do
146- :audio_sample -> :audio_output
147- :video_sample -> :video_output
148- end
153+ def handle_info ( { data_type , % ExHLS.Sample { } = sample } , _ctx , state ) do
154+ pad_ref = data_type_to_pad_ref ( data_type )
149155
150156 buffer = % Buffer {
151157 payload: sample . payload ,
@@ -168,30 +174,62 @@ defmodule Membrane.HLS.Source do
168174 { [ redemand: pad_ref ] , state }
169175 end
170176
177+ @ impl true
178+ def handle_info ( { data_type , :end_of_stream } , _ctx , state ) do
179+ pad_ref = data_type_to_pad_ref ( data_type )
180+
181+ state =
182+ if state [ pad_ref ] . eos_received? do
183+ state
184+ else
185+ state
186+ |> put_in ( [ pad_ref , :eos_received? ] , true )
187+ |> update_in ( [ pad_ref , :qex ] , & Qex . push ( & 1 , :end_of_stream ) )
188+ |> update_in ( [ pad_ref , :qex_size ] , & ( & 1 + 1 ) )
189+ end
190+
191+ state = state |> update_in ( [ pad_ref , :requested ] , & ( & 1 - 1 ) )
192+
193+ { [ redemand: pad_ref ] , state }
194+ end
195+
196+ defp data_type_to_pad_ref ( :audio_sample ) , do: :audio_output
197+ defp data_type_to_pad_ref ( :video_sample ) , do: :video_output
198+
171199 defp pop_buffers ( pad_ref , demand , state ) do
172200 how_many_pop = min ( state [ pad_ref ] . qex_size , demand )
173201
174202 1 .. how_many_pop // 1
175- |> Enum . map_reduce ( state , fn _i , state ->
176- { % Buffer { } = buffer , qex } = state [ pad_ref ] . qex |> Qex . pop! ( )
203+ |> Enum . flat_map_reduce ( state , fn _i , state ->
204+ { buffer_or_eos , qex } = state [ pad_ref ] . qex |> Qex . pop! ( )
177205
178206 state =
179207 state
180- |> put_in ( [ pad_ref , :oldest_buffer_dts ] , buffer . dts )
181208 |> put_in ( [ pad_ref , :qex ] , qex )
182209 |> update_in ( [ pad_ref , :qex_size ] , & ( & 1 - 1 ) )
183210
184- { buffer , state }
211+ case buffer_or_eos do
212+ % Buffer { } = buffer ->
213+ state = state |> put_in ( [ pad_ref , :oldest_buffer_dts ] , buffer . dts )
214+ { [ buffer: { pad_ref , buffer } ] , state }
215+
216+ :end_of_stream ->
217+ { [ end_of_stream: pad_ref ] , state }
218+ end
185219 end )
186220 end
187221
188222 defp request_samples ( state ) do
189223 [ :audio_output , :video_output ]
190224 |> Enum . reduce ( state , fn pad_ref , state ->
191225 oldest_dts = state [ pad_ref ] . oldest_buffer_dts
226+ eos_received? = state [ pad_ref ] . eos_received?
192227
193228 request_size =
194229 case state [ pad_ref ] . qex |> Qex . first ( ) do
230+ _any when eos_received? ->
231+ 0
232+
195233 # todo: maybe we should handle rollovers
196234 { :value , % Buffer { dts: newest_dts } }
197235 when newest_dts - oldest_dts >= state . buffered_stream_time ->
0 commit comments