Skip to content

Commit cd823c7

Browse files
authored
Fix handle closed connection not getting the right state (#57)
* Fix handle closed connection not getting the right state * Bump version * Add catchall mapping for response status * Fix credo warning * Fix code reviews * Fix code reviews
1 parent 0625acb commit cd823c7

File tree

11 files changed

+61
-37
lines changed

11 files changed

+61
-37
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ of dependencies in `mix.exs`:
1818
```elixir
1919
def deps do
2020
[
21-
{:membrane_rtsp, "~> 0.11.0"}
21+
{:membrane_rtsp, "~> 0.12.0"}
2222
]
2323
end
2424
```

lib/membrane_rtsp/request.ex

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,8 @@ defmodule Membrane.RTSP.Request do
165165
algorithm
166166
"""
167167
@spec process_uri(t(), URI.t()) :: binary()
168-
def process_uri(request, uri) do
169-
%URI{uri | userinfo: nil}
170-
|> apply_path(request)
168+
def process_uri(request, %URI{} = uri) do
169+
apply_path(%URI{uri | userinfo: nil}, request)
171170
end
172171

173172
defp apply_path(%URI{} = base_url, %__MODULE__{path: nil}), do: base_url

lib/membrane_rtsp/response.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ defmodule Membrane.RTSP.Response do
5959
def parse(response) do
6060
[headers, body] = String.split(response, ["\r\n\r\n", "\n\n", "\r\r"], parts: 2)
6161

62-
with {:ok, {response, headers}} <- parse_start_line(headers),
62+
with {:ok, {%__MODULE__{} = response, headers}} <- parse_start_line(headers),
6363
{:ok, headers} <- parse_headers(headers),
6464
{:ok, body} <- parse_body(body, headers) do
6565
{:ok, %__MODULE__{response | headers: headers, body: body}}
@@ -110,7 +110,7 @@ defmodule Membrane.RTSP.Response do
110110
headers = Enum.at(split_response, 0)
111111
body = Enum.at(split_response, 1)
112112

113-
with {:ok, {response, headers}} <- parse_start_line(headers),
113+
with {:ok, {%__MODULE__{} = response, headers}} <- parse_start_line(headers),
114114
{:ok, headers} <- parse_headers(headers),
115115
false <- is_nil(body),
116116
body_size <- byte_size(body),
@@ -239,6 +239,8 @@ defmodule Membrane.RTSP.Response do
239239
defp render_status(404), do: "Not Found"
240240
defp render_status(405), do: "Method Not Allowed"
241241
defp render_status(455), do: "Method Not Valid In This State"
242+
defp render_status(461), do: "Unsupported Transport"
242243
defp render_status(500), do: "Internal Server Error"
243244
defp render_status(501), do: "Not Implemented"
245+
defp render_status(_status), do: "Generic Error"
244246
end

lib/membrane_rtsp/rtsp.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ defmodule Membrane.RTSP do
7575

7676
@doc """
7777
Transfer the control of the TCP socket the session was using to a new process. For more information see `:gen_tcp.controlling_process/2`.
78-
From now on the session won't try to receive responses to requests from the socket, since now an other process is controlling it.
78+
From now on the session won't try to receive responses to requests from the socket, since now an other process is controlling it.
7979
Instead of this, the session will synchronously wait for the response to be supplied with `handle_response/2`.
8080
"""
8181
@spec transfer_socket_control(t(), pid()) ::
@@ -267,7 +267,7 @@ defmodule Membrane.RTSP do
267267
defp parse_response(raw_response, state) do
268268
with {:ok, parsed_response} <- Response.parse(raw_response),
269269
{:ok, state} <- handle_session_id(parsed_response, state),
270-
{:ok, state} <- detect_authentication_type(parsed_response, state) do
270+
{:ok, %State{} = state} <- detect_authentication_type(parsed_response, state) do
271271
state = %State{state | cseq: state.cseq + 1}
272272
{:ok, parsed_response, state}
273273
end
@@ -338,7 +338,7 @@ defmodule Membrane.RTSP do
338338
# Some responses do not have to return the Session ID
339339
# If it does return one, it needs to match one stored in the state.
340340
@spec handle_session_id(Response.t(), State.t()) :: {:ok, State.t()} | {:error, reason :: any()}
341-
defp handle_session_id(%Response{} = response, state) do
341+
defp handle_session_id(%Response{} = response, %State{} = state) do
342342
with {:ok, session_value} <- Response.get_header(response, "Session") do
343343
[session_id | _rest] = String.split(session_value, ";")
344344

lib/membrane_rtsp/server.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ defmodule Membrane.RTSP.Server do
124124
reuseaddr: true
125125
])
126126

127+
{:ok, port} = :inet.port(socket)
128+
Logger.info("RTSP server listening on port #{port}")
129+
127130
udp_rtp_port = config[:udp_rtp_port]
128131
udp_rtcp_port = config[:udp_rtcp_port]
129132

lib/membrane_rtsp/server/conn.ex

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ defmodule Membrane.RTSP.Server.Conn do
3333
%Logic.State{recording_with_tcp?: true} = state ->
3434
{:noreply, state}
3535

36-
_other ->
36+
{:error, _reason, state} ->
3737
state.request_handler.handle_closed_connection(state.request_handler_state)
3838
{:stop, :normal, state}
3939
end
@@ -56,17 +56,21 @@ defmodule Membrane.RTSP.Server.Conn do
5656
end
5757

5858
defp do_process_client_requests(state, timeout) do
59-
with {:ok, request} <- get_request(state.socket, timeout) do
60-
case Logic.process_request(request, state) do
61-
%Logic.State{recording_with_tcp?: true} = state ->
62-
state
59+
case get_request(state.socket, timeout) do
60+
{:ok, request} ->
61+
case Logic.process_request(request, state) do
62+
%Logic.State{recording_with_tcp?: true} = state ->
63+
state
6364

64-
%Logic.State{session_state: :recording} = state ->
65-
do_process_client_requests(state, :infinity)
65+
%Logic.State{session_state: :recording} = state ->
66+
do_process_client_requests(state, :infinity)
6667

67-
state ->
68-
do_process_client_requests(state, state.session_timeout)
69-
end
68+
state ->
69+
do_process_client_requests(state, state.session_timeout)
70+
end
71+
72+
{:error, reason} ->
73+
{:error, reason, state}
7074
end
7175
end
7276

@@ -90,7 +94,7 @@ defmodule Membrane.RTSP.Server.Conn do
9094

9195
defp do_parse_request([raw_request, body]) do
9296
case Request.parse(raw_request <> "\r\n\r\n") do
93-
{:ok, request} ->
97+
{:ok, %Request{} = request} ->
9498
content_length =
9599
case Request.get_header(request, "Content-Length") do
96100
{:ok, value} -> String.to_integer(value)

lib/membrane_rtsp/server/handler.ex

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,27 @@ defmodule Membrane.RTSP.Server.Handler do
201201
@impl true
202202
def init(config), do: config
203203

204+
@impl true
205+
def handle_open_connection(_conn, state), do: state
206+
204207
@impl true
205208
def handle_announce(_req, state), do: {Response.new(501), state}
206209

207210
@impl true
208211
def handle_record(_req, state), do: {Response.new(501), state}
209212

210-
defoverridable init: 1, handle_announce: 2, handle_record: 2
213+
@impl true
214+
def handle_pause(state), do: {Response.new(501), state}
215+
216+
@impl true
217+
def handle_closed_connection(_state), do: :ok
218+
219+
defoverridable init: 1,
220+
handle_open_connection: 2,
221+
handle_announce: 2,
222+
handle_record: 2,
223+
handle_pause: 1,
224+
handle_closed_connection: 1
211225
end
212226
end
213227
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Membrane.RTSP.MixProject do
22
use Mix.Project
33

4-
@version "0.11.0"
4+
@version "0.12.0"
55
@github_url "https://github.com/membraneframework/membrane_rtsp"
66

77
def project do

mix.lock

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
%{
22
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
33
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
4-
"credo": {:hex, :credo, "1.7.13", "126a0697df6b7b71cd18c81bc92335297839a806b6f62b61d417500d1070ff4e", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "47641e6d2bbff1e241e87695b29f617f1a8f912adea34296fb10ecc3d7e9e84f"},
4+
"credo": {:hex, :credo, "1.7.15", "283da72eeb2fd3ccf7248f4941a0527efb97afa224bcdef30b4b580bc8258e1c", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "291e8645ea3fea7481829f1e1eb0881b8395db212821338e577a90bf225c5607"},
55
"dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"},
66
"earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"},
77
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
88
"erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"},
9-
"ex_doc": {:hex, :ex_doc, "0.39.1", "e19d356a1ba1e8f8cfc79ce1c3f83884b6abfcb79329d435d4bbb3e97ccc286e", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "8abf0ed3e3ca87c0847dfc4168ceab5bedfe881692f1b7c45f4a11b232806865"},
10-
"ex_sdp": {:hex, :ex_sdp, "1.1.1", "1a7b049491e5ec02dad9251c53d960835dc5631321ae978ec331831f3e4f6d5f", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "1b13a72ac9c5c695b8824dbdffc671be8cbb4c0d1ccb4ff76a04a6826759f233"},
9+
"ex_doc": {:hex, :ex_doc, "0.39.3", "519c6bc7e84a2918b737aec7ef48b96aa4698342927d080437f61395d361dcee", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "0590955cf7ad3b625780ee1c1ea627c28a78948c6c0a9b0322bd976a079996e1"},
10+
"ex_sdp": {:hex, :ex_sdp, "1.1.2", "7e7465cb13b557cc76ef3e854bad7626b73cc1d1f480d38b5fbcf539c7d8a45d", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "50a27c2d745924679acca32b3d5499d0b35d135a180b83422df82c289afce564"},
1111
"file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"},
1212
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
1313
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
1414
"makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"},
15-
"makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"},
15+
"makeup_erlang": {:hex, :makeup_erlang, "1.0.3", "4252d5d4098da7415c390e847c814bad3764c94a814a0b4245176215615e1035", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "953297c02582a33411ac6208f2c6e55f0e870df7f80da724ed613f10e6706afd"},
1616
"mix_test_watch": {:hex, :mix_test_watch, "1.4.0", "d88bcc4fbe3198871266e9d2f00cd8ae350938efbb11d3fa1da091586345adbb", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "2b4693e17c8ead2ef56d4f48a0329891e8c2d0d73752c0f09272a2b17dc38d1b"},
1717
"mockery": {:hex, :mockery, "2.5.0", "a87acd74fd733aa3b9cb5663d6f690178b056608f2652f18e4ec423ddd5496ed", [:mix], [], "hexpm", "52492b2eba61055df1c626e894663b624b5e6fdfaaaba1d9a8596236fbf4da69"},
1818
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},

test/membrane_rtsp/server/server_logic_test.exs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ defmodule Membrane.RTSP.ServerLogicTest do
224224
|> Logic.process_request(state)
225225
end
226226

227-
test "not allowed when playing", %{state: state} do
227+
test "not allowed when playing", %{state: %State{} = state} do
228228
state = %State{state | session_state: :playing}
229229

230230
mock(:gen_tcp, [send: 2], fn %{}, response ->
@@ -238,7 +238,7 @@ defmodule Membrane.RTSP.ServerLogicTest do
238238
end
239239

240240
describe "handle PLAY request" do
241-
test "handle PLAY request", %{state: state} do
241+
test "handle PLAY request", %{state: %State{} = state} do
242242
uri = %URI{@url | path: "/stream/trackId=0"} |> URI.to_string()
243243

244244
configured_media = %{
@@ -279,7 +279,7 @@ defmodule Membrane.RTSP.ServerLogicTest do
279279
end
280280

281281
describe "handle TEARDOWN request" do
282-
test "Re-initialize the session if it's not playing", %{state: state} do
282+
test "Re-initialize the session if it's not playing", %{state: %State{} = state} do
283283
state = %State{
284284
state
285285
| session_state: :ready,
@@ -293,7 +293,7 @@ defmodule Membrane.RTSP.ServerLogicTest do
293293
|> Logic.process_request(state)
294294
end
295295

296-
test "free resources", %{state: state} do
296+
test "free resources", %{state: %State{} = state} do
297297
state = %State{state | session_state: :playing}
298298

299299
mock(FakeHandler, [respond: 2], fn nil, state -> {Response.new(200), state} end)
@@ -304,7 +304,7 @@ defmodule Membrane.RTSP.ServerLogicTest do
304304
|> Logic.process_request(state)
305305
end
306306

307-
test "resets recording_with_tcp? flag", %{state: state} do
307+
test "resets recording_with_tcp? flag", %{state: %State{} = state} do
308308
state = %State{state | recording_with_tcp?: true, session_state: :playing}
309309

310310
mock(FakeHandler, [respond: 2], fn nil, state -> {Response.new(200), state} end)

0 commit comments

Comments
 (0)