Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,35 @@ defmodule Membrane.RTMPServer do
like sending the reference to another process. The function should return a `t:#{inspect(__MODULE__)}.client_behaviour_spec/0`
which defines how the client should behave.
- port: Port on which RTMP server will listen. Defaults to 1935.
- use_ssl?: If true, SSL socket (for RTMPS) will be used. Othwerwise, TCP socket (for RTMP) will be used. Defaults to false.
- use_ssl?: If true, SSL socket (for RTMPS) will be used. Otherwise, TCP socket (for RTMP) will be used. Defaults to false.
- ssl_options: SSL options to configure the SSL socket.
- client_timeout: Time after which an unused client connection is automatically closed, expressed in `Membrane.Time.t()` units. Defaults to 5 seconds.
- name: If not nil, value of this field will be used as a name under which the server's process will be registered. Defaults to nil.

## SSL Configuration

SSL options can be configured at the application level or passed as runtime options.

### Application Configuration

config :membrane_rtmp_plugin, :ssl,
certfile: "/path/to/cert.pem",
keyfile: "/path/to/key.pem",
verify: :verify_none,
fail_if_no_peer_cert: false,
versions: [:"tlsv1.2", :"tlsv1.3"]

### Runtime Options

Membrane.RTMPServer.start_link(
port: 1935,
use_ssl?: true,
ssl_options: [
certfile: "/path/to/cert.pem",
keyfile: "/path/to/key.pem"
],
handle_new_client: &my_handler/3
)
"""
use GenServer

Expand All @@ -26,6 +52,7 @@ defmodule Membrane.RTMPServer do
@type t :: [
port: :inet.port_number(),
use_ssl?: boolean(),
ssl_options: keyword() | nil,
name: atom() | nil,
handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
client_behaviour_spec()),
Expand All @@ -35,6 +62,7 @@ defmodule Membrane.RTMPServer do
@default_options %{
port: 1935,
use_ssl?: false,
ssl_options: nil,
name: nil,
client_timeout: Membrane.Time.seconds(5)
}
Expand All @@ -58,6 +86,15 @@ defmodule Membrane.RTMPServer do
server_options_map = Enum.into(server_options, %{})
server_options_map = Map.merge(@default_options, server_options_map)

ssl_options_map =
if is_nil(server_options[:ssl_options]) do
%{ssl_options: Application.get_env(:membrane_rtmp_plugin, :ssl, [])}
else
%{ssl_options: server_options[:ssl_options]}
end

server_options_map = Map.merge(server_options_map, ssl_options_map)

GenServer.start_link(__MODULE__, server_options_map, gen_server_opts)
end

Expand Down
107 changes: 88 additions & 19 deletions lib/membrane_rtmp_plugin/rtmp_server/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,114 @@ defmodule Membrane.RTMPServer.Listener do
require Logger
alias Membrane.RTMPServer.ClientHandler

@listen_opts [
:binary,
packet: :raw,
active: false,
reuseaddr: true
]

@ssl_handshake_opts [
:certfile,
:keyfile,
:cacertfile,
:password,
:versions
]

@ssl_listen_opts [
:verify,
:fail_if_no_peer_cert,
:versions,
:ciphers,
:honor_cipher_order,
:secure_renegotiate,
:reuse_sessions,
:cacertfile,
:depth,
:log_level
]

@spec run(
options :: %{
use_ssl?: boolean(),
socket_module: :gen_tcp | :ssl,
server: pid(),
port: non_neg_integer()
port: non_neg_integer(),
ssl_options: keyword()
}
) :: no_return()
def run(options) do
options = Map.merge(options, %{socket_module: if(options.use_ssl?, do: :ssl, else: :gen_tcp)})

listen_options =
if options.use_ssl? do
certfile = System.get_env("CERT_PATH")
keyfile = System.get_env("CERT_KEY_PATH")

[
:binary,
packet: :raw,
active: false,
certfile: certfile,
keyfile: keyfile
]
ssl_opts =
options
|> Map.get(:ssl_options, [])
|> Keyword.take(@ssl_listen_opts)

Logger.debug("SSL options for listen: #{inspect(ssl_opts)}")

combined = @listen_opts ++ ssl_opts
Logger.debug("Combined listen options: #{inspect(combined)}")
combined
else
[
:binary,
packet: :raw,
active: false
]
@listen_opts
end

{:ok, socket} = options.socket_module.listen(options.port, listen_options)
send(options.server, {:port, :inet.port(socket)})

port =
case options.socket_module do
:gen_tcp ->
{:ok, {_ip, port}} = :inet.sockname(socket)
port

:ssl ->
{:ok, {_ip, port}} = :ssl.sockname(socket)
port
end

send(options.server, {:port, port})

accept_loop(socket, options)
end

defp accept_loop(socket, options) do
{:ok, client} = :gen_tcp.accept(socket)
client =
case options.socket_module do
:gen_tcp ->
{:ok, client} = :gen_tcp.accept(socket)
client

:ssl ->
{:ok, client} = :ssl.transport_accept(socket)
Logger.debug("SSL transport accept successful, starting handshake...")

ssl_handshake_opts =
options
|> Map.get(:ssl_options, [])
|> Keyword.take(@ssl_handshake_opts)

ssl_handshake_opts =
ssl_handshake_opts
|> Keyword.put(:verify, :verify_none)
|> Keyword.put(:fail_if_no_peer_cert, false)

Logger.debug("SSL handshake options: #{inspect(ssl_handshake_opts)}")

case :ssl.handshake(client, ssl_handshake_opts, 10_000) do
{:ok, ssl_socket} ->
Logger.info("SSL handshake successful")
ssl_socket

{:error, reason} ->
Logger.error("SSL handshake failed: #{inspect(reason)}")
:ssl.close(client)
accept_loop(socket, options)
end
end

{:ok, client_reference} =
GenServer.start_link(ClientHandler,
Expand All @@ -56,7 +125,7 @@ defmodule Membrane.RTMPServer.Listener do
client_timeout: options.client_timeout
)

case :gen_tcp.controlling_process(client, client_reference) do
case options.socket_module.controlling_process(client, client_reference) do
:ok ->
send(client_reference, :control_granted)

Expand Down
49 changes: 45 additions & 4 deletions test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,27 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
assert ffmpeg_result == :error
end

@tag :tmp_dir
@tag :rtmps
test "SourceBin allows for RTMPS connection" do
test "SourceBin allows for RTMPS connection", %{tmp_dir: tmp_dir} do
self = self()

cert_path = Path.join(tmp_dir, "test_cert.pem")
key_path = Path.join(tmp_dir, "test_key.pem")
generate_test_certificates(cert_path, key_path)

# Test SSL listen options separately
ssl_config = [
certfile: cert_path,
keyfile: key_path,
verify: :verify_none,
fail_if_no_peer_cert: false,
versions: [:"tlsv1.2", :"tlsv1.3"]
]

pipeline_startup_task =
Task.async(fn ->
start_pipeline_with_external_rtmp_server(@app, @stream_key, self, 0, true)
start_pipeline_with_external_rtmp_server(@app, @stream_key, self, 0, true, ssl_config)
end)

port =
Expand Down Expand Up @@ -216,7 +230,8 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
stream_key,
parent,
port \\ 0,
use_ssl? \\ false
use_ssl? \\ false,
ssl_options \\ []
) do
parent_process_pid = self()

Expand All @@ -229,11 +244,12 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
Membrane.RTMPServer.start_link(
port: port,
use_ssl?: use_ssl?,
ssl_options: ssl_options,
handle_new_client: handle_new_client,
client_timeout: Membrane.Time.seconds(3)
)

{:ok, assigned_port} = Membrane.RTMPServer.get_port(server_pid)
assigned_port = Membrane.RTMPServer.get_port(server_pid)

send(parent, {:port, assigned_port})

Expand Down Expand Up @@ -327,4 +343,29 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
|> Map.merge(%{stream_length: stream_length, last_dts: -1, buffers: 0})
|> assert_buffers()
end

defp generate_test_certificates(cert_path, key_path) do
{_, 0} =
System.cmd("openssl", [
"genrsa",
"-out",
key_path,
"2048"
])

{_, 0} =
System.cmd("openssl", [
"req",
"-new",
"-x509",
"-key",
key_path,
"-out",
cert_path,
"-days",
"1",
"-subj",
"/C=US/ST=Test/L=Test/O=Test/CN=localhost"
])
end
end
2 changes: 1 addition & 1 deletion test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ExUnit.start(capture_log: true, exclude: [:rtmps])
ExUnit.start(capture_log: true, exclude: [])