@@ -82,7 +82,7 @@ defmodule Membrane.HLS.Source do
8282
8383 @ impl true
8484 def handle_init ( _ctx , opts ) do
85- initial_pad_state = % { requested: 0 , qex: Qex . new ( ) , oldest_buffer_dts: nil }
85+ initial_pad_state = % { requested: 0 , qex: Qex . new ( ) , qex_size: 0 , oldest_buffer_dts: nil }
8686
8787 state =
8888 Map . from_struct ( opts )
@@ -118,6 +118,7 @@ defmodule Membrane.HLS.Source do
118118 stream_format: { :video_output , video_stream_format }
119119 ]
120120
121+ state = request_samples ( state )
121122 { actions , state }
122123 end
123124
@@ -138,49 +139,50 @@ defmodule Membrane.HLS.Source do
138139 end
139140
140141 @ impl true
141- def handle_info ( { data_type , sample } , _ctx , state )
142+ def handle_info ( { data_type , % ExHLS.Sample { } = sample } , _ctx , state )
142143 when data_type in [ :audio_sample_ , :video_sample ] do
143144 pad_ref =
144145 case data_type do
145146 :audio_sample -> :audio_output
146147 :video_sample -> :video_output
147148 end
148149
150+ buffer = % Buffer {
151+ payload: sample . payload ,
152+ pts: sample . pts_ms |> Membrane.Time . milliseconds ( ) ,
153+ dts: sample . dts_ms |> Membrane.Time . milliseconds ( ) ,
154+ metadata: sample . metadata
155+ }
156+
149157 state =
150158 state
151- |> update_in ( [ pad_ref , :qex ] , & Qex . push ( & 1 , sample ) )
159+ |> update_in ( [ pad_ref , :qex ] , & Qex . push ( & 1 , buffer ) )
152160 |> update_in ( [ pad_ref , :qex_size ] , & ( & 1 + 1 ) )
153161 |> update_in ( [ pad_ref , :requested ] , & ( & 1 - 1 ) )
162+ |> update_in ( [ pad_ref , :oldest_buffer_dts ] , fn
163+ nil -> buffer . dts
164+ oldest_dts -> oldest_dts
165+ end )
154166 |> request_samples ( )
155167
156168 { [ redemand: pad_ref ] , state }
157169 end
158170
159171 defp pop_buffers ( pad_ref , demand , state ) do
160- range_upperbound = min ( state [ pad_ref ] . qex_size , demand )
161-
162- if range_upperbound > 0 do
163- 1 .. range_upperbound
164- |> Enum . map_reduce ( state , fn _i , state ->
165- { % ExHLS.Sample { } = sample , qex } = state [ pad_ref ] . qex |> Qex . pop! ( )
166-
167- buffer = % Buffer {
168- payload: sample . payload ,
169- pts: sample . pts_ms |> Membrane.Time . milliseconds ( ) ,
170- dts: sample . dts_ms |> Membrane.Time . milliseconds ( ) ,
171- metadata: sample . metadata
172- }
173-
174- state =
175- state
176- |> put_in ( [ pad_ref , :qex ] , qex )
177- |> update_in ( [ pad_ref , :qex_size ] , & ( & 1 - 1 ) )
178-
179- { buffer , state }
180- end )
181- else
182- { [ ] , state }
183- end
172+ how_many_pop = min ( state [ pad_ref ] . qex_size , demand )
173+
174+ 1 .. how_many_pop // 1
175+ |> Enum . map_reduce ( state , fn _i , state ->
176+ { % Buffer { } = buffer , qex } = state [ pad_ref ] . qex |> Qex . pop! ( )
177+
178+ state =
179+ state
180+ |> put_in ( [ pad_ref , :oldest_buffer_dts ] , buffer . dts )
181+ |> put_in ( [ pad_ref , :qex ] , qex )
182+ |> update_in ( [ pad_ref , :qex_size ] , & ( & 1 - 1 ) )
183+
184+ { buffer , state }
185+ end )
184186 end
185187
186188 defp request_samples ( state ) do
@@ -196,7 +198,7 @@ defmodule Membrane.HLS.Source do
196198 0
197199
198200 _empty_or_not_new_enough ->
199- @ requested_samples_boundary - state [ pad_ref ] . qex_size - state [ pad_ref ] . requested
201+ @ requested_samples_boundary - state [ pad_ref ] . requested
200202 end
201203
202204 1 .. request_size // 1
0 commit comments