Skip to content

Commit 66ba60d

Browse files
committed
Handle case when ex_hls has to add some latency to Live HLS stream
1 parent 965ee53 commit 66ba60d

File tree

5 files changed

+58
-26
lines changed

5 files changed

+58
-26
lines changed

lib/membrane_http_adaptive_stream/source.ex

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,12 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
120120
state =
121121
Map.from_struct(opts)
122122
|> Map.merge(%{
123-
# status will be either :initialized, :waiting_on_pads or :streaming
123+
# status will be either:
124+
# - :initialized
125+
# - :waiting_on_client_genserver_ready
126+
# - :client_genserver_ready
127+
# - :waiting_on_pads
128+
# - :streaming
124129
status: :initialized,
125130
client_genserver: nil,
126131
stream: nil,
@@ -166,20 +171,14 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
166171

167172
@impl true
168173
def handle_playing(ctx, state) do
169-
# both start_streaming/1 and generate_new_tracks_notification/1 functions
170-
# call ClientGenServer.get_tracks_info/1 that triggers downloading first
171-
# segments of the HLS stream
174+
:ok = spawn_client_genserver(ctx, state)
172175

173-
state = create_client_gen_server(ctx, state)
176+
self() |> Process.send_after(:client_genserver_ready_timeout, 60_000)
174177

175-
if Map.values(state.pad_refs) != [nil, nil] do
176-
state |> start_streaming()
177-
else
178-
state |> generate_new_tracks_notification()
179-
end
178+
{[], %{state | status: :waiting_on_client_genserver_ready}}
180179
end
181180

182-
defp create_client_gen_server(ctx, state) do
181+
defp spawn_client_genserver(ctx, state) do
183182
start_link_arg = %{
184183
url: state.url,
185184
variant_selection_policy: state.variant_selection_policy,
@@ -192,18 +191,10 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
192191
{__MODULE__.ClientGenServer, start_link_arg}
193192
)
194193

195-
client_genserver =
196-
receive do
197-
{:client_genserver, client_genserver} -> client_genserver
198-
after
199-
5_000 ->
200-
raise "Timeout waiting for #{inspect(__MODULE__)}.ClientGenServer initialization"
201-
end
202-
203-
%{state | client_genserver: client_genserver}
194+
:ok
204195
end
205196

206-
defp generate_new_tracks_notification(%{status: :initialized} = state) do
197+
defp generate_new_tracks_notification(%{status: :client_genserver_ready} = state) do
207198
tracks_info = ClientGenServer.get_tracks_info(state.client_genserver)
208199

209200
new_tracks =
@@ -228,7 +219,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
228219
end
229220

230221
defp start_streaming(%{status: status} = state)
231-
when status in [:initialized, :waiting_on_pads] do
222+
when status in [:client_genserver_ready, :waiting_on_pads] do
232223
actions = get_stream_formats(state) ++ get_redemands(state)
233224
state = %{state | status: :streaming}
234225
{actions, state}
@@ -354,6 +345,46 @@ defmodule Membrane.HTTPAdaptiveStream.Source do
354345
{[], state}
355346
end
356347

348+
@impl true
349+
def handle_demand(pad_ref, demand, :buffers, _ctx, state)
350+
when state.status == :waiting_on_client_genserver_ready do
351+
Membrane.Logger.debug("""
352+
Ignoring demand (#{inspect(demand)} buffers) on pad #{inspect(pad_ref)} because this \
353+
element is still waiting for ExHLS to start downloading multimedia segments.
354+
""")
355+
356+
{[], state}
357+
end
358+
359+
@impl true
360+
def handle_info({:client_genserver_ready, client_genserver}, _ctx, state)
361+
when state.status == :waiting_on_client_genserver_ready do
362+
state = %{
363+
state
364+
| client_genserver: client_genserver,
365+
status: :client_genserver_ready
366+
}
367+
368+
# both start_streaming/1 and generate_new_tracks_notification/1 functions
369+
# call ClientGenServer.get_tracks_info/1 that triggers downloading first
370+
# segments of the HLS stream
371+
372+
if Map.values(state.pad_refs) != [nil, nil] do
373+
state |> start_streaming()
374+
else
375+
state |> generate_new_tracks_notification()
376+
end
377+
end
378+
379+
@impl true
380+
def handle_info(:client_genserver_ready_timeout, _ctx, state) do
381+
if state.status == :waiting_on_client_genserver_ready do
382+
raise "Timeout while waiting for ExHLS to download first media segments."
383+
end
384+
385+
{[], state}
386+
end
387+
357388
@impl true
358389
def handle_info({:chunk, %ExHLS.Chunk{} = chunk}, _ctx, state) do
359390
buffer =

lib/membrane_http_adaptive_stream/source/client_genserver.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ defmodule Membrane.HTTPAdaptiveStream.Source.ClientGenServer do
8383
tracks_info: tracks_info
8484
}
8585

86-
send(state.source, {:client_genserver, self()})
86+
send(state.source, {:client_genserver_ready, self()})
8787

8888
{:noreply, state}
8989
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
6969
{:membrane_aac_plugin, "~> 0.19.0"},
7070
{:membrane_h26x_plugin, "~> 0.10.0"},
7171
{:stream_split, "~> 0.1.7"},
72-
{:ex_hls, "~> 0.1.1"},
72+
{:ex_hls, "~> 0.1.2"},
7373
{:bunch, "~> 1.6"},
7474
{:qex, "~> 0.5"},
7575
{:muontrap, "~> 1.6", only: :test},

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
1414
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
1515
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
16-
"ex_hls": {:hex, :ex_hls, "0.1.1", "71cfe9999dd7330ff252fc7bd03fb5ccb9d312eed7cafc30f489de6c9db2462d", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "260269c1459ab51557383c3a72e0ed21721db0dd3e38b10987decb336f8e902b"},
16+
"ex_hls": {:hex, :ex_hls, "0.1.2", "fca2c2a4ddf8459b9a47bf1fd6552c5d74cccf5dc72f56cf87129111c3e2f8ee", [:mix], [{:ex_m3u8, "~> 0.15.3", [hex: :ex_m3u8, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.2", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.36.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:mpeg_ts, "~> 2.0.0", [hex: :mpeg_ts, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "8129921a863918999cda4032cf66e5e4de9fa24faa33cee28a09b34f13438a49"},
1717
"ex_m3u8": {:hex, :ex_m3u8, "0.15.3", "c10427f450b2ed7bfd85808d8dce21214f1fe9fa18927591cbbf96fea0a6a8aa", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "99f20c0b44bab130dc6aca71fefe0d1a174413ae9ac2763220994b29bd310939"},
1818
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
1919
"finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"},

test/membrane_http_adaptive_stream/integration_test/source_test.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,13 @@ defmodule Membrane.HTTPAdaptiveStream.Source.Test do
159159
describe "Membrane.HTTPAdaptiveStream.Source demuxes audio and video from" do
160160
@tag :live
161161
@tag :tmp_dir
162+
@tag :x
162163
test "Live HLS stream", %{tmp_dir: tmp_dir} do
163164
index_m3u8 = Path.join(tmp_dir, "index.m3u8")
164165
generate_live_hls(@bbb_33s_mp4_url, index_m3u8)
165166

166167
await_until_file_exists(index_m3u8)
167-
Process.sleep(7_000)
168+
# Process.sleep(1_000)
168169

169170
audio_result_file = Path.join(tmp_dir, "audio.aac")
170171
video_result_file = Path.join(tmp_dir, "video.h264")

0 commit comments

Comments
 (0)