Skip to content

Commit 731de4e

Browse files
authored
Allow to record with TCP transport (interleaved mode) (#52)
* Allow to record with TCP transport (interleaved mode) * Allow Server.Conn to handle :rtsp requests from outside the process * support handling of raw rtsp messages in Conn * do not initiate the connection closing when handling RTSP request in Conn * raise in Conn on invalid raw RTSP message * set recording? to false on successful PAUSE request * handle {:error, :closed} gracefully in Conn * refactored code, fixed dialyzer error * renamed recording? flag to recording_with_tcp? * removed the clause that allowed to process a parsed RTSP request in Conn
1 parent 0582419 commit 731de4e

File tree

4 files changed

+112
-14
lines changed

4 files changed

+112
-14
lines changed

lib/membrane_rtsp/server.ex

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,20 @@ defmodule Membrane.RTSP.Server do
9797
GenServer.call(server, :port_number)
9898
end
9999

100+
@doc """
101+
In interleaved TCP mode we want to pass control over the client connection socket to the pipeline (usually).
102+
103+
This function allows to transfer the control over such socket to a specified process.
104+
"""
105+
@spec transfer_client_socket_control(
106+
server_pid :: pid() | GenServer.name(),
107+
client_conn_pid :: pid(),
108+
new_controlling_process_pid :: pid()
109+
) :: :ok | {:error, :unknown_conn | :closed | :not_owner | :badarg | :inet.posix()}
110+
def transfer_client_socket_control(server, conn_pid, new_controlling_process) do
111+
GenServer.call(server, {:transfer_client_socket_control, conn_pid, new_controlling_process})
112+
end
113+
100114
@impl true
101115
def init(config) do
102116
address = config[:address] || :any
@@ -147,6 +161,21 @@ defmodule Membrane.RTSP.Server do
147161
{:reply, :inet.port(state.socket), state}
148162
end
149163

164+
@impl true
165+
def handle_call(
166+
{:transfer_client_socket_control, conn_pid, new_controlling_process},
167+
_from,
168+
state
169+
) do
170+
case Map.fetch(state.client_conns, conn_pid) do
171+
{:ok, socket} ->
172+
{:reply, :gen_tcp.controlling_process(socket, new_controlling_process), state}
173+
174+
:error ->
175+
{:reply, {:error, :unknown_conn}, state}
176+
end
177+
end
178+
150179
@impl true
151180
def handle_info({:new_connection, client_socket}, state) do
152181
child_state =
@@ -187,6 +216,7 @@ defmodule Membrane.RTSP.Server do
187216
defp do_listen(socket, parent_pid) do
188217
case :gen_tcp.accept(socket) do
189218
{:ok, client_socket} ->
219+
:ok = :gen_tcp.controlling_process(client_socket, parent_pid)
190220
send(parent_pid, {:new_connection, client_socket})
191221
do_listen(socket, parent_pid)
192222

lib/membrane_rtsp/server/conn.ex

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,39 @@ defmodule Membrane.RTSP.Server.Conn do
2929

3030
@impl true
3131
def handle_continue(:process_client_requests, state) do
32-
do_process_client_requests(state, state.session_timeout)
33-
state.request_handler.handle_closed_connection(state.request_handler_state)
34-
{:stop, :normal, state}
32+
case do_process_client_requests(state, state.session_timeout) do
33+
%Logic.State{recording_with_tcp?: true} = state ->
34+
{:noreply, state}
35+
36+
_other ->
37+
state.request_handler.handle_closed_connection(state.request_handler_state)
38+
{:stop, :normal, state}
39+
end
40+
end
41+
42+
@impl true
43+
def handle_info({:rtsp, raw_rtsp_request}, state) do
44+
with {:ok, request} <- Request.parse(raw_rtsp_request) do
45+
case Logic.process_request(request, state) do
46+
%Logic.State{recording_with_tcp?: true} = state ->
47+
{:noreply, state}
48+
49+
state ->
50+
handle_continue(:process_client_requests, state)
51+
end
52+
else
53+
{:error, error} ->
54+
raise "Failed to parse RTSP request: #{inspect(error)}.\nRequest: #{inspect(raw_rtsp_request)}"
55+
end
3556
end
3657

3758
defp do_process_client_requests(state, timeout) do
3859
with {:ok, request} <- get_request(state.socket, timeout) do
3960
case Logic.process_request(request, state) do
40-
%{session_state: :recording} = state ->
61+
%Logic.State{recording_with_tcp?: true} = state ->
62+
state
63+
64+
%Logic.State{session_state: :recording} = state ->
4165
do_process_client_requests(state, :infinity)
4266

4367
state ->

lib/membrane_rtsp/server/logic.ex

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@ defmodule Membrane.RTSP.Server.Logic do
4141
:rtcp_socket,
4242
:request_handler_state,
4343
:session_timeout,
44+
:transport_opts,
4445
configured_media: %{},
4546
incoming_media: %{},
4647
session_id: UUID.uuid4(),
47-
session_state: :init
48+
session_state: :init,
49+
recording_with_tcp?: false
4850
]
4951

5052
@type t :: %__MODULE__{
@@ -111,7 +113,9 @@ defmodule Membrane.RTSP.Server.Logic do
111113
)
112114

113115
{response, state} = do_handle_setup_response(request, response, transport_opts, state)
114-
{response, %{state | request_handler_state: request_handler_state}}
116+
117+
{response,
118+
%{state | request_handler_state: request_handler_state, transport_opts: transport_opts}}
115119
else
116120
error ->
117121
Logger.error("SETUP request failed due to: #{inspect(error)}")
@@ -144,7 +148,12 @@ defmodule Membrane.RTSP.Server.Logic do
144148
if state.session_state == :playing, do: :paused_playing, else: :paused_recording
145149

146150
{response,
147-
%{state | request_handler_state: request_handler_state, session_state: session_state}}
151+
%{
152+
state
153+
| request_handler_state: request_handler_state,
154+
session_state: session_state,
155+
recording_with_tcp?: false
156+
}}
148157
else
149158
{response, %{state | request_handler_state: request_handler_state}}
150159
end
@@ -176,10 +185,19 @@ defmodule Membrane.RTSP.Server.Logic do
176185
{response, handler_state} =
177186
state.request_handler.handle_record(state.incoming_media, state.request_handler_state)
178187

188+
tcp_interleaved_mode? =
189+
state.transport_opts[:transport] == :TCP
190+
179191
if Response.ok?(response) do
180-
{response, %{state | request_handler_state: handler_state, session_state: :recording}}
192+
{response,
193+
%{
194+
state
195+
| request_handler_state: handler_state,
196+
session_state: :recording,
197+
recording_with_tcp?: tcp_interleaved_mode?
198+
}}
181199
else
182-
{response, %{state | request_handler_state: handler_state}}
200+
{response, %{state | request_handler_state: handler_state, recording_with_tcp?: false}}
183201
end
184202
end
185203

@@ -189,7 +207,16 @@ defmodule Membrane.RTSP.Server.Logic do
189207

190208
Response.new(200)
191209
|> inject_session_header(state)
192-
|> then(&{&1, %{state | configured_media: %{}, incoming_media: %{}, session_state: :init}})
210+
|> then(
211+
&{&1,
212+
%{
213+
state
214+
| configured_media: %{},
215+
incoming_media: %{},
216+
session_state: :init,
217+
recording_with_tcp?: false
218+
}}
219+
)
193220
end
194221

195222
defp do_handle_request(%Request{method: "TEARDOWN"}, state) do
@@ -200,7 +227,16 @@ defmodule Membrane.RTSP.Server.Logic do
200227

201228
response
202229
|> inject_session_header(state)
203-
|> then(&{&1, %{state | session_state: :init, configured_media: %{}, incoming_media: %{}}})
230+
|> then(
231+
&{&1,
232+
%{
233+
state
234+
| session_state: :init,
235+
configured_media: %{},
236+
incoming_media: %{},
237+
recording_with_tcp?: false
238+
}}
239+
)
204240
end
205241

206242
defp do_handle_request(%Request{}, state) do
@@ -215,9 +251,6 @@ defmodule Membrane.RTSP.Server.Logic do
215251
transport_opts[:network_mode] == :multicast ->
216252
{:error, :multicast_not_supported}
217253

218-
transport_opts[:mode] == :record and transport != :UDP ->
219-
{:error, :unsupported_transport}
220-
221254
transport_opts[:mode] == :play and transport == :UDP and is_nil(state.rtp_socket) ->
222255
{:error, :udp_not_supported}
223256

test/membrane_rtsp/server/server_logic_test.exs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,17 @@ defmodule Membrane.RTSP.ServerLogicTest do
303303
%Request{method: "TEARDOWN", path: @url}
304304
|> Logic.process_request(state)
305305
end
306+
307+
test "resets recording_with_tcp? flag", %{state: state} do
308+
state = %State{state | recording_with_tcp?: true, session_state: :playing}
309+
310+
mock(FakeHandler, [respond: 2], fn nil, state -> {Response.new(200), state} end)
311+
mock(:gen_tcp, [send: 2], fn %{}, response -> assert response =~ "RTSP/1.0 200 OK" end)
312+
313+
assert %{recording_with_tcp?: false} =
314+
%Request{method: "TEARDOWN", path: @url}
315+
|> Logic.process_request(state)
316+
end
306317
end
307318

308319
test "return 501 (Not Implemented) for not supported methods", %{state: state} do

0 commit comments

Comments
 (0)