Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ This is the list of operational codes that can help you understand your deployme
| Code | Description |
| ---------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TopicNameRequired | You are trying to use Realtime without a topic name set |
| InvalidJoinPayload | The payload provided to Realtime on connect is invalid |
| RealtimeDisabledForConfiguration | The configuration provided to Realtime on connect will not be able to provide you any Postgres Changes |
| TenantNotFound | The tenant you are trying to connect to does not exist |
| ErrorConnectingToWebsocket | Error when trying to connect to the WebSocket server |
Expand Down
17 changes: 17 additions & 0 deletions lib/realtime_web/channels/payloads/broadcast.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule RealtimeWeb.Channels.Payloads.Broadcast do
@moduledoc """
Validate broadcast field of the join payload.
"""
use Ecto.Schema
import Ecto.Changeset
alias RealtimeWeb.Channels.Payloads.Join

embedded_schema do
field :ack, :boolean, default: false
field :self, :boolean, default: false
end

def changeset(broadcast, attrs) do
cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2)
end
end
26 changes: 26 additions & 0 deletions lib/realtime_web/channels/payloads/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule RealtimeWeb.Channels.Payloads.Config do
@moduledoc """
Validate config field of the join payload.
"""
use Ecto.Schema
import Ecto.Changeset
alias RealtimeWeb.Channels.Payloads.Join
alias RealtimeWeb.Channels.Payloads.Broadcast
alias RealtimeWeb.Channels.Payloads.Presence
alias RealtimeWeb.Channels.Payloads.PostgresChange

embedded_schema do
embeds_one :broadcast, Broadcast
embeds_one :presence, Presence
embeds_many :postgres_changes, PostgresChange
field :private, :boolean, default: false
end

def changeset(config, attrs) do
config
|> cast(attrs, [:private], message: &Join.error_message/2)
|> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map")
|> cast_embed(:presence, invalid_message: "unable to parse, expected a map")
|> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps")
end
end
58 changes: 58 additions & 0 deletions lib/realtime_web/channels/payloads/join.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule RealtimeWeb.Channels.Payloads.Join do
@moduledoc """
Payload validation for the phx_join event.
"""
use Ecto.Schema
import Ecto.Changeset
alias RealtimeWeb.Channels.Payloads.Config
alias RealtimeWeb.Channels.Payloads.Broadcast
alias RealtimeWeb.Channels.Payloads.Presence

embedded_schema do
embeds_one :config, Config
field :access_token, :string
field :user_token, :string
end

def changeset(join, attrs) do
join
|> cast(attrs, [:access_token, :user_token], message: &error_message/2)
|> cast_embed(:config, invalid_message: "unable to parse, expected a map")
end

@spec validate(map()) :: {:ok, %__MODULE__{}} | {:error, :invalid_join_payload, map()}
def validate(params) do
case changeset(%__MODULE__{}, params) do
%Ecto.Changeset{valid?: true} = changeset ->
{:ok, Ecto.Changeset.apply_changes(changeset)}

%Ecto.Changeset{valid?: false} = changeset ->
errors = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0))
{:error, :invalid_join_payload, errors}
end
end

def presence_enabled?(%__MODULE__{config: %Config{presence: %Presence{enabled: enabled}}}), do: enabled
def presence_enabled?(_), do: true

def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: ""}}}), do: UUID.uuid1()
def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: key}}}), do: key
def presence_key(_), do: UUID.uuid1()

def ack_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{ack: ack}}}), do: ack
def ack_broadcast?(_), do: false

def self_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{self: self}}}), do: self
def self_broadcast?(_), do: false

def private?(%__MODULE__{config: %Config{private: private}}), do: private
def private?(_), do: false

def error_message(_field, meta) do
type = Keyword.get(meta, :type)

if type,
do: "unable to parse, expected #{type}",
else: "unable to parse"
end
end
19 changes: 19 additions & 0 deletions lib/realtime_web/channels/payloads/postgres_change.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule RealtimeWeb.Channels.Payloads.PostgresChange do
@moduledoc """
Validate postgres_changes field of the join payload.
"""
use Ecto.Schema
import Ecto.Changeset
alias RealtimeWeb.Channels.Payloads.Join

embedded_schema do
field :event, :string
field :schema, :string
field :table, :string
field :filter, :string
end

def changeset(postgres_change, attrs) do
cast(postgres_change, attrs, [:event, :schema, :table, :filter], message: &Join.error_message/2)
end
end
17 changes: 17 additions & 0 deletions lib/realtime_web/channels/payloads/presence.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule RealtimeWeb.Channels.Payloads.Presence do
@moduledoc """
Validate presence field of the join payload.
"""
use Ecto.Schema
import Ecto.Changeset
alias RealtimeWeb.Channels.Payloads.Join

embedded_schema do
field :enabled, :boolean, default: true
field :key, :string, default: UUID.uuid1()
end

def changeset(presence, attrs) do
cast(presence, attrs, [:enabled, :key], message: &Join.error_message/2)
end
end
133 changes: 67 additions & 66 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel do
use RealtimeWeb, :channel
use RealtimeWeb.RealtimeChannel.Logging

alias RealtimeWeb.SocketDisconnect
alias DBConnection.Backoff
alias Phoenix.Socket

alias Realtime.Crypto
alias Realtime.GenCounter
Expand All @@ -21,11 +21,15 @@ defmodule RealtimeWeb.RealtimeChannel do
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
alias Realtime.Tenants.Connect

alias RealtimeWeb.Channels.Payloads.Join
alias RealtimeWeb.Channels.Payloads.Config
alias RealtimeWeb.Channels.Payloads.PostgresChange
alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.RealtimeChannel.BroadcastHandler
alias RealtimeWeb.RealtimeChannel.MessageDispatcher
alias RealtimeWeb.RealtimeChannel.PresenceHandler
alias RealtimeWeb.RealtimeChannel.Tracker
alias RealtimeWeb.SocketDisconnect

@confirm_token_ms_interval :timer.minutes(5)

Expand All @@ -46,15 +50,11 @@ defmodule RealtimeWeb.RealtimeChannel do
Logger.metadata(external_id: tenant_id, project: tenant_id)
Logger.put_process_level(self(), log_level)

socket =
socket
|> assign_access_token(params)
|> assign_counter()
|> assign_presence_counter()
|> assign(:private?, !!params["config"]["private"])
|> assign(:policies, nil)
# We always need to assign the access token so we can get the logs metadata working as expected
socket = assign_access_token(socket, params)

with :ok <- SignalHandler.shutdown_in_progress?(),
with {:ok, %Socket{} = socket, %Join{} = configuration} <- configure_socket(socket, params),
:ok <- SignalHandler.shutdown_in_progress?(),
:ok <- only_private?(tenant_id, socket),
:ok <- limit_joins(socket),
:ok <- limit_channels(socket),
Expand All @@ -64,7 +64,6 @@ defmodule RealtimeWeb.RealtimeChannel do
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?)

# fastlane subscription
metadata =
MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id)
Expand All @@ -73,15 +72,11 @@ defmodule RealtimeWeb.RealtimeChannel do

Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id)

is_new_api = new_api?(params)
# TODO: Default will be moved to false in the future
presence_enabled? =
case get_in(params, ["config", "presence", "enabled"]) do
enabled when is_boolean(enabled) -> enabled
_ -> true
end
is_new_api = new_api?(configuration)

pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic)
presence_enabled? = Join.presence_enabled?(configuration)

pg_change_params = pg_change_params(is_new_api, configuration, channel_pid, claims, sub_topic)

opts = %{
is_new_api: is_new_api,
Expand All @@ -98,13 +93,13 @@ defmodule RealtimeWeb.RealtimeChannel do
state = %{postgres_changes: add_id_to_postgres_changes(pg_change_params)}

assigns = %{
ack_broadcast: !!params["config"]["broadcast"]["ack"],
ack_broadcast: Join.ack_broadcast?(configuration),
confirm_token_ref: confirm_token_ref,
is_new_api: is_new_api,
pg_sub_ref: nil,
pg_change_params: pg_change_params,
presence_key: presence_key(params),
self_broadcast: !!params["config"]["broadcast"]["self"],
presence_key: Join.presence_key(configuration),
self_broadcast: Join.self_broadcast?(configuration),
tenant_topic: tenant_topic,
channel_name: sub_topic,
presence_enabled?: presence_enabled?
Expand All @@ -118,6 +113,9 @@ defmodule RealtimeWeb.RealtimeChannel do

{:ok, state, assign(socket, assigns)}
else
{:error, :invalid_join_payload, errors, socket} ->
log_error(socket, "InvalidJoinPayload", errors)

{:error, :expired_token, msg} ->
maybe_log_warning(socket, "InvalidJWTToken", msg)

Expand Down Expand Up @@ -194,6 +192,23 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

defp configure_socket(socket, params) do
case Join.validate(params) do
{:ok, configuration} ->
socket =
socket
|> assign_counter()
|> assign_presence_counter()
|> assign(:private?, Join.private?(configuration))
|> assign(:policies, nil)

{:ok, socket, configuration}

{:error, :invalid_join_payload, errors} ->
{:error, :invalid_join_payload, errors, socket}
end
end

@impl true
def handle_info(:update_rate_counter, %{assigns: %{limits: %{max_events_per_second: max}}} = socket) do
count(socket)
Expand Down Expand Up @@ -531,40 +546,24 @@ defmodule RealtimeWeb.RealtimeChannel do

defp count(%{assigns: %{rate_counter: counter}}), do: GenCounter.add(counter.id)

defp presence_key(params) do
case params["config"]["presence"]["key"] do
key when is_binary(key) and key != "" -> key
_ -> UUID.uuid1()
end
end

defp assign_access_token(%{assigns: %{headers: headers}} = socket, params) do
access_token = Map.get(params, "access_token") || Map.get(params, "user_token")
defp assign_access_token(socket, params) do
%{assigns: %{tenant_token: tenant_token, headers: headers}} = socket
{_, header} = Enum.find(headers, {nil, nil}, fn {k, _} -> k == "x-api-key" end)

case access_token do
nil -> assign(socket, :access_token, header)
"sb_" <> _ -> assign(socket, :access_token, header)
_ -> handle_access_token(socket, params)
end
end
access_token = Map.get(params, "access_token")
user_token = Map.get(params, "user_token")

defp assign_access_token(socket, params), do: handle_access_token(socket, params)

defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"user_token" => user_token})
when is_binary(user_token) do
assign(socket, :access_token, user_token)
end
access_token =
cond do
is_binary(access_token) and !String.starts_with?(access_token, "sb_") -> access_token
is_binary(user_token) and !String.starts_with?(user_token, "sb_") -> user_token
is_binary(tenant_token) and !String.starts_with?(tenant_token, "sb_") -> tenant_token
true -> header
end

defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"access_token" => access_token})
when is_binary(access_token) do
assign(socket, :access_token, access_token)
end

defp handle_access_token(%{assigns: %{tenant_token: tenant_token}} = socket, _params) when is_binary(tenant_token) do
assign(socket, :access_token, tenant_token)
end

defp confirm_token(%{assigns: assigns}) do
%{jwt_secret: jwt_secret, access_token: access_token} = assigns

Expand Down Expand Up @@ -631,28 +630,30 @@ defmodule RealtimeWeb.RealtimeChannel do
})
end

defp new_api?(%{"config" => _}), do: true
defp new_api?(%Join{config: config}) when not is_nil(config), do: true
defp new_api?(_), do: false

defp pg_change_params(true, params, channel_pid, claims, _) do
case get_in(params, ["config", "postgres_changes"]) do
[_ | _] = params_list ->
params_list
|> Enum.reject(&is_nil/1)
|> Enum.map(fn params ->
%{
id: UUID.uuid1(),
channel_pid: channel_pid,
claims: claims,
params: params
}
end)

_ ->
[]
end
defp pg_change_params(true, %Join{config: %Config{postgres_changes: postgres_changes}}, channel_pid, claims, _)
when not is_nil(postgres_changes) do
postgres_changes
|> Enum.reject(&is_nil/1)
|> Enum.map(fn %PostgresChange{table: table, event: event, schema: schema, filter: filter} ->
params =
%{"table" => table, "filter" => filter, "schema" => schema, "event" => event}
|> Enum.reject(fn {_, v} -> is_nil(v) end)
|> Map.new()

%{
id: UUID.uuid1(),
channel_pid: channel_pid,
claims: claims,
params: params
}
end)
end

defp pg_change_params(true, _, _, _, _), do: []

defp pg_change_params(false, _, channel_pid, claims, sub_topic) do
params =
case String.split(sub_topic, ":", parts: 3) do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.43.1",
version: "2.43.3",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading
Loading