Skip to content

Commit ac2dee4

Browse files
committed
Handle single media streams
1 parent dbe9bf3 commit ac2dee4

File tree

3 files changed

+83
-39
lines changed

3 files changed

+83
-39
lines changed

lib/membrane_http_adaptive_stream/hls_source.ex

Lines changed: 81 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,16 @@ defmodule Membrane.HLS.Source do
2525
def_output_pad :video_output,
2626
accepted_format: any_of(H264, %RemoteStream{content_format: H264}),
2727
flow_control: :manual,
28-
demand_unit: :buffers
28+
demand_unit: :buffers,
29+
availability: :on_request,
30+
max_instances: 1
2931

3032
def_output_pad :audio_output,
3133
accepted_format: any_of(AAC, %RemoteStream{content_format: AAC}),
3234
flow_control: :manual,
33-
demand_unit: :buffers
35+
demand_unit: :buffers,
36+
availability: :on_request,
37+
max_instances: 1
3438

3539
# The boundary on how many chunks of one stream will be requested
3640
# from Membrane.HLS.Source.ClientGenServer at once.
@@ -88,6 +92,7 @@ defmodule Membrane.HLS.Source do
8892
@impl true
8993
def handle_init(_ctx, opts) do
9094
initial_pad_state = %{
95+
ref: nil,
9196
requested: 0,
9297
qex: Qex.new(),
9398
qex_size: 0,
@@ -106,6 +111,12 @@ defmodule Membrane.HLS.Source do
106111
{[], state}
107112
end
108113

114+
@impl true
115+
def handle_pad_added(Pad.ref(pad_name, _id) = pad_ref, _ctx, state) do
116+
state = state |> put_in([pad_name, :ref], pad_ref)
117+
{[], state}
118+
end
119+
109120
@impl true
110121
def handle_setup(_ctx, state) do
111122
{:ok, client_genserver} =
@@ -119,20 +130,53 @@ defmodule Membrane.HLS.Source do
119130

120131
@impl true
121132
def handle_playing(_ctx, state) do
122-
{[audio_stream_format], [video_stream_format]} =
133+
stream_formats =
123134
ClientGenServer.get_tracks_info(state.client_genserver)
124135
|> Map.values()
125-
|> Enum.split_with(&audio_stream_format?/1)
126136

127-
actions = [
128-
stream_format: {:audio_output, audio_stream_format},
129-
stream_format: {:video_output, video_stream_format}
130-
]
137+
:ok = ensure_pads_match_stream_formats!(stream_formats, state)
138+
139+
actions =
140+
stream_formats
141+
|> Enum.map(fn stream_format ->
142+
pad_ref =
143+
if audio_stream_format?(stream_format),
144+
do: state.audio_output.ref,
145+
else: state.video_output.ref
146+
147+
{:stream_format, {pad_ref, stream_format}}
148+
end)
131149

132150
state = request_media_chunks(state)
133151
{actions, state}
134152
end
135153

154+
defp ensure_pads_match_stream_formats!(stream_formats, state) do
155+
audio_format_occurs? =
156+
Enum.any?(stream_formats, &audio_stream_format?/1)
157+
158+
video_format_occurs? =
159+
Enum.any?(stream_formats, &(not audio_stream_format?(&1)))
160+
161+
if audio_format_occurs? and state.audio_output.ref == nil do
162+
raise "Audio pad should be linked"
163+
end
164+
165+
if video_format_occurs? and state.video_output.ref == nil do
166+
raise "Video pad should be linked"
167+
end
168+
169+
if not audio_format_occurs? and state.audio_output.ref != nil do
170+
raise "Audio pad should not be linked"
171+
end
172+
173+
if not video_format_occurs? and state.video_output.ref != nil do
174+
raise "Video pad should not be linked"
175+
end
176+
177+
:ok
178+
end
179+
136180
defp audio_stream_format?(stream_format) do
137181
case stream_format do
138182
%RemoteStream{content_format: AAC} -> true
@@ -151,7 +195,7 @@ defmodule Membrane.HLS.Source do
151195

152196
@impl true
153197
def handle_info({data_type, %ExHLS.Chunk{} = chunk}, _ctx, state) do
154-
pad_ref = data_type_to_pad_ref(data_type)
198+
pad_name = data_type_to_pad_name(data_type)
155199

156200
buffer = %Buffer{
157201
payload: chunk.payload,
@@ -162,55 +206,55 @@ defmodule Membrane.HLS.Source do
162206

163207
state =
164208
state
165-
|> update_in([pad_ref, :qex], &Qex.push(&1, buffer))
166-
|> update_in([pad_ref, :qex_size], &(&1 + 1))
167-
|> update_in([pad_ref, :requested], &(&1 - 1))
168-
|> update_in([pad_ref, :oldest_buffer_dts], fn
209+
|> update_in([pad_name, :qex], &Qex.push(&1, buffer))
210+
|> update_in([pad_name, :qex_size], &(&1 + 1))
211+
|> update_in([pad_name, :requested], &(&1 - 1))
212+
|> update_in([pad_name, :oldest_buffer_dts], fn
169213
nil -> buffer.dts
170214
oldest_dts -> oldest_dts
171215
end)
172216
|> request_media_chunks()
173217

174-
{[redemand: pad_ref], state}
218+
{[redemand: state[pad_name].ref], state}
175219
end
176220

177221
@impl true
178222
def handle_info({data_type, :end_of_stream}, _ctx, state) do
179-
pad_ref = data_type_to_pad_ref(data_type)
223+
pad_name = data_type_to_pad_name(data_type)
180224

181225
state =
182-
if state[pad_ref].eos_received? do
226+
if state[pad_name].eos_received? do
183227
state
184228
else
185229
state
186-
|> put_in([pad_ref, :eos_received?], true)
187-
|> update_in([pad_ref, :qex], &Qex.push(&1, :end_of_stream))
188-
|> update_in([pad_ref, :qex_size], &(&1 + 1))
230+
|> put_in([pad_name, :eos_received?], true)
231+
|> update_in([pad_name, :qex], &Qex.push(&1, :end_of_stream))
232+
|> update_in([pad_name, :qex_size], &(&1 + 1))
189233
end
190234

191-
state = state |> update_in([pad_ref, :requested], &(&1 - 1))
235+
state = state |> update_in([pad_name, :requested], &(&1 - 1))
192236

193-
{[redemand: pad_ref], state}
237+
{[redemand: state[pad_name].ref], state}
194238
end
195239

196-
defp data_type_to_pad_ref(:audio_chunk), do: :audio_output
197-
defp data_type_to_pad_ref(:video_chunk), do: :video_output
240+
defp data_type_to_pad_name(:audio_chunk), do: :audio_output
241+
defp data_type_to_pad_name(:video_chunk), do: :video_output
198242

199-
defp pop_buffers(pad_ref, demand, state) do
200-
how_many_pop = min(state[pad_ref].qex_size, demand)
243+
defp pop_buffers(Pad.ref(pad_name, _id) = pad_ref, demand, state) do
244+
how_many_pop = min(state[pad_name].qex_size, demand)
201245

202246
1..how_many_pop//1
203247
|> Enum.flat_map_reduce(state, fn _i, state ->
204-
{buffer_or_eos, qex} = state[pad_ref].qex |> Qex.pop!()
248+
{buffer_or_eos, qex} = state[pad_name].qex |> Qex.pop!()
205249

206250
state =
207251
state
208-
|> put_in([pad_ref, :qex], qex)
209-
|> update_in([pad_ref, :qex_size], &(&1 - 1))
252+
|> put_in([pad_name, :qex], qex)
253+
|> update_in([pad_name, :qex_size], &(&1 - 1))
210254

211255
case buffer_or_eos do
212256
%Buffer{} = buffer ->
213-
state = state |> put_in([pad_ref, :oldest_buffer_dts], buffer.dts)
257+
state = state |> put_in([pad_name, :oldest_buffer_dts], buffer.dts)
214258
{[buffer: {pad_ref, buffer}], state}
215259

216260
:end_of_stream ->
@@ -221,13 +265,13 @@ defmodule Membrane.HLS.Source do
221265

222266
defp request_media_chunks(state) do
223267
[:audio_output, :video_output]
224-
|> Enum.reduce(state, fn pad_ref, state ->
225-
oldest_dts = state[pad_ref].oldest_buffer_dts
226-
eos_received? = state[pad_ref].eos_received?
268+
|> Enum.reduce(state, fn pad_name, state ->
269+
%{eos_received?: eos_received?, oldest_buffer_dts: oldest_dts, ref: pad_ref} =
270+
state[pad_name]
227271

228272
request_size =
229-
case state[pad_ref].qex |> Qex.first() do
230-
_any when eos_received? ->
273+
case state[pad_name].qex |> Qex.first() do
274+
_any when pad_ref == nil or eos_received? ->
231275
0
232276

233277
# todo: maybe we should handle rollovers
@@ -236,14 +280,14 @@ defmodule Membrane.HLS.Source do
236280
0
237281

238282
_empty_or_not_new_enough ->
239-
@requested_chunks_boundary - state[pad_ref].requested
283+
@requested_chunks_boundary - state[pad_name].requested
240284
end
241285

242286
1..request_size//1
243-
|> Enum.each(fn _i -> request_single_chunk(pad_ref, state) end)
287+
|> Enum.each(fn _i -> request_single_chunk(pad_name, state) end)
244288

245289
state
246-
|> update_in([pad_ref, :requested], &(&1 + request_size))
290+
|> update_in([pad_name, :requested], &(&1 + request_size))
247291
end)
248292
end
249293

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
6969
{:membrane_aac_plugin, "~> 0.19.0"},
7070
{:membrane_h26x_plugin, "~> 0.10.0"},
7171
{:ex_hls,
72-
github: "membraneframework-labs/ex_hls", ref: "bf78d233b2f0a8409c8453df8e6dcb61a4e32c29"},
72+
github: "membraneframework-labs/ex_hls", ref: "981f02f5093979c3c1ef71d596ac53e204e2ef7d"},
7373
{:bunch, "~> 1.6"},
7474
{:qex, "~> 0.5"},
7575
{:membrane_hackney_plugin, "~> 0.11.0", only: :test},

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
1313
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
1414
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
15-
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "bf78d233b2f0a8409c8453df8e6dcb61a4e32c29", [ref: "bf78d233b2f0a8409c8453df8e6dcb61a4e32c29"]},
15+
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "981f02f5093979c3c1ef71d596ac53e204e2ef7d", [ref: "981f02f5093979c3c1ef71d596ac53e204e2ef7d"]},
1616
"ex_m3u8": {:hex, :ex_m3u8, "0.15.2", "7b13d5c719fa6bd58e1fd54d3d17b27e4c2a21fa10ad74c39e7ec36be165d58a", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "160d31f132f15856fcd4a4c40e448784f3105e5fcd05ac39de6a4ccc8aa4a697"},
1717
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
1818
"finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"},

0 commit comments

Comments
 (0)