Skip to content

Commit ae7685d

Browse files
committed
Fix credo
1 parent eff2abc commit ae7685d

File tree

4 files changed

+81
-99
lines changed

4 files changed

+81
-99
lines changed

lib/membrane_http_adaptive_stream/hls_source.ex

Lines changed: 39 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,11 @@ defmodule Membrane.HLS.Source do
1212

1313
def_output_pad :video_output,
1414
accepted_format: any_of(H264, %RemoteStream{content_format: H264}),
15-
availability: :on_request,
16-
max_instances: 1,
1715
flow_control: :manual,
1816
demand_unit: :buffers
1917

2018
def_output_pad :audio_output,
2119
accepted_format: any_of(AAC, %RemoteStream{content_format: AAC}),
22-
availability: :on_request,
23-
max_instances: 1,
2420
flow_control: :manual,
2521
demand_unit: :buffers
2622

@@ -34,7 +30,6 @@ defmodule Membrane.HLS.Source do
3430

3531
def_options url: [spec: String.t()],
3632
buffer_size: [spec: pos_integer(), default: 0],
37-
container: [spec: :mpeg_ts | :fmp4, default: :mpeg_ts],
3833
variant_selection_policy: [
3934
spec: variant_selection_policy(),
4035
default: :highest_resolution
@@ -45,8 +40,8 @@ defmodule Membrane.HLS.Source do
4540
state =
4641
Map.from_struct(opts)
4742
|> Map.merge(%{
48-
audio_output: %{pad_ref: nil, requested: 0, qex: Qex.new(), qex_size: 0},
49-
video_output: %{pad_ref: nil, requested: 0, qex: Qex.new(), qex_size: 0},
43+
audio_output: %{requested: 0, qex: Qex.new(), qex_size: 0},
44+
video_output: %{requested: 0, qex: Qex.new(), qex_size: 0},
5045
client_genserver: nil
5146
})
5247

@@ -55,30 +50,14 @@ defmodule Membrane.HLS.Source do
5550

5651
@impl true
5752
def handle_setup(_ctx, state) do
58-
demuxing_engine =
59-
case state.container do
60-
:mpeg_ts -> ExHLS.DemuxingEngine.MPEGTS
61-
:fmp4 -> ExHLS.DemuxingEngine.CMAF
62-
end
63-
64-
{:ok, clinet_genserver} =
65-
ClientGenServer.start_link(state.url, demuxing_engine, state.variant_selection_policy)
66-
67-
{[], %{state | client_genserver: clinet_genserver}}
68-
end
53+
{:ok, client_genserver} =
54+
ClientGenServer.start_link(state.url, state.variant_selection_policy)
6955

70-
@impl true
71-
def handle_pad_added(Pad.ref(pad_name, _id) = pad_ref, _ctx, state) do
72-
state = state |> put_in([pad_name, :pad_ref], pad_ref)
73-
{[], state}
56+
{[], %{state | client_genserver: client_genserver}}
7457
end
7558

7659
@impl true
7760
def handle_playing(ctx, state) do
78-
if ctx.pads |> map_size() < 2 do
79-
raise "HLS Source requires both audio and video output pads to be present"
80-
end
81-
8261
{[audio_stream_format], [video_stream_format]} =
8362
ClientGenServer.get_tracks_info(state.client_genserver)
8463
|> Map.values()
@@ -90,8 +69,8 @@ defmodule Membrane.HLS.Source do
9069
end)
9170

9271
actions = [
93-
stream_format: {state.audio_output.pad_ref, audio_stream_format},
94-
stream_format: {state.video_output.pad_ref, video_stream_format}
72+
stream_format: {:audio_output, audio_stream_format},
73+
stream_format: {:video_output, video_stream_format}
9574
]
9675

9776
{actions, state}
@@ -105,42 +84,42 @@ defmodule Membrane.HLS.Source do
10584
end
10685

10786
@impl true
108-
def handle_info({stream_type, frame}, _ctx, state)
109-
when stream_type in [:audio_stream, :video_stream] do
110-
pad_name =
111-
case stream_type do
112-
:audio_stream -> :audio_output
113-
:video_stream -> :video_output
87+
def handle_info({data_type, sample}, _ctx, state)
88+
when data_type in [:audio_sample_, :video_sample] do
89+
pad_ref =
90+
case data_type do
91+
:audio_sample -> :audio_output
92+
:video_sample -> :video_output
11493
end
11594

11695
state =
11796
state
118-
|> update_in([pad_name, :qex], &Qex.push(&1, frame))
119-
|> update_in([pad_name, :qex_size], &(&1 + 1))
120-
|> update_in([pad_name, :requested], &(&1 - 1))
97+
|> update_in([pad_ref, :qex], &Qex.push(&1, sample))
98+
|> update_in([pad_ref, :qex_size], &(&1 + 1))
99+
|> update_in([pad_ref, :requested], &(&1 - 1))
121100

122-
{[redemand: state[pad_name].pad_ref], state}
101+
{[redemand: pad_ref], state}
123102
end
124103

125-
defp pop_buffers(Pad.ref(pad_name, _id), demand, state) do
126-
range_upperbound = min(state[pad_name].qex_size, demand)
104+
defp pop_buffers(pad_ref, demand, state) do
105+
range_upperbound = min(state[pad_ref].qex_size, demand)
127106

128107
if range_upperbound > 0 do
129108
1..range_upperbound
130109
|> Enum.map_reduce(state, fn _i, state ->
131-
{frame, qex} = state[pad_name].qex |> Qex.pop!()
110+
{%ExHLS.Sample{} = sample, qex} = state[pad_ref].qex |> Qex.pop!()
132111

133112
buffer = %Membrane.Buffer{
134-
payload: frame.payload,
135-
pts: frame.pts |> Membrane.Time.milliseconds(),
136-
dts: frame.dts |> Membrane.Time.milliseconds(),
137-
metadata: frame.metadata
113+
payload: sample.payload,
114+
pts: sample.pts_ms |> Membrane.Time.milliseconds(),
115+
dts: sample.dts_ms |> Membrane.Time.milliseconds(),
116+
metadata: sample.metadata
138117
}
139118

140119
state =
141120
state
142-
|> put_in([pad_name, :qex], qex)
143-
|> update_in([pad_name, :qex_size], &(&1 - 1))
121+
|> put_in([pad_ref, :qex], qex)
122+
|> update_in([pad_ref, :qex_size], &(&1 - 1))
144123

145124
{buffer, state}
146125
end)
@@ -149,30 +128,24 @@ defmodule Membrane.HLS.Source do
149128
end
150129
end
151130

152-
defp request_frames(state) do
131+
defp request_samples(state) do
153132
[:audio_output, :video_output]
154-
|> Enum.reduce(state, fn pad_name, state ->
155-
request_size = state.buffer_size - state[pad_name].qex_size - state[pad_name].requested
156-
:ok = do_request(state, pad_name, request_size)
133+
|> Enum.reduce(state, fn pad_ref, state ->
134+
request_size = state.buffer_size - state[pad_ref].qex_size - state[pad_ref].requested
135+
136+
if request_size > 0 do
137+
1..request_size
138+
|> Enum.each(fn _i -> reuqest_single_sample(pad_ref, state) end)
139+
end
157140

158141
state
159-
|> update_in([pad_name, :requested], &(&1 + request_size))
142+
|> update_in([pad_ref, :requested], &(&1 + request_size))
160143
end)
161144
end
162145

163-
defp do_request(_state, _pad_name, request_size) when request_size < 1, do: :ok
146+
defp request_single_sample(:audio_output, state),
147+
do: ClientGenServer.request_audio_sample(state.client_genserver)
164148

165-
defp do_request(state, :audio_output, request_size) do
166-
1..request_size
167-
|> Enum.each(fn _i ->
168-
ClientGenServer.request_audio(state.client_genserver)
169-
end)
170-
end
171-
172-
defp do_request(state, :video_output, request_size) do
173-
1..request_size
174-
|> Enum.each(fn _i ->
175-
ClientGenServer.request_video(state.client_genserver)
176-
end)
177-
end
149+
defp request_single_sample(:video_output, state),
150+
do: ClientGenServer.request_video_sample(state.client_genserver)
178151
end

lib/membrane_http_adaptive_stream/hls_source/client_genserver.ex

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,24 @@ defmodule Membrane.HLS.Source.ClientGenServer do
88

99
@spec start_link(
1010
String.t(),
11-
ExHLS.DemuxingEngine.CMAF | ExHLS.DemuxingEngine.MPEGTS,
1211
Membrane.HLS.Source.variant_selection_policy()
1312
) ::
1413
GenServer.on_start()
15-
def start_link(url, demuxing_engine, variant_selection_policy) do
14+
def start_link(url, variant_selection_policy) do
1615
GenServer.start_link(__MODULE__,
1716
url: url,
18-
demuxing_engine: demuxing_engine,
1917
variant_selection_policy: variant_selection_policy
2018
)
2119
end
2220

23-
@spec request_audio(pid()) :: :ok
24-
def request_audio(client_genserver) do
25-
GenServer.cast(client_genserver, {:request_audio, self()})
21+
@spec request_audio_sample(pid()) :: :ok
22+
def request_audio_sample(client_genserver) do
23+
GenServer.cast(client_genserver, {:request_audio_sample, self()})
2624
end
2725

28-
@spec request_video(pid()) :: :ok
29-
def request_video(client_genserver) do
30-
GenServer.cast(client_genserver, {:request_video, self()})
26+
@spec request_video_sample(pid()) :: :ok
27+
def request_video_sample(client_genserver) do
28+
GenServer.cast(client_genserver, {:request_video_sample, self()})
3129
end
3230

3331
@spec get_tracks_info(pid()) :: map()
@@ -36,37 +34,48 @@ defmodule Membrane.HLS.Source.ClientGenServer do
3634
end
3735

3836
@impl true
39-
def init(
40-
url: url,
41-
demuxing_engine: demuxing_engine,
42-
variant_selection_policy: variant_selection_policy
43-
) do
37+
def init(url: url, variant_selection_policy: variant_selection_policy) do
4438
state = %{
45-
client: Client.new(url, demuxing_engine),
46-
variant_selection_policy: variant_selection_policy
39+
url: url,
40+
variant_selection_policy: variant_selection_policy,
41+
client: nil
4742
}
4843

49-
{:ok, state |> choose_variant()}
44+
# let's create Client and choose variant asnychronously
45+
# beyond init/1, because it requires doing some HTTP requests
46+
# so it can take some time
47+
self() |> send(:setup)
48+
49+
{:ok, state}
50+
end
51+
52+
@impl true
53+
def handle_info(:setup, state) do
54+
state =
55+
%{state | client: Client.new(state.url)}
56+
|> choose_variant()
57+
58+
{:noreply, state}
5059
end
5160

5261
defp choose_variant(state) do
5362
variants = Client.get_variants(state.client)
54-
get_resolution = fn {_id, %{resolution: {width, height}}} -> width * height end
55-
get_bandwidth = fn {_id, %{bandwidth: bandwidth}} -> bandwidth end
63+
get_resolution_fn = fn {_id, %{resolution: {width, height}}} -> width * height end
64+
get_bandwidth_fn = fn {_id, %{bandwidth: bandwidth}} -> bandwidth end
5665

5766
chosen_variant_id =
5867
case state.variant_selection_policy do
5968
:lowest_resolution ->
60-
variants |> Enum.min_by(get_resolution) |> elem(0)
69+
variants |> Enum.min_by(get_resolution_fn) |> elem(0)
6170

6271
:highest_resolution ->
63-
variants |> Enum.max_by(get_resolution) |> elem(0)
72+
variants |> Enum.max_by(get_resolution_fn) |> elem(0)
6473

6574
:lowest_bandwidth ->
66-
variants |> Enum.min_by(get_bandwidth) |> elem(0)
75+
variants |> Enum.min_by(get_bandwidth_fn) |> elem(0)
6776

6877
:highest_bandwidth ->
69-
variants |> Enum.max_by(get_bandwidth) |> elem(0)
78+
variants |> Enum.max_by(get_bandwidth_fn) |> elem(0)
7079

7180
custom_policy when is_function(custom_policy, 1) ->
7281
variants |> custom_policy.()
@@ -77,16 +86,16 @@ defmodule Membrane.HLS.Source.ClientGenServer do
7786
end
7887

7988
@impl true
80-
def handle_cast({:request_audio, pid}, state) do
81-
{frame, client} = Client.read_audio_frame(state.client)
82-
send(pid, {:audio_stream, frame})
89+
def handle_cast({:request_audio_sample, pid}, state) do
90+
{sample, client} = Client.read_audio_sample(state.client)
91+
send(pid, {:audio_sample, sample})
8392
{:noreply, %{state | client: client}}
8493
end
8594

8695
@impl true
87-
def handle_cast({:request_video, pid}, state) do
88-
{frame, client} = Client.read_video_frame(state.client)
89-
send(pid, {:video_stream, frame})
96+
def handle_cast({:request_video_sample, pid}, state) do
97+
{sample, client} = Client.read_video_sample(state.client)
98+
send(pid, {:video_sample, sample})
9099
{:noreply, %{state | client: client}}
91100
end
92101

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
6969
{:membrane_aac_plugin, "~> 0.19.0"},
7070
{:membrane_h26x_plugin, "~> 0.10.0"},
7171
{:ex_hls,
72-
github: "membraneframework-labs/ex_hls", ref: "d72569595f3a4eeb9d1521dfbc8b2445b6e50943"},
72+
github: "membraneframework-labs/ex_hls", ref: "8de4292b21ebca8080496aade56233fb69e1ba29"},
7373
{:bunch, "~> 1.6"},
7474
{:qex, "~> 0.5"},
7575
{:membrane_hackney_plugin, "~> 0.11.0", only: :test},

mix.lock

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
"earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"},
1010
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
1111
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
12-
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "d72569595f3a4eeb9d1521dfbc8b2445b6e50943", [ref: "d72569595f3a4eeb9d1521dfbc8b2445b6e50943"]},
13-
"ex_m3u8": {:git, "https://github.com/membraneframework/ex_m3u8.git", "8a83e4f6dd14e749aadf551bf1adb7ad4d4eb495", [branch: "allow-empty-default"]},
12+
"ex_hls": {:git, "https://github.com/membraneframework-labs/ex_hls.git", "8de4292b21ebca8080496aade56233fb69e1ba29", [ref: "8de4292b21ebca8080496aade56233fb69e1ba29"]},
13+
"ex_m3u8": {:hex, :ex_m3u8, "0.15.2", "7b13d5c719fa6bd58e1fd54d3d17b27e4c2a21fa10ad74c39e7ec36be165d58a", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "160d31f132f15856fcd4a4c40e448784f3105e5fcd05ac39de6a4ccc8aa4a697"},
1414
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
1515
"finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"},
1616
"hackney": {:hex, :hackney, "1.24.1", "f5205a125bba6ed4587f9db3cc7c729d11316fa8f215d3e57ed1c067a9703fa9", [:rebar3], [{:certifi, "~> 2.15.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.4", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "f4a7392a0b53d8bbc3eb855bdcc919cd677358e65b2afd3840b5b3690c4c8a39"},
@@ -41,7 +41,7 @@
4141
"mimerl": {:hex, :mimerl, "1.4.0", "3882a5ca67fbbe7117ba8947f27643557adec38fa2307490c4c4207624cb213b", [:rebar3], [], "hexpm", "13af15f9f68c65884ecca3a3891d50a7b57d82152792f3e19d88650aa126b144"},
4242
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
4343
"mockery": {:hex, :mockery, "2.3.3", "3dba87bd0422a513e6af6e0d811383f38f82ac6be5d3d285a5fcca9c299bd0ac", [:mix], [], "hexpm", "17282be00613286254298117cd25e607a39f15ac03b41c631f60e52f5b5ec974"},
44-
"mpeg_ts": {:git, "https://github.com/kim-company/kim_mpeg_ts.git", "f3e529b4945c185001884fbdc50d668f1af02c5b", []},
44+
"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "8c036fca6558a4339033a5a8697ebf147728f36b", [branch: "backport-v1.0.3"]},
4545
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
4646
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
4747
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},

0 commit comments

Comments
 (0)