Skip to content

Commit 7c30ba8

Browse files
committed
Implement CR (beyond tests)
1 parent ae7685d commit 7c30ba8

File tree

1 file changed

+83
-20
lines changed

1 file changed

+83
-20
lines changed

lib/membrane_http_adaptive_stream/hls_source.ex

Lines changed: 83 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
11
defmodule Membrane.HLS.Source do
2+
@moduledoc """
3+
A Membrane Source element that fetches and demuxes HLS streams.
4+
5+
It uses the ExHLS library to handle the HLS protocol.
6+
7+
It is recommended to plug `Membrane.H264.Parser` and `Membrane.AAC.Parser`
8+
after this element to parse the video and audio streams respectively,
9+
because the stream formats returned by this element can differ depending
10+
on the type of the HLS stream (MPEG-TS or fMP4).
11+
"""
12+
213
use Membrane.Source
314
require Membrane.Pad, as: Pad
415

516
alias __MODULE__.ClientGenServer
617

718
alias Membrane.{
819
AAC,
20+
Buffer,
921
H264,
1022
RemoteStream
1123
}
@@ -20,28 +32,63 @@ defmodule Membrane.HLS.Source do
2032
flow_control: :manual,
2133
demand_unit: :buffers
2234

35+
@requested_samples_boundary 5
36+
37+
@variant_selection_policy_description """
38+
The policy used to select a variant from the list of available variants.
39+
40+
The policy can be one of the predefined ones or a custom function that takes a map of
41+
variant IDs to their descriptions and returns the ID of the selected variant.
42+
43+
The predefined policies are:
44+
- `:lowest_resolution` - selects the variant with the lowest value of video width * height.
45+
- `:highest_resolution` - selects the variant with the highest value of video width * height.
46+
- `:lowest_bandwidth` - selects the variant with the lowest bandwidth.
47+
- `:highest_bandwidth` - selects the variant with the highest bandwidth.
48+
"""
49+
50+
@typedoc @variant_selection_policy_description
2351
@type variant_selection_policy() ::
2452
:lowest_resolution
2553
| :highest_resolution
2654
| :lowest_bandwidth
2755
| :highest_bandwidth
28-
| (variants_map :: %{optional(integer()) => ExHLS.Client.variant_description()} ->
56+
| (variants_map :: %{integer() => ExHLS.Client.variant_description()} ->
2957
variant_id :: integer())
3058

31-
def_options url: [spec: String.t()],
32-
buffer_size: [spec: pos_integer(), default: 0],
59+
def_options url: [
60+
spec: String.t(),
61+
description: "URL of the HLS playlist manifest"
62+
],
63+
buffered_stream_time: [
64+
spec: Membrane.Time.t(),
65+
default: Membrane.Time.seconds(1),
66+
inspector: &Membrane.Time.inspect/1,
67+
description: """
68+
Amount of time of stream, that will be buffered by #{inspect(__MODULE__)}.
69+
70+
Defaults to 1 second.
71+
"""
72+
],
3373
variant_selection_policy: [
3474
spec: variant_selection_policy(),
35-
default: :highest_resolution
75+
default: :highest_resolution,
76+
description: """
77+
#{@variant_selection_policy_description}
78+
79+
Defaults to `:highest_resolution`.
80+
"""
3681
]
3782

3883
@impl true
3984
def handle_init(_ctx, opts) do
85+
initial_pad_state = %{requested: 0, qex: Qex.new(), oldest_buffer_dts: nil}
86+
4087
state =
4188
Map.from_struct(opts)
4289
|> Map.merge(%{
43-
audio_output: %{requested: 0, qex: Qex.new(), qex_size: 0},
44-
video_output: %{requested: 0, qex: Qex.new(), qex_size: 0},
90+
audio_output: initial_pad_state,
91+
video_output: initial_pad_state,
4592
client_genserver: nil
4693
})
4794

@@ -53,20 +100,18 @@ defmodule Membrane.HLS.Source do
53100
{:ok, client_genserver} =
54101
ClientGenServer.start_link(state.url, state.variant_selection_policy)
55102

103+
# todo: maybe we should call here `get_tracks_info/1` to start downloading segments
104+
# or we should start buffering the frames?
105+
56106
{[], %{state | client_genserver: client_genserver}}
57107
end
58108

59109
@impl true
60-
def handle_playing(ctx, state) do
110+
def handle_playing(_ctx, state) do
61111
{[audio_stream_format], [video_stream_format]} =
62112
ClientGenServer.get_tracks_info(state.client_genserver)
63113
|> Map.values()
64-
|> Enum.split_with(fn
65-
%RemoteStream{content_format: AAC} -> true
66-
%RemoteStream{content_format: H264} -> false
67-
%AAC{} -> true
68-
%H264{} -> false
69-
end)
114+
|> Enum.split_with(&is_audio_stream_format/1)
70115

71116
actions = [
72117
stream_format: {:audio_output, audio_stream_format},
@@ -76,10 +121,19 @@ defmodule Membrane.HLS.Source do
76121
{actions, state}
77122
end
78123

124+
defp is_audio_stream_format(stream_format) do
125+
case stream_format do
126+
%RemoteStream{content_format: AAC} -> true
127+
%RemoteStream{content_format: H264} -> false
128+
%AAC{} -> true
129+
%H264{} -> false
130+
end
131+
end
132+
79133
@impl true
80134
def handle_demand(pad_ref, demand, :buffers, _ctx, state) do
81135
{buffers, state} = pop_buffers(pad_ref, demand, state)
82-
state = request_frames(state)
136+
state = request_samples(state)
83137
{[buffer: {pad_ref, buffers}], state}
84138
end
85139

@@ -109,7 +163,7 @@ defmodule Membrane.HLS.Source do
109163
|> Enum.map_reduce(state, fn _i, state ->
110164
{%ExHLS.Sample{} = sample, qex} = state[pad_ref].qex |> Qex.pop!()
111165

112-
buffer = %Membrane.Buffer{
166+
buffer = %Buffer{
113167
payload: sample.payload,
114168
pts: sample.pts_ms |> Membrane.Time.milliseconds(),
115169
dts: sample.dts_ms |> Membrane.Time.milliseconds(),
@@ -131,12 +185,21 @@ defmodule Membrane.HLS.Source do
131185
defp request_samples(state) do
132186
[:audio_output, :video_output]
133187
|> Enum.reduce(state, fn pad_ref, state ->
134-
request_size = state.buffer_size - state[pad_ref].qex_size - state[pad_ref].requested
188+
oldest_dts = state[pad_ref].oldest_buffer_dts
135189

136-
if request_size > 0 do
137-
1..request_size
138-
|> Enum.each(fn _i -> reuqest_single_sample(pad_ref, state) end)
139-
end
190+
request_size =
191+
case state[pad_ref].qex |> Qex.first() do
192+
# todo: maybe we should handle rollovers
193+
{:value, %Buffer{dts: newest_dts}}
194+
when newest_dts - oldest_dts >= state.buffered_stream_time ->
195+
0
196+
197+
_empty_or_not_new_enough ->
198+
@requested_samples_boundary - state[pad_ref].qex_size - state[pad_ref].requested
199+
end
200+
201+
1..request_size//1
202+
|> Enum.each(fn _i -> request_single_sample(pad_ref, state) end)
140203

141204
state
142205
|> update_in([pad_ref, :requested], &(&1 + request_size))

0 commit comments

Comments
 (0)