Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_webrtc_plugin` to your list of
```elixir
def deps do
[
{:membrane_webrtc_plugin, "~> 0.23.3"}
{:membrane_webrtc_plugin, "~> 0.24.0"}
]
end
```
Expand Down
16 changes: 8 additions & 8 deletions lib/membrane_webrtc/ex_webrtc/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

alias ExRTCP.Packet.PayloadFeedback.PLI

alias Membrane.WebRTC.{ExWebRTCUtils, SignalingChannel, SimpleWebSocketServer}
alias Membrane.WebRTC.{ExWebRTCUtils, Signaling, SimpleWebSocketServer}

def_options signaling: [], tracks: [], video_codec: [], ice_servers: []

Expand Down Expand Up @@ -53,7 +53,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
SimpleWebSocketServer.start_link_supervised(ctx.utility_supervisor, opts)

{:whip, opts} ->
signaling = SignalingChannel.new()
signaling = Signaling.new()

Membrane.UtilitySupervisor.start_link_child(
ctx.utility_supervisor,
Expand All @@ -74,7 +74,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
)

Process.monitor(signaling.pid)
SignalingChannel.register_element(signaling)
Signaling.register_element(signaling)
state = %{state | pc: pc, status: :connecting, signaling: signaling}
state = maybe_negotiate_tracks(state)
{[setup: :incomplete], state}
Expand Down Expand Up @@ -196,7 +196,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

@impl true
def handle_info({:ex_webrtc, _from, {:ice_candidate, candidate}}, _ctx, state) do
SignalingChannel.signal(state.signaling, candidate)
Signaling.signal(state.signaling, candidate)
{[], state}
end

Expand Down Expand Up @@ -252,7 +252,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

@impl true
def handle_info(
{SignalingChannel, _pid, %SessionDescription{type: :offer}, _metadata},
{Signaling, _pid, %SessionDescription{type: :offer}, _metadata},
_ctx,
_state
) do
Expand All @@ -261,7 +261,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

@impl true
def handle_info(
{SignalingChannel, _pid, %SessionDescription{type: :answer} = sdp, _metadata},
{Signaling, _pid, %SessionDescription{type: :answer} = sdp, _metadata},
_ctx,
state
) do
Expand Down Expand Up @@ -290,7 +290,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
end

@impl true
def handle_info({SignalingChannel, _pid, %ICECandidate{} = candidate, _metadata}, _ctx, state) do
def handle_info({Signaling, _pid, %ICECandidate{} = candidate, _metadata}, _ctx, state) do
:ok = PeerConnection.add_ice_candidate(state.pc, candidate)
{[], state}
end
Expand Down Expand Up @@ -327,7 +327,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

{:ok, offer} = PeerConnection.create_offer(pc)
:ok = PeerConnection.set_local_description(pc, offer)
SignalingChannel.signal(state.signaling, offer)
Signaling.signal(state.signaling, offer)
%{state | negotiating_tracks: negotiating_tracks, queued_tracks: []}
end

Expand Down
16 changes: 8 additions & 8 deletions lib/membrane_webrtc/ex_webrtc/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
require Membrane.Logger

alias ExWebRTC.{ICECandidate, PeerConnection, SessionDescription}
alias Membrane.WebRTC.{ExWebRTCUtils, SignalingChannel, SimpleWebSocketServer, WhipServer}
alias Membrane.WebRTC.{ExWebRTCUtils, Signaling, SimpleWebSocketServer, WhipServer}

def_options signaling: [],
allowed_video_codecs: [],
Expand Down Expand Up @@ -36,7 +36,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
output_tracks: %{(pad_id :: term()) => output_track()},
awaiting_outputs: [{:video | :audio, Membrane.Pad.ref()}],
awaiting_candidates: [ExWebRTC.ICECandidate.t()],
signaling: SignalingChannel.t() | {:websocket, SimpleWebSocketServer.options()},
signaling: Signaling.t() | {:websocket, SimpleWebSocketServer.options()},
status: :init | :connecting | :connected | :closed,
audio_params: [ExWebRTC.RTPCodecParameters.t()],
video_params: [ExWebRTC.RTPCodecParameters.t()],
Expand Down Expand Up @@ -103,7 +103,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
@impl true
def handle_playing(_ctx, state) do
Process.monitor(state.signaling.pid)
SignalingChannel.register_element(state.signaling)
Signaling.register_element(state.signaling)

{[], %{state | status: :connecting}}
end
Expand Down Expand Up @@ -193,7 +193,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do

@impl true
def handle_info({:ex_webrtc, _from, {:ice_candidate, candidate}}, _ctx, state) do
SignalingChannel.signal(state.signaling, candidate)
Signaling.signal(state.signaling, candidate)
{[], state}
end

Expand All @@ -210,7 +210,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do

@impl true
def handle_info(
{SignalingChannel, _pid, %SessionDescription{type: :offer} = sdp, metadata},
{Signaling, _pid, %SessionDescription{type: :offer} = sdp, metadata},
_ctx,
state
) do
Expand Down Expand Up @@ -269,7 +269,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
end

@impl true
def handle_info({SignalingChannel, _pid, %ICECandidate{} = candidate, _metadata}, _ctx, state) do
def handle_info({Signaling, _pid, %ICECandidate{} = candidate, _metadata}, _ctx, state) do
case PeerConnection.add_ice_candidate(state.pc, candidate) do
:ok ->
{[], state}
Expand Down Expand Up @@ -356,7 +356,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
answer
end

SignalingChannel.signal(state.signaling, answer)
Signaling.signal(state.signaling, answer)
%{state | awaiting_candidates: [], candidates_in_sdp: false}
else
state
Expand Down Expand Up @@ -394,7 +394,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
end

defp setup_whip(ctx, opts) do
signaling = SignalingChannel.new()
signaling = Signaling.new()
clients_cnt = :atomics.new(1, [])
{token, opts} = Keyword.pop(opts, :token, fn _token -> true end)
validate_token = if is_function(token), do: token, else: &(&1 == token)
Expand Down
219 changes: 219 additions & 0 deletions lib/membrane_webrtc/signaling.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
defmodule Membrane.WebRTC.Signaling do
@moduledoc """
Signaling channel for sending WebRTC signaling messages between Membrane elements
and other WebRTC peers.

The flow of using the signaling channel is the following:
- Create it with `new/0`.
- Register the peer process (the one to send and receive signaling messages)
with `register_peer/2`.
- Pass the signaling to `Membrane.WebRTC.Source` or `Membrane.WebRTC.Sink` (this
can also be done before the call to `register_peer/2`).
- Send and receive signaling messages. Messages can be sent by calling `signal/2`.
The signaling channel sends `t:message/0` to the peer.
"""
use GenServer

require Logger

alias ExWebRTC.{ICECandidate, SessionDescription}

@enforce_keys [:pid]
defstruct @enforce_keys

@type t :: %__MODULE__{pid: pid()}

@typedoc """
Messages sent by the signaling channel to the peer.
"""
@type message :: {__MODULE__, pid(), message_content, metadata :: map}

@typedoc """
Messages that the peer sends with `signal/2` and receives in `t:message/0`.

If the `message_format` of the peer is `ex_webrtc` (default), they should be
`t:ex_webrtc_message/0`.
If the `message_format` is `json_data`, they should be `t:json_data_message/0`.

The `message_format` of the peer can be set in `register_peer/2`.
"""
@type message_content :: ex_webrtc_message | json_data_message

@typedoc """
Messages sent and received if `message_format` is `ex_webrtc`.
"""
@type ex_webrtc_message :: ICECandidate.t() | SessionDescription.t()

@typedoc """
Messages sent and received if `message_format` is `json_data`.

The keys and values are the following
- `%{"type" => "sdp_offer", "data" => data}`, where data is the return value of
`ExWebRTC.SessionDescription.to_json/1` or `RTCPeerConnection.create_offer` in the JavaScript API
- `%{"type" => "sdp_answer", "data" => data}`, where data is the return value of
`ExWebRTC.SessionDescription.to_json/1` or `RTCPeerConnection.create_answer` in the JavaScript API
- `%{"type" => "ice_candidate", "data" => data}`, where data is the return value of
`ExWebRTC.ICECandidate.to_json/1` or `event.candidate` from the `RTCPeerConnection.onicecandidate`
callback in the JavaScript API.
"""
@type json_data_message :: %{String.t() => term}

@spec new() :: t
def new() do
{:ok, pid} = GenServer.start_link(__MODULE__, [])
%__MODULE__{pid: pid}
end

@doc """
Registers a process as a peer, so that it can send and receive signaling messages.

Options:
- `pid` - pid of the peer, `self()` by default
- `message_format` - `:ex_webrtc` by default, see `t:message_content/0`

See the moduledoc for details.
"""
@spec register_peer(t, message_format: :ex_webrtc | :json_data, pid: pid) :: :ok
def register_peer(%__MODULE__{pid: pid}, opts \\ []) do
opts =
opts
|> Keyword.validate!(message_format: :ex_webrtc, pid: self())
|> Map.new()
|> Map.put(:is_element, false)

GenServer.call(pid, {:register_peer, opts})
end

@doc false
@spec register_element(t) :: :ok
def register_element(%__MODULE__{pid: pid}) do
GenServer.call(
pid,
{:register_peer, %{pid: self(), message_format: :ex_webrtc, is_element: true}}
)
end

@doc """
Sends a signaling message to the signaling channel.

The calling process must be previously registered with `register_peer/2`.
See the moduledoc for details.
"""
@spec signal(t, message_content, metadata :: map) :: :ok
def signal(%__MODULE__{pid: pid}, message, metadata \\ %{}) do
send(pid, {:signal, self(), message, metadata})
:ok
end

@spec close(t) :: :ok
def close(%__MODULE__{pid: pid}) do
GenServer.stop(pid)
end

@impl true
def init(_opts) do
state = %{
peer_a: nil,
peer_b: nil,
message_queue: []
}

{:ok, state}
end

@impl true
def handle_call({:register_peer, peer}, _from, state) do
Process.monitor(peer.pid)

case state do
%{peer_a: nil} ->
{:reply, :ok, %{state | peer_a: peer}}

%{peer_b: nil, message_queue: queue} ->
state = %{state | peer_b: peer}

queue
|> Enum.reverse()
|> Enum.each(fn {message, metadata} ->
send_peer(state.peer_a, state.peer_b, message, metadata)
end)

{:reply, :ok, %{state | message_queue: []}}

state ->
raise """
Cannot register a peer, both peers already registered: \
#{inspect(state.peer_a.pid)}, #{inspect(state.peer_b.pid)}
"""
end
end

@impl true
def handle_info({:signal, _from_pid, message, metadata}, %{peer_b: nil} = state) do
{:noreply, %{state | message_queue: [{message, metadata} | state.message_queue]}}
end

@impl true
def handle_info({:signal, from_pid, message, metadata}, state) do
{from_peer, to_peer} = get_peers(from_pid, state)
send_peer(from_peer, to_peer, message, metadata)
{:noreply, state}
end

@impl true
def handle_info({:DOWN, _monitor, :process, pid, reason}, state) do
{peer, _other_peer} = get_peers(pid, state)
reason = if peer.is_element, do: reason, else: :normal
{:stop, reason, state}
end

defp get_peers(pid, state) do
case state do
%{peer_a: %{pid: ^pid} = peer_a, peer_b: peer_b} -> {peer_a, peer_b}
%{peer_a: peer_a, peer_b: %{pid: ^pid} = peer_b} -> {peer_b, peer_a}
end
end

defp send_peer(
%{message_format: format},
%{message_format: format, pid: pid},
message,
metadata
) do
send(pid, {__MODULE__, self(), message, metadata})
end

defp send_peer(
%{message_format: :ex_webrtc},
%{message_format: :json_data, pid: pid},
message,
metadata
) do
json_data =
case message do
%ICECandidate{} ->
%{"type" => "ice_candidate", "data" => ICECandidate.to_json(message)}

%SessionDescription{type: type} ->
%{"type" => "sdp_#{type}", "data" => SessionDescription.to_json(message)}
end

send(pid, {__MODULE__, self(), json_data, metadata})
end

defp send_peer(
%{message_format: :json_data},
%{message_format: :ex_webrtc, pid: pid},
message,
metadata
) do
message =
case message do
%{"type" => "ice_candidate", "data" => candidate} -> ICECandidate.from_json(candidate)
%{"type" => "sdp_offer", "data" => offer} -> SessionDescription.from_json(offer)
%{"type" => "sdp_answer", "data" => answer} -> SessionDescription.from_json(answer)
end

send(pid, {__MODULE__, self(), message, metadata})
end
end
Loading