Skip to content

Commit f905b03

Browse files
committed
fix: improve join payload handling
improves error handling on join by doing basic checks on types received by realtime. it's the scafold for future validations with more informative errors to the users
1 parent 5190510 commit f905b03

File tree

7 files changed

+346
-89
lines changed

7 files changed

+346
-89
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ This is the list of operational codes that can help you understand your deployme
220220
| Code | Description |
221221
| ---------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
222222
| TopicNameRequired | You are trying to use Realtime without a topic name set |
223+
| InvalidJoinPayload | The payload provided to Realtime on connect is invalid |
223224
| RealtimeDisabledForConfiguration | The configuration provided to Realtime on connect will not be able to provide you any Postgres Changes |
224225
| TenantNotFound | The tenant you are trying to connect to does not exist |
225226
| ErrorConnectingToWebsocket | Error when trying to connect to the WebSocket server |

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 67 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel do
55
use RealtimeWeb, :channel
66
use RealtimeWeb.RealtimeChannel.Logging
77

8-
alias RealtimeWeb.SocketDisconnect
98
alias DBConnection.Backoff
9+
alias Phoenix.Socket
1010

1111
alias Realtime.Crypto
1212
alias Realtime.GenCounter
@@ -21,11 +21,15 @@ defmodule RealtimeWeb.RealtimeChannel do
2121
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
2222
alias Realtime.Tenants.Connect
2323

24+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join
25+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config
26+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange
2427
alias RealtimeWeb.ChannelsAuthorization
2528
alias RealtimeWeb.RealtimeChannel.BroadcastHandler
2629
alias RealtimeWeb.RealtimeChannel.MessageDispatcher
2730
alias RealtimeWeb.RealtimeChannel.PresenceHandler
2831
alias RealtimeWeb.RealtimeChannel.Tracker
32+
alias RealtimeWeb.SocketDisconnect
2933

3034
@confirm_token_ms_interval :timer.minutes(5)
3135

@@ -46,15 +50,11 @@ defmodule RealtimeWeb.RealtimeChannel do
4650
Logger.metadata(external_id: tenant_id, project: tenant_id)
4751
Logger.put_process_level(self(), log_level)
4852

49-
socket =
50-
socket
51-
|> assign_access_token(params)
52-
|> assign_counter()
53-
|> assign_presence_counter()
54-
|> assign(:private?, !!params["config"]["private"])
55-
|> assign(:policies, nil)
53+
# We always need to assign the access token so we can get the logs metadata working as expected
54+
socket = assign_access_token(socket, params)
5655

57-
with :ok <- SignalHandler.shutdown_in_progress?(),
56+
with {:ok, %Socket{} = socket, %Join{} = configuration} <- configure_socket(socket, params),
57+
:ok <- SignalHandler.shutdown_in_progress?(),
5858
:ok <- only_private?(tenant_id, socket),
5959
:ok <- limit_joins(socket),
6060
:ok <- limit_channels(socket),
@@ -64,7 +64,6 @@ defmodule RealtimeWeb.RealtimeChannel do
6464
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
6565
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do
6666
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?)
67-
6867
# fastlane subscription
6968
metadata =
7069
MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id)
@@ -73,15 +72,11 @@ defmodule RealtimeWeb.RealtimeChannel do
7372

7473
Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id)
7574

76-
is_new_api = new_api?(params)
77-
# TODO: Default will be moved to false in the future
78-
presence_enabled? =
79-
case get_in(params, ["config", "presence", "enabled"]) do
80-
enabled when is_boolean(enabled) -> enabled
81-
_ -> true
82-
end
75+
is_new_api = new_api?(configuration)
8376

84-
pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic)
77+
presence_enabled? = Join.presence_enabled?(configuration)
78+
79+
pg_change_params = pg_change_params(is_new_api, configuration, channel_pid, claims, sub_topic)
8580

8681
opts = %{
8782
is_new_api: is_new_api,
@@ -98,13 +93,13 @@ defmodule RealtimeWeb.RealtimeChannel do
9893
state = %{postgres_changes: add_id_to_postgres_changes(pg_change_params)}
9994

10095
assigns = %{
101-
ack_broadcast: !!params["config"]["broadcast"]["ack"],
96+
ack_broadcast: Join.ack_broadcast?(configuration),
10297
confirm_token_ref: confirm_token_ref,
10398
is_new_api: is_new_api,
10499
pg_sub_ref: nil,
105100
pg_change_params: pg_change_params,
106-
presence_key: presence_key(params),
107-
self_broadcast: !!params["config"]["broadcast"]["self"],
101+
presence_key: Join.presence_key(configuration),
102+
self_broadcast: Join.self_broadcast?(configuration),
108103
tenant_topic: tenant_topic,
109104
channel_name: sub_topic,
110105
presence_enabled?: presence_enabled?
@@ -118,6 +113,9 @@ defmodule RealtimeWeb.RealtimeChannel do
118113

119114
{:ok, state, assign(socket, assigns)}
120115
else
116+
{:error, :invalid_join_payload, errors, socket} ->
117+
log_error(socket, "InvalidJoinPayload", errors)
118+
121119
{:error, :expired_token, msg} ->
122120
maybe_log_warning(socket, "InvalidJWTToken", msg)
123121

@@ -194,6 +192,23 @@ defmodule RealtimeWeb.RealtimeChannel do
194192
end
195193
end
196194

195+
defp configure_socket(socket, params) do
196+
case Join.validate(params) do
197+
{:ok, configuration} ->
198+
socket =
199+
socket
200+
|> assign_counter()
201+
|> assign_presence_counter()
202+
|> assign(:private?, Join.private?(configuration))
203+
|> assign(:policies, nil)
204+
205+
{:ok, socket, configuration}
206+
207+
{:error, :invalid_join_payload, errors} ->
208+
{:error, :invalid_join_payload, errors, socket}
209+
end
210+
end
211+
197212
@impl true
198213
def handle_info(:update_rate_counter, %{assigns: %{limits: %{max_events_per_second: max}}} = socket) do
199214
count(socket)
@@ -531,40 +546,24 @@ defmodule RealtimeWeb.RealtimeChannel do
531546

532547
defp count(%{assigns: %{rate_counter: counter}}), do: GenCounter.add(counter.id)
533548

534-
defp presence_key(params) do
535-
case params["config"]["presence"]["key"] do
536-
key when is_binary(key) and key != "" -> key
537-
_ -> UUID.uuid1()
538-
end
539-
end
540-
541-
defp assign_access_token(%{assigns: %{headers: headers}} = socket, params) do
542-
access_token = Map.get(params, "access_token") || Map.get(params, "user_token")
549+
defp assign_access_token(socket, params) do
550+
%{assigns: %{tenant_token: tenant_token, headers: headers}} = socket
543551
{_, header} = Enum.find(headers, {nil, nil}, fn {k, _} -> k == "x-api-key" end)
544552

545-
case access_token do
546-
nil -> assign(socket, :access_token, header)
547-
"sb_" <> _ -> assign(socket, :access_token, header)
548-
_ -> handle_access_token(socket, params)
549-
end
550-
end
553+
access_token = Map.get(params, "access_token")
554+
user_token = Map.get(params, "user_token")
551555

552-
defp assign_access_token(socket, params), do: handle_access_token(socket, params)
553-
554-
defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"user_token" => user_token})
555-
when is_binary(user_token) do
556-
assign(socket, :access_token, user_token)
557-
end
556+
access_token =
557+
cond do
558+
access_token != nil and !String.starts_with?(access_token, "sb_") -> access_token
559+
user_token != nil and !String.starts_with?(user_token, "sb_") -> user_token
560+
tenant_token != nil and !String.starts_with?(tenant_token, "sb_") -> tenant_token
561+
true -> header
562+
end
558563

559-
defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"access_token" => access_token})
560-
when is_binary(access_token) do
561564
assign(socket, :access_token, access_token)
562565
end
563566

564-
defp handle_access_token(%{assigns: %{tenant_token: tenant_token}} = socket, _params) when is_binary(tenant_token) do
565-
assign(socket, :access_token, tenant_token)
566-
end
567-
568567
defp confirm_token(%{assigns: assigns}) do
569568
%{jwt_secret: jwt_secret, access_token: access_token} = assigns
570569

@@ -631,28 +630,30 @@ defmodule RealtimeWeb.RealtimeChannel do
631630
})
632631
end
633632

634-
defp new_api?(%{"config" => _}), do: true
633+
defp new_api?(%Join{config: config}) when not is_nil(config), do: true
635634
defp new_api?(_), do: false
636635

637-
defp pg_change_params(true, params, channel_pid, claims, _) do
638-
case get_in(params, ["config", "postgres_changes"]) do
639-
[_ | _] = params_list ->
640-
params_list
641-
|> Enum.reject(&is_nil/1)
642-
|> Enum.map(fn params ->
643-
%{
644-
id: UUID.uuid1(),
645-
channel_pid: channel_pid,
646-
claims: claims,
647-
params: params
648-
}
649-
end)
650-
651-
_ ->
652-
[]
653-
end
636+
defp pg_change_params(true, %Join{config: %Config{postgres_changes: postgres_changes}}, channel_pid, claims, _)
637+
when not is_nil(postgres_changes) do
638+
postgres_changes
639+
|> Enum.reject(&is_nil/1)
640+
|> Enum.map(fn %PostgresChange{table: table, event: event, schema: schema, filter: filter} ->
641+
params =
642+
%{"table" => table, "filter" => filter, "schema" => schema, "event" => event}
643+
|> Enum.reject(fn {_, v} -> is_nil(v) end)
644+
|> Map.new()
645+
646+
%{
647+
id: UUID.uuid1(),
648+
channel_pid: channel_pid,
649+
claims: claims,
650+
params: params
651+
}
652+
end)
654653
end
655654

655+
defp pg_change_params(true, _, _, _, _), do: []
656+
656657
defp pg_change_params(false, _, channel_pid, claims, sub_topic) do
657658
params =
658659
case String.split(sub_topic, ":", parts: 3) do
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange do
2+
@moduledoc """
3+
Validate postgres_changes field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join
8+
9+
embedded_schema do
10+
field :event, :string
11+
field :schema, :string
12+
field :table, :string
13+
field :filter, :string
14+
end
15+
16+
def changeset(postgres_change, attrs) do
17+
cast(postgres_change, attrs, [:event, :schema, :table, :filter], message: &Join.error_message/2)
18+
end
19+
end
20+
21+
defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast do
22+
@moduledoc """
23+
Validate broadcast field of the join payload.
24+
"""
25+
use Ecto.Schema
26+
import Ecto.Changeset
27+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join
28+
29+
embedded_schema do
30+
field :ack, :boolean, default: false
31+
field :self, :boolean, default: false
32+
end
33+
34+
def changeset(broadcast, attrs) do
35+
cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2)
36+
end
37+
end
38+
39+
defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence do
40+
@moduledoc """
41+
Validate presence field of the join payload.
42+
"""
43+
use Ecto.Schema
44+
import Ecto.Changeset
45+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join
46+
47+
embedded_schema do
48+
field :enabled, :boolean, default: true
49+
field :key, :string, default: UUID.uuid1()
50+
end
51+
52+
def changeset(presence, attrs) do
53+
cast(presence, attrs, [:enabled, :key], message: &Join.error_message/2)
54+
end
55+
end
56+
57+
defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config do
58+
@moduledoc """
59+
Validate config field of the join payload.
60+
"""
61+
use Ecto.Schema
62+
import Ecto.Changeset
63+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join
64+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast
65+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence
66+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange
67+
68+
embedded_schema do
69+
embeds_one :broadcast, Broadcast
70+
embeds_one :presence, Presence
71+
embeds_many :postgres_changes, PostgresChange
72+
field :private, :boolean, default: false
73+
end
74+
75+
def changeset(config, attrs) do
76+
config
77+
|> cast(attrs, [:private], message: &Join.error_message/2)
78+
|> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map")
79+
|> cast_embed(:presence, invalid_message: "unable to parse, expected a map ")
80+
|> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps")
81+
end
82+
end
83+
84+
defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join do
85+
@moduledoc """
86+
Payload validation for the phx_join event.
87+
"""
88+
use Ecto.Schema
89+
import Ecto.Changeset
90+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config
91+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast
92+
alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence
93+
94+
embedded_schema do
95+
embeds_one :config, Config
96+
field :access_token, :string
97+
field :user_token, :string
98+
end
99+
100+
def changeset(join, attrs) do
101+
join
102+
|> cast(attrs, [:access_token, :user_token], message: &error_message/2)
103+
|> cast_embed(:config, invalid_message: "unable to parse, expected a map")
104+
end
105+
106+
@spec validate(map()) :: {:ok, %__MODULE__{}} | {:error, :invalid_join_payload, map()}
107+
def validate(params) do
108+
case changeset(%__MODULE__{}, params) do
109+
%Ecto.Changeset{valid?: true} = changeset ->
110+
{:ok, Ecto.Changeset.apply_changes(changeset)}
111+
112+
%Ecto.Changeset{valid?: false} = changeset ->
113+
errors = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0))
114+
{:error, :invalid_join_payload, errors}
115+
end
116+
end
117+
118+
def presence_enabled?(%__MODULE__{config: %Config{presence: %Presence{enabled: enabled}}}), do: enabled
119+
def presence_enabled?(_), do: true
120+
121+
def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: key}}}), do: key
122+
def presence_key(_), do: UUID.uuid1()
123+
124+
def ack_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{ack: ack}}}), do: ack
125+
def ack_broadcast?(_), do: false
126+
127+
def self_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{self: self}}}), do: self
128+
def self_broadcast?(_), do: false
129+
130+
def private?(%__MODULE__{config: %Config{private: private}}), do: private
131+
def private?(_), do: false
132+
133+
def error_message(_field, meta) do
134+
type = Keyword.get(meta, :type)
135+
136+
if type,
137+
do: "unable to parse, expected #{type}",
138+
else: "unable to parse"
139+
end
140+
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.43.1",
7+
version: "2.43.2",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

0 commit comments

Comments
 (0)