Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_webrtc_plugin` to your list of
```elixir
def deps do
[
{:membrane_webrtc_plugin, "~> 0.23.3"}
{:membrane_webrtc_plugin, "~> 0.24.0"}
]
end
```
Expand Down
16 changes: 8 additions & 8 deletions lib/membrane_webrtc/ex_webrtc/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

alias ExRTCP.Packet.PayloadFeedback.PLI

alias Membrane.WebRTC.{ExWebRTCUtils, SignalingChannel, SimpleWebSocketServer}
alias Membrane.WebRTC.{ExWebRTCUtils, Signaling, SimpleWebSocketServer}

def_options signaling: [], tracks: [], video_codec: [], ice_servers: []

Expand Down Expand Up @@ -53,7 +53,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
SimpleWebSocketServer.start_link_supervised(ctx.utility_supervisor, opts)

{:whip, opts} ->
signaling = SignalingChannel.new()
signaling = Signaling.new()

Membrane.UtilitySupervisor.start_link_child(
ctx.utility_supervisor,
Expand All @@ -74,7 +74,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
)

Process.monitor(signaling.pid)
SignalingChannel.register_element(signaling)
Signaling.register_element(signaling)
state = %{state | pc: pc, status: :connecting, signaling: signaling}
state = maybe_negotiate_tracks(state)
{[setup: :incomplete], state}
Expand Down Expand Up @@ -196,7 +196,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

@impl true
def handle_info({:ex_webrtc, _from, {:ice_candidate, candidate}}, _ctx, state) do
SignalingChannel.signal(state.signaling, candidate)
Signaling.signal(state.signaling, candidate)
{[], state}
end

Expand Down Expand Up @@ -252,7 +252,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

@impl true
def handle_info(
{SignalingChannel, _pid, %SessionDescription{type: :offer}, _metadata},
{Signaling, _pid, %SessionDescription{type: :offer}, _metadata},
_ctx,
_state
) do
Expand All @@ -261,7 +261,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

@impl true
def handle_info(
{SignalingChannel, _pid, %SessionDescription{type: :answer} = sdp, _metadata},
{Signaling, _pid, %SessionDescription{type: :answer} = sdp, _metadata},
_ctx,
state
) do
Expand Down Expand Up @@ -290,7 +290,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
end

@impl true
def handle_info({SignalingChannel, _pid, %ICECandidate{} = candidate, _metadata}, _ctx, state) do
def handle_info({Signaling, _pid, %ICECandidate{} = candidate, _metadata}, _ctx, state) do
:ok = PeerConnection.add_ice_candidate(state.pc, candidate)
{[], state}
end
Expand Down Expand Up @@ -327,7 +327,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

{:ok, offer} = PeerConnection.create_offer(pc)
:ok = PeerConnection.set_local_description(pc, offer)
SignalingChannel.signal(state.signaling, offer)
Signaling.signal(state.signaling, offer)
%{state | negotiating_tracks: negotiating_tracks, queued_tracks: []}
end

Expand Down
16 changes: 8 additions & 8 deletions lib/membrane_webrtc/ex_webrtc/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
require Membrane.Logger

alias ExWebRTC.{ICECandidate, PeerConnection, SessionDescription}
alias Membrane.WebRTC.{ExWebRTCUtils, SignalingChannel, SimpleWebSocketServer, WhipServer}
alias Membrane.WebRTC.{ExWebRTCUtils, Signaling, SimpleWebSocketServer, WhipServer}

def_options signaling: [],
allowed_video_codecs: [],
Expand Down Expand Up @@ -36,7 +36,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
output_tracks: %{(pad_id :: term()) => output_track()},
awaiting_outputs: [{:video | :audio, Membrane.Pad.ref()}],
awaiting_candidates: [ExWebRTC.ICECandidate.t()],
signaling: SignalingChannel.t() | {:websocket, SimpleWebSocketServer.options()},
signaling: Signaling.t() | {:websocket, SimpleWebSocketServer.options()},
status: :init | :connecting | :connected | :closed,
audio_params: [ExWebRTC.RTPCodecParameters.t()],
video_params: [ExWebRTC.RTPCodecParameters.t()],
Expand Down Expand Up @@ -103,7 +103,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
@impl true
def handle_playing(_ctx, state) do
Process.monitor(state.signaling.pid)
SignalingChannel.register_element(state.signaling)
Signaling.register_element(state.signaling)

{[], %{state | status: :connecting}}
end
Expand Down Expand Up @@ -193,7 +193,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do

@impl true
def handle_info({:ex_webrtc, _from, {:ice_candidate, candidate}}, _ctx, state) do
SignalingChannel.signal(state.signaling, candidate)
Signaling.signal(state.signaling, candidate)
{[], state}
end

Expand All @@ -210,7 +210,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do

@impl true
def handle_info(
{SignalingChannel, _pid, %SessionDescription{type: :offer} = sdp, metadata},
{Signaling, _pid, %SessionDescription{type: :offer} = sdp, metadata},
_ctx,
state
) do
Expand Down Expand Up @@ -269,7 +269,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
end

@impl true
def handle_info({SignalingChannel, _pid, %ICECandidate{} = candidate, _metadata}, _ctx, state) do
def handle_info({Signaling, _pid, %ICECandidate{} = candidate, _metadata}, _ctx, state) do
case PeerConnection.add_ice_candidate(state.pc, candidate) do
:ok ->
{[], state}
Expand Down Expand Up @@ -356,7 +356,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
answer
end

SignalingChannel.signal(state.signaling, answer)
Signaling.signal(state.signaling, answer)
%{state | awaiting_candidates: [], candidates_in_sdp: false}
else
state
Expand Down Expand Up @@ -394,7 +394,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
end

defp setup_whip(ctx, opts) do
signaling = SignalingChannel.new()
signaling = Signaling.new()
clients_cnt = :atomics.new(1, [])
{token, opts} = Keyword.pop(opts, :token, fn _token -> true end)
validate_token = if is_function(token), do: token, else: &(&1 == token)
Expand Down
219 changes: 219 additions & 0 deletions lib/membrane_webrtc/signaling.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
defmodule Membrane.WebRTC.Signaling do
@moduledoc """
Signaling channel for sending WebRTC signaling messages between Membrane elements
and other WebRTC peers.

The flow of using the signaling channel is the following:
- Create it with `new/0`.
- Register the peer process (the one to send and receive signaling messages)
with `register_peer/2`.
- Pass the signaling to `Membrane.WebRTC.Source` or `Membrane.WebRTC.Sink` (this
can also be done before the call to `register_peer/2`).
- Send and receive signaling messages. Messages can be sent by calling `signal/2`.
The signaling channel sends `t:message/0` to the peer.
"""
use GenServer

require Logger

alias ExWebRTC.{ICECandidate, SessionDescription}

@enforce_keys [:pid]
defstruct @enforce_keys

@type t :: %__MODULE__{pid: pid()}

@typedoc """
Messages sent by the signaling channel to the peer.
"""
@type message :: {__MODULE__, pid(), message_content, metadata :: map}

@typedoc """
Messages that the peer sends with `signal/2` and receives in `t:message/0`.

If the `message_format` of the peer is `ex_webrtc` (default), they should be
`t:ex_webrtc_message/0`.
If the `message_format` is `json_data`, they should be `t:json_data_message/0`.

The `message_format` of the peer can be set in `register_peer/2`.
"""
@type message_content :: ex_webrtc_message | json_data_message

@typedoc """
Messages sent and received if `message_format` is `ex_webrtc`.
"""
@type ex_webrtc_message :: ICECandidate.t() | SessionDescription.t()

@typedoc """
Messages sent and received if `message_format` is `json_data`.

The keys and values are the following
- `%{"type" => "sdp_offer", "data" => data}`, where data is the return value of
`ExWebRTC.SessionDescription.to_json/1` or `RTCPeerConnection.create_offer` in the JavaScript API
- `%{"type" => "sdp_answer", "data" => data}`, where data is the return value of
`ExWebRTC.SessionDescription.to_json/1` or `RTCPeerConnection.create_answer` in the JavaScript API
- `%{"type" => "ice_candidate", "data" => data}`, where data is the return value of
`ExWebRTC.ICECandidate.to_json/1` or `event.candidate` from the `RTCPeerConnection.onicecandidate`
callback in the JavaScript API.
"""
@type json_data_message :: %{String.t() => term}

@spec new() :: t
def new() do
{:ok, pid} = GenServer.start_link(__MODULE__, [])
%__MODULE__{pid: pid}
end

@doc """
Registers a process as a peer, so that it can send and receive signaling messages.

Options:
- `pid` - pid of the peer, `self()` by default
- `message_format` - `:ex_webrtc` by default, see `t:message_content/0`

See the moduledoc for details.
"""
@spec register_peer(t, message_format: :ex_webrtc | :json_data, pid: pid) :: :ok
def register_peer(%__MODULE__{pid: pid}, opts \\ []) do
opts =
opts
|> Keyword.validate!(message_format: :ex_webrtc, pid: self())
|> Map.new()
|> Map.put(:is_element, false)

GenServer.call(pid, {:register_peer, opts})
end

@doc false
@spec register_element(t) :: :ok
def register_element(%__MODULE__{pid: pid}) do
GenServer.call(
pid,
{:register_peer, %{pid: self(), message_format: :ex_webrtc, is_element: true}}
)
end

@doc """
Sends a signaling message to the signaling channel.

The calling process must be previously registered with `register_peer/2`.
See the moduledoc for details.
"""
@spec signal(t, message_content, metadata :: map) :: :ok
def signal(%__MODULE__{pid: pid}, message, metadata \\ %{}) do
send(pid, {:signal, self(), message, metadata})
:ok
end

@spec close(t) :: :ok
def close(%__MODULE__{pid: pid}) do
GenServer.stop(pid)
end

@impl true
def init(_opts) do
state = %{
peer_a: nil,
peer_b: nil,
message_queue: []
}

{:ok, state}
end

@impl true
def handle_call({:register_peer, peer}, _from, state) do
Process.monitor(peer.pid)

case state do
%{peer_a: nil} ->
{:reply, :ok, %{state | peer_a: peer}}

%{peer_b: nil, message_queue: queue} ->
state = %{state | peer_b: peer}

queue
|> Enum.reverse()
|> Enum.each(fn {message, metadata} ->
send_peer(state.peer_a, state.peer_b, message, metadata)
end)

{:reply, :ok, %{state | message_queue: []}}

state ->
raise """
Cannot register a peer, both peers already registered: \
#{inspect(state.peer_a.pid)}, #{inspect(state.peer_b.pid)}
"""
end
end

@impl true
def handle_info({:signal, _from_pid, message, metadata}, %{peer_b: nil} = state) do
{:noreply, %{state | message_queue: [{message, metadata} | state.message_queue]}}
end

@impl true
def handle_info({:signal, from_pid, message, metadata}, state) do
{from_peer, to_peer} = get_peers(from_pid, state)
send_peer(from_peer, to_peer, message, metadata)
{:noreply, state}
end

@impl true
def handle_info({:DOWN, _monitor, :process, pid, reason}, state) do
{peer, _other_peer} = get_peers(pid, state)
reason = if peer.is_element, do: reason, else: :normal
{:stop, reason, state}
end

defp get_peers(pid, state) do
case state do
%{peer_a: %{pid: ^pid} = peer_a, peer_b: peer_b} -> {peer_a, peer_b}
%{peer_a: peer_a, peer_b: %{pid: ^pid} = peer_b} -> {peer_b, peer_a}
end
end

defp send_peer(
%{message_format: format},
%{message_format: format, pid: pid},
message,
metadata
) do
send(pid, {__MODULE__, self(), message, metadata})
end

defp send_peer(
%{message_format: :ex_webrtc},
%{message_format: :json_data, pid: pid},
message,
metadata
) do
json_data =
case message do
%ICECandidate{} ->
%{"type" => "ice_candidate", "data" => ICECandidate.to_json(message)}

%SessionDescription{type: type} ->
%{"type" => "sdp_#{type}", "data" => SessionDescription.to_json(message)}
end

send(pid, {__MODULE__, self(), json_data, metadata})
end

defp send_peer(
%{message_format: :json_data},
%{message_format: :ex_webrtc, pid: pid},
message,
metadata
) do
message =
case message do
%{"type" => "ice_candidate", "data" => candidate} -> ICECandidate.from_json(candidate)
%{"type" => "sdp_offer", "data" => offer} -> SessionDescription.from_json(offer)
%{"type" => "sdp_answer", "data" => answer} -> SessionDescription.from_json(answer)
end

send(pid, {__MODULE__, self(), message, metadata})
end
end
Loading