Skip to content

Commit 46d3223

Browse files
authored
Merge pull request #113 from membraneframework/ex-hls-source
ExHLS Source
2 parents f4e3ff1 + dbe9bf3 commit 46d3223

File tree

10 files changed

+534
-22
lines changed

10 files changed

+534
-22
lines changed
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
defmodule Membrane.HLS.Source do
2+
@moduledoc """
3+
A Membrane Source element that fetches and demuxes HLS streams.
4+
5+
It uses the ExHLS library to handle the HLS protocol.
6+
7+
It is recommended to plug `Membrane.H264.Parser` and `Membrane.AAC.Parser`
8+
after this element to parse the video and audio streams respectively,
9+
because the stream formats returned by this element can differ depending
10+
on the type of the HLS stream (MPEG-TS or fMP4).
11+
"""
12+
13+
use Membrane.Source
14+
require Membrane.Pad, as: Pad
15+
16+
alias __MODULE__.ClientGenServer
17+
18+
alias Membrane.{
19+
AAC,
20+
Buffer,
21+
H264,
22+
RemoteStream
23+
}
24+
25+
def_output_pad :video_output,
26+
accepted_format: any_of(H264, %RemoteStream{content_format: H264}),
27+
flow_control: :manual,
28+
demand_unit: :buffers
29+
30+
def_output_pad :audio_output,
31+
accepted_format: any_of(AAC, %RemoteStream{content_format: AAC}),
32+
flow_control: :manual,
33+
demand_unit: :buffers
34+
35+
# The boundary on how many chunks of one stream will be requested
36+
# from Membrane.HLS.Source.ClientGenServer at once.
37+
@requested_chunks_boundary 5
38+
39+
@variant_selection_policy_description """
40+
The policy used to select a variant from the list of available variants.
41+
42+
The policy can be one of the predefined ones or a custom function that takes a map of
43+
variant IDs to their descriptions and returns the ID of the selected variant.
44+
45+
The predefined policies are:
46+
- `:lowest_resolution` - selects the variant with the lowest value of video width * height.
47+
- `:highest_resolution` - selects the variant with the highest value of video width * height.
48+
- `:lowest_bandwidth` - selects the variant with the lowest bandwidth.
49+
- `:highest_bandwidth` - selects the variant with the highest bandwidth.
50+
"""
51+
52+
@typedoc @variant_selection_policy_description
53+
@type variant_selection_policy() ::
54+
:lowest_resolution
55+
| :highest_resolution
56+
| :lowest_bandwidth
57+
| :highest_bandwidth
58+
| (variants_map :: %{integer() => ExHLS.Client.variant_description()} ->
59+
variant_id :: integer())
60+
61+
def_options url: [
62+
spec: String.t(),
63+
description: "URL of the HLS playlist manifest"
64+
],
65+
buffered_stream_time: [
66+
spec: Membrane.Time.t(),
67+
default: Membrane.Time.seconds(5),
68+
inspector: &Membrane.Time.inspect/1,
69+
description: """
70+
Amount of time of stream, that will be buffered by #{inspect(__MODULE__)}.
71+
72+
Defaults to 5 seconds.
73+
74+
Due to implementation details, the amount of the buffered stream might
75+
be slightly different than specified value.
76+
"""
77+
],
78+
variant_selection_policy: [
79+
spec: variant_selection_policy(),
80+
default: :highest_resolution,
81+
description: """
82+
#{@variant_selection_policy_description}
83+
84+
Defaults to `:highest_resolution`.
85+
"""
86+
]
87+
88+
@impl true
89+
def handle_init(_ctx, opts) do
90+
initial_pad_state = %{
91+
requested: 0,
92+
qex: Qex.new(),
93+
qex_size: 0,
94+
oldest_buffer_dts: nil,
95+
eos_received?: false
96+
}
97+
98+
state =
99+
Map.from_struct(opts)
100+
|> Map.merge(%{
101+
audio_output: initial_pad_state,
102+
video_output: initial_pad_state,
103+
client_genserver: nil
104+
})
105+
106+
{[], state}
107+
end
108+
109+
@impl true
110+
def handle_setup(_ctx, state) do
111+
{:ok, client_genserver} =
112+
ClientGenServer.start_link(state.url, state.variant_selection_policy)
113+
114+
# todo: maybe we should call here `get_tracks_info/1` to start downloading segments
115+
# or we should start buffering frames?
116+
117+
{[], %{state | client_genserver: client_genserver}}
118+
end
119+
120+
@impl true
121+
def handle_playing(_ctx, state) do
122+
{[audio_stream_format], [video_stream_format]} =
123+
ClientGenServer.get_tracks_info(state.client_genserver)
124+
|> Map.values()
125+
|> Enum.split_with(&audio_stream_format?/1)
126+
127+
actions = [
128+
stream_format: {:audio_output, audio_stream_format},
129+
stream_format: {:video_output, video_stream_format}
130+
]
131+
132+
state = request_media_chunks(state)
133+
{actions, state}
134+
end
135+
136+
defp audio_stream_format?(stream_format) do
137+
case stream_format do
138+
%RemoteStream{content_format: AAC} -> true
139+
%RemoteStream{content_format: H264} -> false
140+
%AAC{} -> true
141+
%H264{} -> false
142+
end
143+
end
144+
145+
@impl true
146+
def handle_demand(pad_ref, demand, :buffers, _ctx, state) do
147+
{actions, state} = pop_buffers(pad_ref, demand, state)
148+
state = request_media_chunks(state)
149+
{actions, state}
150+
end
151+
152+
@impl true
153+
def handle_info({data_type, %ExHLS.Chunk{} = chunk}, _ctx, state) do
154+
pad_ref = data_type_to_pad_ref(data_type)
155+
156+
buffer = %Buffer{
157+
payload: chunk.payload,
158+
pts: chunk.pts_ms |> Membrane.Time.milliseconds(),
159+
dts: chunk.dts_ms |> Membrane.Time.milliseconds(),
160+
metadata: chunk.metadata
161+
}
162+
163+
state =
164+
state
165+
|> update_in([pad_ref, :qex], &Qex.push(&1, buffer))
166+
|> update_in([pad_ref, :qex_size], &(&1 + 1))
167+
|> update_in([pad_ref, :requested], &(&1 - 1))
168+
|> update_in([pad_ref, :oldest_buffer_dts], fn
169+
nil -> buffer.dts
170+
oldest_dts -> oldest_dts
171+
end)
172+
|> request_media_chunks()
173+
174+
{[redemand: pad_ref], state}
175+
end
176+
177+
@impl true
178+
def handle_info({data_type, :end_of_stream}, _ctx, state) do
179+
pad_ref = data_type_to_pad_ref(data_type)
180+
181+
state =
182+
if state[pad_ref].eos_received? do
183+
state
184+
else
185+
state
186+
|> put_in([pad_ref, :eos_received?], true)
187+
|> update_in([pad_ref, :qex], &Qex.push(&1, :end_of_stream))
188+
|> update_in([pad_ref, :qex_size], &(&1 + 1))
189+
end
190+
191+
state = state |> update_in([pad_ref, :requested], &(&1 - 1))
192+
193+
{[redemand: pad_ref], state}
194+
end
195+
196+
defp data_type_to_pad_ref(:audio_chunk), do: :audio_output
197+
defp data_type_to_pad_ref(:video_chunk), do: :video_output
198+
199+
defp pop_buffers(pad_ref, demand, state) do
200+
how_many_pop = min(state[pad_ref].qex_size, demand)
201+
202+
1..how_many_pop//1
203+
|> Enum.flat_map_reduce(state, fn _i, state ->
204+
{buffer_or_eos, qex} = state[pad_ref].qex |> Qex.pop!()
205+
206+
state =
207+
state
208+
|> put_in([pad_ref, :qex], qex)
209+
|> update_in([pad_ref, :qex_size], &(&1 - 1))
210+
211+
case buffer_or_eos do
212+
%Buffer{} = buffer ->
213+
state = state |> put_in([pad_ref, :oldest_buffer_dts], buffer.dts)
214+
{[buffer: {pad_ref, buffer}], state}
215+
216+
:end_of_stream ->
217+
{[end_of_stream: pad_ref], state}
218+
end
219+
end)
220+
end
221+
222+
defp request_media_chunks(state) do
223+
[:audio_output, :video_output]
224+
|> Enum.reduce(state, fn pad_ref, state ->
225+
oldest_dts = state[pad_ref].oldest_buffer_dts
226+
eos_received? = state[pad_ref].eos_received?
227+
228+
request_size =
229+
case state[pad_ref].qex |> Qex.first() do
230+
_any when eos_received? ->
231+
0
232+
233+
# todo: maybe we should handle rollovers
234+
{:value, %Buffer{dts: newest_dts}}
235+
when newest_dts - oldest_dts >= state.buffered_stream_time ->
236+
0
237+
238+
_empty_or_not_new_enough ->
239+
@requested_chunks_boundary - state[pad_ref].requested
240+
end
241+
242+
1..request_size//1
243+
|> Enum.each(fn _i -> request_single_chunk(pad_ref, state) end)
244+
245+
state
246+
|> update_in([pad_ref, :requested], &(&1 + request_size))
247+
end)
248+
end
249+
250+
defp request_single_chunk(:audio_output, state),
251+
do: ClientGenServer.request_audio_chunk(state.client_genserver)
252+
253+
defp request_single_chunk(:video_output, state),
254+
do: ClientGenServer.request_video_chunk(state.client_genserver)
255+
end
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
defmodule Membrane.HLS.Source.ClientGenServer do
2+
# This GenServer is used by Membrane.HLS.Source to allow downloading the
3+
# HLS segments asynchronously.
4+
@moduledoc false
5+
6+
use GenServer
7+
alias ExHLS.Client
8+
9+
@spec start_link(
10+
String.t(),
11+
Membrane.HLS.Source.variant_selection_policy()
12+
) ::
13+
GenServer.on_start()
14+
def start_link(url, variant_selection_policy) do
15+
GenServer.start_link(__MODULE__,
16+
url: url,
17+
variant_selection_policy: variant_selection_policy
18+
)
19+
end
20+
21+
@spec request_audio_chunk(pid()) :: :ok
22+
def request_audio_chunk(client_genserver) do
23+
GenServer.cast(client_genserver, {:request_audio_chunk, self()})
24+
end
25+
26+
@spec request_video_chunk(pid()) :: :ok
27+
def request_video_chunk(client_genserver) do
28+
GenServer.cast(client_genserver, {:request_video_chunk, self()})
29+
end
30+
31+
# this function should be called by Membrane.HLS.Source
32+
# before we start buffering the chunks, to avoid waiting
33+
# on downloading many segments
34+
@spec get_tracks_info(pid()) :: map()
35+
def get_tracks_info(client_genserver) do
36+
GenServer.call(client_genserver, :get_tracks_info)
37+
end
38+
39+
@impl true
40+
def init(url: url, variant_selection_policy: variant_selection_policy) do
41+
state = %{
42+
url: url,
43+
variant_selection_policy: variant_selection_policy,
44+
client: nil
45+
}
46+
47+
{:ok, state, {:continue, :setup}}
48+
end
49+
50+
@impl true
51+
def handle_continue(:setup, state) do
52+
state =
53+
%{state | client: Client.new(state.url)}
54+
|> choose_variant()
55+
56+
{:noreply, state}
57+
end
58+
59+
defp choose_variant(state) do
60+
variants = Client.get_variants(state.client)
61+
62+
if variants != %{} do
63+
get_resolution_fn = fn {_id, %{resolution: {width, height}}} -> width * height end
64+
get_bandwidth_fn = fn {_id, %{bandwidth: bandwidth}} -> bandwidth end
65+
66+
chosen_variant_id =
67+
case state.variant_selection_policy do
68+
:lowest_resolution ->
69+
variants |> Enum.min_by(get_resolution_fn) |> elem(0)
70+
71+
:highest_resolution ->
72+
variants |> Enum.max_by(get_resolution_fn) |> elem(0)
73+
74+
:lowest_bandwidth ->
75+
variants |> Enum.min_by(get_bandwidth_fn) |> elem(0)
76+
77+
:highest_bandwidth ->
78+
variants |> Enum.max_by(get_bandwidth_fn) |> elem(0)
79+
80+
custom_policy when is_function(custom_policy, 1) ->
81+
variants |> custom_policy.()
82+
end
83+
84+
client = state.client |> Client.choose_variant(chosen_variant_id)
85+
%{state | client: client}
86+
else
87+
state
88+
end
89+
end
90+
91+
@impl true
92+
def handle_cast({:request_audio_chunk, pid}, state) do
93+
{chunk, client} = Client.read_audio_chunk(state.client)
94+
send(pid, {:audio_chunk, chunk})
95+
{:noreply, %{state | client: client}}
96+
end
97+
98+
@impl true
99+
def handle_cast({:request_video_chunk, pid}, state) do
100+
{chunk, client} = Client.read_video_chunk(state.client)
101+
send(pid, {:video_chunk, chunk})
102+
{:noreply, %{state | client: client}}
103+
end
104+
105+
@impl true
106+
def handle_call(:get_tracks_info, _from, state) do
107+
{:ok, tracks_info, client} = Client.get_tracks_info(state.client)
108+
{:reply, tracks_info, %{state | client: client}}
109+
end
110+
end

lib/membrane_http_adaptive_stream/manifest/track.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ defmodule Membrane.HTTPAdaptiveStream.Manifest.Track do
654654
new_window_duration = window_duration - segment.duration
655655

656656
new_header =
657-
case segment.attributes |> Enum.find(&match?({:discontinuity, {_, _}}, &1)) do
657+
case segment.attributes |> Enum.find(&match?({:discontinuity, {_arg1, _arg2}}, &1)) do
658658
{:discontinuity, {new_header, seq_number}} ->
659659
{new_header, seq_number}
660660

@@ -664,7 +664,7 @@ defmodule Membrane.HTTPAdaptiveStream.Manifest.Track do
664664

665665
headers_acc =
666666
if new_header != header do
667-
{header_name, _} = header
667+
{header_name, _arg} = header
668668
[header_name | headers_acc]
669669
else
670670
headers_acc

0 commit comments

Comments
 (0)