Skip to content

Commit 76a09c5

Browse files
committed
fix: improve join payload handling
improves error handling on join by doing basic checks on types received by realtime. it's the scafold for future validations with more informative errors to the users
1 parent 9daf263 commit 76a09c5

File tree

10 files changed

+384
-1
lines changed

10 files changed

+384
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ This is the list of operational codes that can help you understand your deployme
225225
| Code | Description |
226226
| ---------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
227227
| TopicNameRequired | You are trying to use Realtime without a topic name set |
228+
| InvalidJoinPayload | The payload provided to Realtime on connect is invalid |
228229
| RealtimeDisabledForConfiguration | The configuration provided to Realtime on connect will not be able to provide you any Postgres Changes |
229230
| TenantNotFound | The tenant you are trying to connect to does not exist |
230231
| ErrorConnectingToWebsocket | Error when trying to connect to the WebSocket server |
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Broadcast do
2+
@moduledoc """
3+
Validate broadcast field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Join
8+
9+
embedded_schema do
10+
field :ack, :boolean, default: false
11+
field :self, :boolean, default: false
12+
end
13+
14+
def changeset(broadcast, attrs) do
15+
cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2)
16+
end
17+
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Config do
2+
@moduledoc """
3+
Validate config field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Join
8+
alias RealtimeWeb.Channels.Payloads.Broadcast
9+
alias RealtimeWeb.Channels.Payloads.Presence
10+
alias RealtimeWeb.Channels.Payloads.PostgresChange
11+
12+
embedded_schema do
13+
embeds_one :broadcast, Broadcast
14+
embeds_one :presence, Presence
15+
embeds_many :postgres_changes, PostgresChange
16+
field :private, :boolean, default: false
17+
end
18+
19+
def changeset(config, attrs) do
20+
config
21+
|> cast(attrs, [:private], message: &Join.error_message/2)
22+
|> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map")
23+
|> cast_embed(:presence, invalid_message: "unable to parse, expected a map")
24+
|> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps")
25+
end
26+
end
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Join do
2+
@moduledoc """
3+
Payload validation for the phx_join event.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Config
8+
alias RealtimeWeb.Channels.Payloads.Broadcast
9+
alias RealtimeWeb.Channels.Payloads.Presence
10+
11+
embedded_schema do
12+
embeds_one :config, Config
13+
field :access_token, :string
14+
field :user_token, :string
15+
end
16+
17+
def changeset(join, attrs) do
18+
join
19+
|> cast(attrs, [:access_token, :user_token], message: &error_message/2)
20+
|> cast_embed(:config, invalid_message: "unable to parse, expected a map")
21+
end
22+
23+
@spec validate(map()) :: {:ok, %__MODULE__{}} | {:error, :invalid_join_payload, map()}
24+
def validate(params) do
25+
case changeset(%__MODULE__{}, params) do
26+
%Ecto.Changeset{valid?: true} = changeset ->
27+
{:ok, Ecto.Changeset.apply_changes(changeset)}
28+
29+
%Ecto.Changeset{valid?: false} = changeset ->
30+
errors = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0))
31+
{:error, :invalid_join_payload, errors}
32+
end
33+
end
34+
35+
def presence_enabled?(%__MODULE__{config: %Config{presence: %Presence{enabled: enabled}}}), do: enabled
36+
def presence_enabled?(_), do: true
37+
38+
def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: ""}}}), do: UUID.uuid1()
39+
def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: key}}}), do: key
40+
def presence_key(_), do: UUID.uuid1()
41+
42+
def ack_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{ack: ack}}}), do: ack
43+
def ack_broadcast?(_), do: false
44+
45+
def self_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{self: self}}}), do: self
46+
def self_broadcast?(_), do: false
47+
48+
def private?(%__MODULE__{config: %Config{private: private}}), do: private
49+
def private?(_), do: false
50+
51+
def error_message(_field, meta) do
52+
type = Keyword.get(meta, :type)
53+
54+
if type,
55+
do: "unable to parse, expected #{type}",
56+
else: "unable to parse"
57+
end
58+
end
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
defmodule RealtimeWeb.Channels.Payloads.PostgresChange do
2+
@moduledoc """
3+
Validate postgres_changes field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Join
8+
9+
embedded_schema do
10+
field :event, :string
11+
field :schema, :string
12+
field :table, :string
13+
field :filter, :string
14+
end
15+
16+
def changeset(postgres_change, attrs) do
17+
cast(postgres_change, attrs, [:event, :schema, :table, :filter], message: &Join.error_message/2)
18+
end
19+
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Presence do
2+
@moduledoc """
3+
Validate presence field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Join
8+
9+
embedded_schema do
10+
field :enabled, :boolean, default: true
11+
field :key, :string, default: UUID.uuid1()
12+
end
13+
14+
def changeset(presence, attrs) do
15+
cast(presence, attrs, [:enabled, :key], message: &Join.error_message/2)
16+
end
17+
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ defmodule RealtimeWeb.RealtimeChannel do
2121
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
2222
alias Realtime.Tenants.Connect
2323

24+
alias RealtimeWeb.Channels.Payloads.Join
2425
alias RealtimeWeb.ChannelsAuthorization
2526
alias RealtimeWeb.RealtimeChannel.BroadcastHandler
2627
alias RealtimeWeb.RealtimeChannel.MessageDispatcher
@@ -54,6 +55,15 @@ defmodule RealtimeWeb.RealtimeChannel do
5455
|> assign(:private?, !!params["config"]["private"])
5556
|> assign(:policies, nil)
5657

58+
case Join.validate(params) do
59+
{:ok, _join} ->
60+
nil
61+
62+
{:error, :invalid_join_payload, errors} ->
63+
log_params = params |> Map.put("access_token", "<redacted>") |> Map.put("user_token", "<redacted>")
64+
log_error(socket, "InvalidJoinPayload", %{changeset_errors: errors, params: log_params})
65+
end
66+
5767
with :ok <- SignalHandler.shutdown_in_progress?(),
5868
:ok <- only_private?(tenant_id, socket),
5969
:ok <- limit_joins(socket),

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.44.0",
7+
version: "2.44.1",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

phx_join.schema.json

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
{
2+
"type": "object",
3+
"description": "",
4+
"title": "phx_join",
5+
"required": [
6+
"access_token",
7+
"config"
8+
],
9+
"properties": {
10+
"config": {
11+
"title": "config",
12+
"$ref": "#/$defs/phx_join_config"
13+
},
14+
"access_token": {
15+
"type": "string",
16+
"description": "String, e.g. 'hello'",
17+
"title": "access_token"
18+
}
19+
},
20+
"additionalProperties": false,
21+
"$defs": {
22+
"phx_join_config": {
23+
"type": "object",
24+
"description": "",
25+
"title": "phx_join_config",
26+
"properties": {
27+
"private": {
28+
"type": "boolean",
29+
"description": "Boolean, e.g. true",
30+
"title": "private",
31+
"default": false
32+
},
33+
"broadcast": {
34+
"title": "broadcast",
35+
"$ref": "#/$defs/phx_join_broadcast",
36+
"default": {
37+
"self": false,
38+
"ack": false
39+
}
40+
},
41+
"presence": {
42+
"title": "presence",
43+
"$ref": "#/$defs/phx_join_presence",
44+
"default": {
45+
"enabled": false,
46+
"key": ""
47+
}
48+
},
49+
"postgres_changes": {
50+
"type": "array",
51+
"title": "phx_join_postgres_changes",
52+
"items": {
53+
"$ref": "#/$defs/phx_join_postgres_changes",
54+
"default": {
55+
"table": "",
56+
"filter": "",
57+
"schema": "",
58+
"event": ""
59+
}
60+
}
61+
}
62+
},
63+
"additionalProperties": false
64+
},
65+
"phx_join_broadcast": {
66+
"type": "object",
67+
"description": "",
68+
"title": "phx_join_broadcast",
69+
"properties": {
70+
"self": {
71+
"type": "boolean",
72+
"description": "Boolean, e.g. true",
73+
"title": "self",
74+
"default": false
75+
},
76+
"ack": {
77+
"type": "boolean",
78+
"description": "Boolean, e.g. true",
79+
"title": "ack",
80+
"default": false
81+
}
82+
},
83+
"additionalProperties": false
84+
},
85+
"phx_join_postgres_changes": {
86+
"type": "object",
87+
"description": "",
88+
"title": "phx_join_postgres_changes",
89+
"required": [
90+
"event",
91+
"filter",
92+
"schema",
93+
"table"
94+
],
95+
"properties": {
96+
"table": {
97+
"type": "string",
98+
"description": "String, e.g. 'hello'",
99+
"title": "table"
100+
},
101+
"filter": {
102+
"type": "string",
103+
"description": "String, e.g. 'hello'",
104+
"title": "filter"
105+
},
106+
"schema": {
107+
"type": "string",
108+
"description": "String, e.g. 'hello'",
109+
"title": "schema"
110+
},
111+
"event": {
112+
"type": "string",
113+
"description": "String, e.g. 'hello'",
114+
"title": "event"
115+
}
116+
},
117+
"additionalProperties": false
118+
},
119+
"phx_join_presence": {
120+
"type": "object",
121+
"description": "",
122+
"title": "phx_join_presence",
123+
"properties": {
124+
"enabled": {
125+
"type": "boolean",
126+
"description": "Boolean, e.g. true",
127+
"title": "enabled",
128+
"default": false
129+
},
130+
"key": {
131+
"type": "string",
132+
"description": "String, e.g. 'hello'",
133+
"title": "key"
134+
}
135+
},
136+
"additionalProperties": false
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)