From f905b03e9056b7f8ae305b94e526cb2f5c026aae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Thu, 21 Aug 2025 16:27:42 +0100 Subject: [PATCH 1/5] fix: improve join payload handling improves error handling on join by doing basic checks on types received by realtime. it's the scafold for future validations with more informative errors to the users --- README.md | 1 + lib/realtime_web/channels/realtime_channel.ex | 133 ++++++++--------- .../realtime_channel/payloads/join.ex | 140 ++++++++++++++++++ mix.exs | 2 +- test/integration/rt_channel_test.exs | 24 +-- .../realtime_channel/payloads/join_test.exs | 90 +++++++++++ .../channels/realtime_channel_test.exs | 45 +++++- 7 files changed, 346 insertions(+), 89 deletions(-) create mode 100644 lib/realtime_web/channels/realtime_channel/payloads/join.ex create mode 100644 test/realtime_web/channels/realtime_channel/payloads/join_test.exs diff --git a/README.md b/README.md index 657b621f5..41592608c 100644 --- a/README.md +++ b/README.md @@ -220,6 +220,7 @@ This is the list of operational codes that can help you understand your deployme | Code | Description | | ---------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | TopicNameRequired | You are trying to use Realtime without a topic name set | +| InvalidJoinPayload | The payload provided to Realtime on connect is invalid | | RealtimeDisabledForConfiguration | The configuration provided to Realtime on connect will not be able to provide you any Postgres Changes | | TenantNotFound | The tenant you are trying to connect to does not exist | | ErrorConnectingToWebsocket | Error when trying to connect to the WebSocket server | diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 35c080509..5d6e0322e 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -5,8 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel do use RealtimeWeb, :channel use RealtimeWeb.RealtimeChannel.Logging - alias RealtimeWeb.SocketDisconnect alias DBConnection.Backoff + alias Phoenix.Socket alias Realtime.Crypto alias Realtime.GenCounter @@ -21,11 +21,15 @@ defmodule RealtimeWeb.RealtimeChannel do alias Realtime.Tenants.Authorization.Policies.PresencePolicies alias Realtime.Tenants.Connect + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange alias RealtimeWeb.ChannelsAuthorization alias RealtimeWeb.RealtimeChannel.BroadcastHandler alias RealtimeWeb.RealtimeChannel.MessageDispatcher alias RealtimeWeb.RealtimeChannel.PresenceHandler alias RealtimeWeb.RealtimeChannel.Tracker + alias RealtimeWeb.SocketDisconnect @confirm_token_ms_interval :timer.minutes(5) @@ -46,15 +50,11 @@ defmodule RealtimeWeb.RealtimeChannel do Logger.metadata(external_id: tenant_id, project: tenant_id) Logger.put_process_level(self(), log_level) - socket = - socket - |> assign_access_token(params) - |> assign_counter() - |> assign_presence_counter() - |> assign(:private?, !!params["config"]["private"]) - |> assign(:policies, nil) + # We always need to assign the access token so we can get the logs metadata working as expected + socket = assign_access_token(socket, params) - with :ok <- SignalHandler.shutdown_in_progress?(), + with {:ok, %Socket{} = socket, %Join{} = configuration} <- configure_socket(socket, params), + :ok <- SignalHandler.shutdown_in_progress?(), :ok <- only_private?(tenant_id, socket), :ok <- limit_joins(socket), :ok <- limit_channels(socket), @@ -64,7 +64,6 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id), {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?) - # fastlane subscription metadata = MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id) @@ -73,15 +72,11 @@ defmodule RealtimeWeb.RealtimeChannel do Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id) - is_new_api = new_api?(params) - # TODO: Default will be moved to false in the future - presence_enabled? = - case get_in(params, ["config", "presence", "enabled"]) do - enabled when is_boolean(enabled) -> enabled - _ -> true - end + is_new_api = new_api?(configuration) - pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic) + presence_enabled? = Join.presence_enabled?(configuration) + + pg_change_params = pg_change_params(is_new_api, configuration, channel_pid, claims, sub_topic) opts = %{ is_new_api: is_new_api, @@ -98,13 +93,13 @@ defmodule RealtimeWeb.RealtimeChannel do state = %{postgres_changes: add_id_to_postgres_changes(pg_change_params)} assigns = %{ - ack_broadcast: !!params["config"]["broadcast"]["ack"], + ack_broadcast: Join.ack_broadcast?(configuration), confirm_token_ref: confirm_token_ref, is_new_api: is_new_api, pg_sub_ref: nil, pg_change_params: pg_change_params, - presence_key: presence_key(params), - self_broadcast: !!params["config"]["broadcast"]["self"], + presence_key: Join.presence_key(configuration), + self_broadcast: Join.self_broadcast?(configuration), tenant_topic: tenant_topic, channel_name: sub_topic, presence_enabled?: presence_enabled? @@ -118,6 +113,9 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, state, assign(socket, assigns)} else + {:error, :invalid_join_payload, errors, socket} -> + log_error(socket, "InvalidJoinPayload", errors) + {:error, :expired_token, msg} -> maybe_log_warning(socket, "InvalidJWTToken", msg) @@ -194,6 +192,23 @@ defmodule RealtimeWeb.RealtimeChannel do end end + defp configure_socket(socket, params) do + case Join.validate(params) do + {:ok, configuration} -> + socket = + socket + |> assign_counter() + |> assign_presence_counter() + |> assign(:private?, Join.private?(configuration)) + |> assign(:policies, nil) + + {:ok, socket, configuration} + + {:error, :invalid_join_payload, errors} -> + {:error, :invalid_join_payload, errors, socket} + end + end + @impl true def handle_info(:update_rate_counter, %{assigns: %{limits: %{max_events_per_second: max}}} = socket) do count(socket) @@ -531,40 +546,24 @@ defmodule RealtimeWeb.RealtimeChannel do defp count(%{assigns: %{rate_counter: counter}}), do: GenCounter.add(counter.id) - defp presence_key(params) do - case params["config"]["presence"]["key"] do - key when is_binary(key) and key != "" -> key - _ -> UUID.uuid1() - end - end - - defp assign_access_token(%{assigns: %{headers: headers}} = socket, params) do - access_token = Map.get(params, "access_token") || Map.get(params, "user_token") + defp assign_access_token(socket, params) do + %{assigns: %{tenant_token: tenant_token, headers: headers}} = socket {_, header} = Enum.find(headers, {nil, nil}, fn {k, _} -> k == "x-api-key" end) - case access_token do - nil -> assign(socket, :access_token, header) - "sb_" <> _ -> assign(socket, :access_token, header) - _ -> handle_access_token(socket, params) - end - end + access_token = Map.get(params, "access_token") + user_token = Map.get(params, "user_token") - defp assign_access_token(socket, params), do: handle_access_token(socket, params) - - defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"user_token" => user_token}) - when is_binary(user_token) do - assign(socket, :access_token, user_token) - end + access_token = + cond do + access_token != nil and !String.starts_with?(access_token, "sb_") -> access_token + user_token != nil and !String.starts_with?(user_token, "sb_") -> user_token + tenant_token != nil and !String.starts_with?(tenant_token, "sb_") -> tenant_token + true -> header + end - defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"access_token" => access_token}) - when is_binary(access_token) do assign(socket, :access_token, access_token) end - defp handle_access_token(%{assigns: %{tenant_token: tenant_token}} = socket, _params) when is_binary(tenant_token) do - assign(socket, :access_token, tenant_token) - end - defp confirm_token(%{assigns: assigns}) do %{jwt_secret: jwt_secret, access_token: access_token} = assigns @@ -631,28 +630,30 @@ defmodule RealtimeWeb.RealtimeChannel do }) end - defp new_api?(%{"config" => _}), do: true + defp new_api?(%Join{config: config}) when not is_nil(config), do: true defp new_api?(_), do: false - defp pg_change_params(true, params, channel_pid, claims, _) do - case get_in(params, ["config", "postgres_changes"]) do - [_ | _] = params_list -> - params_list - |> Enum.reject(&is_nil/1) - |> Enum.map(fn params -> - %{ - id: UUID.uuid1(), - channel_pid: channel_pid, - claims: claims, - params: params - } - end) - - _ -> - [] - end + defp pg_change_params(true, %Join{config: %Config{postgres_changes: postgres_changes}}, channel_pid, claims, _) + when not is_nil(postgres_changes) do + postgres_changes + |> Enum.reject(&is_nil/1) + |> Enum.map(fn %PostgresChange{table: table, event: event, schema: schema, filter: filter} -> + params = + %{"table" => table, "filter" => filter, "schema" => schema, "event" => event} + |> Enum.reject(fn {_, v} -> is_nil(v) end) + |> Map.new() + + %{ + id: UUID.uuid1(), + channel_pid: channel_pid, + claims: claims, + params: params + } + end) end + defp pg_change_params(true, _, _, _, _), do: [] + defp pg_change_params(false, _, channel_pid, claims, sub_topic) do params = case String.split(sub_topic, ":", parts: 3) do diff --git a/lib/realtime_web/channels/realtime_channel/payloads/join.ex b/lib/realtime_web/channels/realtime_channel/payloads/join.ex new file mode 100644 index 000000000..7e7bebc74 --- /dev/null +++ b/lib/realtime_web/channels/realtime_channel/payloads/join.ex @@ -0,0 +1,140 @@ +defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange do + @moduledoc """ + Validate postgres_changes field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join + + embedded_schema do + field :event, :string + field :schema, :string + field :table, :string + field :filter, :string + end + + def changeset(postgres_change, attrs) do + cast(postgres_change, attrs, [:event, :schema, :table, :filter], message: &Join.error_message/2) + end +end + +defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast do + @moduledoc """ + Validate broadcast field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join + + embedded_schema do + field :ack, :boolean, default: false + field :self, :boolean, default: false + end + + def changeset(broadcast, attrs) do + cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2) + end +end + +defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence do + @moduledoc """ + Validate presence field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join + + embedded_schema do + field :enabled, :boolean, default: true + field :key, :string, default: UUID.uuid1() + end + + def changeset(presence, attrs) do + cast(presence, attrs, [:enabled, :key], message: &Join.error_message/2) + end +end + +defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config do + @moduledoc """ + Validate config field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange + + embedded_schema do + embeds_one :broadcast, Broadcast + embeds_one :presence, Presence + embeds_many :postgres_changes, PostgresChange + field :private, :boolean, default: false + end + + def changeset(config, attrs) do + config + |> cast(attrs, [:private], message: &Join.error_message/2) + |> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map") + |> cast_embed(:presence, invalid_message: "unable to parse, expected a map ") + |> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps") + end +end + +defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join do + @moduledoc """ + Payload validation for the phx_join event. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence + + embedded_schema do + embeds_one :config, Config + field :access_token, :string + field :user_token, :string + end + + def changeset(join, attrs) do + join + |> cast(attrs, [:access_token, :user_token], message: &error_message/2) + |> cast_embed(:config, invalid_message: "unable to parse, expected a map") + end + + @spec validate(map()) :: {:ok, %__MODULE__{}} | {:error, :invalid_join_payload, map()} + def validate(params) do + case changeset(%__MODULE__{}, params) do + %Ecto.Changeset{valid?: true} = changeset -> + {:ok, Ecto.Changeset.apply_changes(changeset)} + + %Ecto.Changeset{valid?: false} = changeset -> + errors = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0)) + {:error, :invalid_join_payload, errors} + end + end + + def presence_enabled?(%__MODULE__{config: %Config{presence: %Presence{enabled: enabled}}}), do: enabled + def presence_enabled?(_), do: true + + def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: key}}}), do: key + def presence_key(_), do: UUID.uuid1() + + def ack_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{ack: ack}}}), do: ack + def ack_broadcast?(_), do: false + + def self_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{self: self}}}), do: self + def self_broadcast?(_), do: false + + def private?(%__MODULE__{config: %Config{private: private}}), do: private + def private?(_), do: false + + def error_message(_field, meta) do + type = Keyword.get(meta, :type) + + if type, + do: "unable to parse, expected #{type}", + else: "unable to parse" + end +end diff --git a/mix.exs b/mix.exs index 3177a742a..a30970c6f 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.43.1", + version: "2.43.2", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 3eec0e18a..14f4da622 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -420,28 +420,18 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - test "handle nil postgres changes params as empty param changes", %{tenant: tenant} do + test "nil postgres changes params identified as error", %{tenant: tenant} do {socket, _} = get_connection(tenant) topic = "realtime:any" config = %{postgres_changes: [nil]} - WebsocketClient.join(socket, topic, %{config: config}) - - assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 200 - assert_receive %Phoenix.Socket.Message{event: "presence_state", payload: %{}, topic: ^topic}, 500 + log = + capture_log(fn -> + WebsocketClient.join(socket, topic, %{config: config}) + Process.sleep(500) + end) - refute_receive %Message{ - event: "system", - payload: %{ - "channel" => "any", - "extension" => "postgres_changes", - "message" => "Subscribed to PostgreSQL", - "status" => "ok" - }, - ref: nil, - topic: ^topic - }, - 1000 + assert log =~ "InvalidJoinPayload" end end diff --git a/test/realtime_web/channels/realtime_channel/payloads/join_test.exs b/test/realtime_web/channels/realtime_channel/payloads/join_test.exs new file mode 100644 index 000000000..37b6ac888 --- /dev/null +++ b/test/realtime_web/channels/realtime_channel/payloads/join_test.exs @@ -0,0 +1,90 @@ +defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.JoinTest do + use ExUnit.Case + + import Generators + + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence + alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange + + describe "validate/1" do + test "valid payload allows join" do + key = random_string() + access_token = random_string() + + config = %{ + "config" => %{ + "private" => false, + "broadcast" => %{"ack" => false, "self" => false}, + "presence" => %{"enabled" => true, "key" => key}, + "postgres_changes" => [ + %{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"}, + %{"event" => "DELETE", "schema" => "public", "table" => "users", "filter" => "id=eq.2"}, + %{"event" => "UPDATE", "schema" => "public", "table" => "users", "filter" => "id=eq.3"} + ] + }, + "access_token" => access_token + } + + assert {:ok, %Join{config: config, access_token: ^access_token}} = Join.validate(config) + + assert %Config{ + private: false, + broadcast: broadcast, + presence: presence, + postgres_changes: postgres_changes + } = config + + assert %Broadcast{ack: false, self: false} = broadcast + assert %Presence{enabled: true, key: ^key} = presence + + assert [ + %PostgresChange{event: "INSERT", schema: "public", table: "users", filter: "id=eq.1"}, + %PostgresChange{event: "DELETE", schema: "public", table: "users", filter: "id=eq.2"}, + %PostgresChange{event: "UPDATE", schema: "public", table: "users", filter: "id=eq.3"} + ] = postgres_changes + end + + test "presence key as default" do + config = %{"config" => %{"presence" => %{"enabled" => true}}} + + assert {:ok, %Join{config: %Config{presence: %Presence{key: key}}}} = Join.validate(config) + + assert key != "" + assert is_binary(key) + end + + test "invalid payload returns errors" do + config = %{"config" => ["test"]} + + assert {:error, :invalid_join_payload, %{config: error}} = Join.validate(config) + assert error == ["unable to parse, expected a map"] + end + + test "invalid nested configurations returns errors" do + config = %{ + "config" => %{ + "broadcast" => %{"ack" => "test"}, + "presence" => %{"enabled" => "test"}, + "postgres_changes" => %{"event" => "test"} + }, + "access_token" => true, + "user_token" => true + } + + assert {:error, :invalid_join_payload, errors} = Join.validate(config) + + assert errors == %{ + config: %{ + broadcast: %{ack: ["unable to parse, expected boolean"]}, + presence: %{enabled: ["unable to parse, expected boolean"]}, + postgres_changes: ["unable to parse, expected an array of maps"] + }, + access_token: ["unable to parse, expected string"], + user_token: ["unable to parse, expected string"] + } + end + end +end diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 558ad5eb6..b7bf04049 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -503,12 +503,9 @@ defmodule RealtimeWeb.RealtimeChannelTest do test "expired jwt returns a error with sub data if available log_level=warning", %{tenant: tenant} do sub = random_string() - api_key = Generators.generate_jwt_token(tenant) - - jwt = - Generators.generate_jwt_token(tenant, %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub}) - + claims = %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub} + jwt = Generators.generate_jwt_token(tenant, claims) assert {:ok, socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, api_key)) log = @@ -675,6 +672,44 @@ defmodule RealtimeWeb.RealtimeChannelTest do end end + describe "join payload validations" do + test "valid payload allows join", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + config = %{ + "config" => %{ + "private" => false, + "broadcast" => %{"ack" => false, "self" => false}, + "presence" => %{"enabled" => true, "key" => "potato"}, + "postgres_changes" => [ + %{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"}, + %{"event" => "DELETE", "schema" => "public", "table" => "users", "filter" => "id=eq.2"}, + %{"event" => "UPDATE", "schema" => "public", "table" => "users", "filter" => "id=eq.3"} + ] + }, + "access_token" => jwt + } + + assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", config) + end + + test "invalid payload returns error", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + log = + capture_log(fn -> + assert {:error, %{reason: reason}} = + subscribe_and_join(socket, "realtime:test", %{"config" => "potato"}) + + assert reason =~ "unable to parse, expected a map" + end) + + assert log =~ "InvalidJoinPayload" + end + end + test "registers transport pid and channel pid per tenant", %{tenant: tenant} do jwt = Generators.generate_jwt_token(tenant) {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) From 7cfdb7755ed7a6d21be0e45058c0eab02556154f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Thu, 21 Aug 2025 18:04:36 +0100 Subject: [PATCH 2/5] generated json schema for phx_join added to repo --- phx_join.schema.json | 139 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 phx_join.schema.json diff --git a/phx_join.schema.json b/phx_join.schema.json new file mode 100644 index 000000000..02719026c --- /dev/null +++ b/phx_join.schema.json @@ -0,0 +1,139 @@ +{ + "type": "object", + "description": "", + "title": "phx_join", + "required": [ + "access_token", + "config" + ], + "properties": { + "config": { + "title": "config", + "$ref": "#/$defs/phx_join_config" + }, + "access_token": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "access_token" + } + }, + "additionalProperties": false, + "$defs": { + "phx_join_config": { + "type": "object", + "description": "", + "title": "phx_join_config", + "properties": { + "private": { + "type": "boolean", + "description": "Boolean, e.g. true", + "title": "private", + "default": false + }, + "broadcast": { + "title": "broadcast", + "$ref": "#/$defs/phx_join_broadcast", + "default": { + "self": false, + "ack": false + } + }, + "presence": { + "title": "presence", + "$ref": "#/$defs/phx_join_presence", + "default": { + "enabled": false, + "key": "" + } + }, + "postgres_changes": { + "type": "array", + "title": "phx_join_postgres_changes", + "items": { + "$ref": "#/$defs/phx_join_postgres_changes", + "default": { + "table": "", + "filter": "", + "schema": "", + "event": "" + } + } + } + }, + "additionalProperties": false + }, + "phx_join_broadcast": { + "type": "object", + "description": "", + "title": "phx_join_broadcast", + "properties": { + "self": { + "type": "boolean", + "description": "Boolean, e.g. true", + "title": "self", + "default": false + }, + "ack": { + "type": "boolean", + "description": "Boolean, e.g. true", + "title": "ack", + "default": false + } + }, + "additionalProperties": false + }, + "phx_join_postgres_changes": { + "type": "object", + "description": "", + "title": "phx_join_postgres_changes", + "required": [ + "event", + "filter", + "schema", + "table" + ], + "properties": { + "table": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "table" + }, + "filter": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "filter" + }, + "schema": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "schema" + }, + "event": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "event" + } + }, + "additionalProperties": false + }, + "phx_join_presence": { + "type": "object", + "description": "", + "title": "phx_join_presence", + "properties": { + "enabled": { + "type": "boolean", + "description": "Boolean, e.g. true", + "title": "enabled", + "default": false + }, + "key": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "key" + } + }, + "additionalProperties": false + } + } +} \ No newline at end of file From cfb3167ab4a362fead34bb1509ea420b3dac7f6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Mon, 25 Aug 2025 13:22:41 +0100 Subject: [PATCH 3/5] apply PR feedback --- .../channels/payloads/broadcast.ex | 17 +++ lib/realtime_web/channels/payloads/config.ex | 26 ++++ lib/realtime_web/channels/payloads/join.ex | 58 ++++++++ .../channels/payloads/postgres_change.ex | 19 +++ .../channels/payloads/presence.ex | 17 +++ lib/realtime_web/channels/realtime_channel.ex | 12 +- .../realtime_channel/payloads/join.ex | 140 ------------------ .../payloads/join_test.exs | 18 ++- 8 files changed, 155 insertions(+), 152 deletions(-) create mode 100644 lib/realtime_web/channels/payloads/broadcast.ex create mode 100644 lib/realtime_web/channels/payloads/config.ex create mode 100644 lib/realtime_web/channels/payloads/join.ex create mode 100644 lib/realtime_web/channels/payloads/postgres_change.ex create mode 100644 lib/realtime_web/channels/payloads/presence.ex delete mode 100644 lib/realtime_web/channels/realtime_channel/payloads/join.ex rename test/realtime_web/channels/{realtime_channel => }/payloads/join_test.exs (85%) diff --git a/lib/realtime_web/channels/payloads/broadcast.ex b/lib/realtime_web/channels/payloads/broadcast.ex new file mode 100644 index 000000000..7feddb043 --- /dev/null +++ b/lib/realtime_web/channels/payloads/broadcast.ex @@ -0,0 +1,17 @@ +defmodule RealtimeWeb.Channels.Payloads.Broadcast do + @moduledoc """ + Validate broadcast field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + + embedded_schema do + field :ack, :boolean, default: false + field :self, :boolean, default: false + end + + def changeset(broadcast, attrs) do + cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2) + end +end diff --git a/lib/realtime_web/channels/payloads/config.ex b/lib/realtime_web/channels/payloads/config.ex new file mode 100644 index 000000000..923020174 --- /dev/null +++ b/lib/realtime_web/channels/payloads/config.ex @@ -0,0 +1,26 @@ +defmodule RealtimeWeb.Channels.Payloads.Config do + @moduledoc """ + Validate config field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + alias RealtimeWeb.Channels.Payloads.Broadcast + alias RealtimeWeb.Channels.Payloads.Presence + alias RealtimeWeb.Channels.Payloads.PostgresChange + + embedded_schema do + embeds_one :broadcast, Broadcast + embeds_one :presence, Presence + embeds_many :postgres_changes, PostgresChange + field :private, :boolean, default: false + end + + def changeset(config, attrs) do + config + |> cast(attrs, [:private], message: &Join.error_message/2) + |> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map") + |> cast_embed(:presence, invalid_message: "unable to parse, expected a map") + |> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps") + end +end diff --git a/lib/realtime_web/channels/payloads/join.ex b/lib/realtime_web/channels/payloads/join.ex new file mode 100644 index 000000000..6f5e3ef11 --- /dev/null +++ b/lib/realtime_web/channels/payloads/join.ex @@ -0,0 +1,58 @@ +defmodule RealtimeWeb.Channels.Payloads.Join do + @moduledoc """ + Payload validation for the phx_join event. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Config + alias RealtimeWeb.Channels.Payloads.Broadcast + alias RealtimeWeb.Channels.Payloads.Presence + + embedded_schema do + embeds_one :config, Config + field :access_token, :string + field :user_token, :string + end + + def changeset(join, attrs) do + join + |> cast(attrs, [:access_token, :user_token], message: &error_message/2) + |> cast_embed(:config, invalid_message: "unable to parse, expected a map") + end + + @spec validate(map()) :: {:ok, %__MODULE__{}} | {:error, :invalid_join_payload, map()} + def validate(params) do + case changeset(%__MODULE__{}, params) do + %Ecto.Changeset{valid?: true} = changeset -> + {:ok, Ecto.Changeset.apply_changes(changeset)} + + %Ecto.Changeset{valid?: false} = changeset -> + errors = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0)) + {:error, :invalid_join_payload, errors} + end + end + + def presence_enabled?(%__MODULE__{config: %Config{presence: %Presence{enabled: enabled}}}), do: enabled + def presence_enabled?(_), do: true + + def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: ""}}}), do: UUID.uuid1() + def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: key}}}), do: key + def presence_key(_), do: UUID.uuid1() + + def ack_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{ack: ack}}}), do: ack + def ack_broadcast?(_), do: false + + def self_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{self: self}}}), do: self + def self_broadcast?(_), do: false + + def private?(%__MODULE__{config: %Config{private: private}}), do: private + def private?(_), do: false + + def error_message(_field, meta) do + type = Keyword.get(meta, :type) + + if type, + do: "unable to parse, expected #{type}", + else: "unable to parse" + end +end diff --git a/lib/realtime_web/channels/payloads/postgres_change.ex b/lib/realtime_web/channels/payloads/postgres_change.ex new file mode 100644 index 000000000..d1fa16b57 --- /dev/null +++ b/lib/realtime_web/channels/payloads/postgres_change.ex @@ -0,0 +1,19 @@ +defmodule RealtimeWeb.Channels.Payloads.PostgresChange do + @moduledoc """ + Validate postgres_changes field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + + embedded_schema do + field :event, :string + field :schema, :string + field :table, :string + field :filter, :string + end + + def changeset(postgres_change, attrs) do + cast(postgres_change, attrs, [:event, :schema, :table, :filter], message: &Join.error_message/2) + end +end diff --git a/lib/realtime_web/channels/payloads/presence.ex b/lib/realtime_web/channels/payloads/presence.ex new file mode 100644 index 000000000..53e09047d --- /dev/null +++ b/lib/realtime_web/channels/payloads/presence.ex @@ -0,0 +1,17 @@ +defmodule RealtimeWeb.Channels.Payloads.Presence do + @moduledoc """ + Validate presence field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + + embedded_schema do + field :enabled, :boolean, default: true + field :key, :string, default: UUID.uuid1() + end + + def changeset(presence, attrs) do + cast(presence, attrs, [:enabled, :key], message: &Join.error_message/2) + end +end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 5d6e0322e..256a4c96b 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -21,9 +21,9 @@ defmodule RealtimeWeb.RealtimeChannel do alias Realtime.Tenants.Authorization.Policies.PresencePolicies alias Realtime.Tenants.Connect - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange + alias RealtimeWeb.Channels.Payloads.Join + alias RealtimeWeb.Channels.Payloads.Config + alias RealtimeWeb.Channels.Payloads.PostgresChange alias RealtimeWeb.ChannelsAuthorization alias RealtimeWeb.RealtimeChannel.BroadcastHandler alias RealtimeWeb.RealtimeChannel.MessageDispatcher @@ -555,9 +555,9 @@ defmodule RealtimeWeb.RealtimeChannel do access_token = cond do - access_token != nil and !String.starts_with?(access_token, "sb_") -> access_token - user_token != nil and !String.starts_with?(user_token, "sb_") -> user_token - tenant_token != nil and !String.starts_with?(tenant_token, "sb_") -> tenant_token + is_binary(access_token) and !String.starts_with?(access_token, "sb_") -> access_token + is_binary(user_token) and !String.starts_with?(user_token, "sb_") -> user_token + is_binary(tenant_token) and !String.starts_with?(tenant_token, "sb_") -> tenant_token true -> header end diff --git a/lib/realtime_web/channels/realtime_channel/payloads/join.ex b/lib/realtime_web/channels/realtime_channel/payloads/join.ex deleted file mode 100644 index 7e7bebc74..000000000 --- a/lib/realtime_web/channels/realtime_channel/payloads/join.ex +++ /dev/null @@ -1,140 +0,0 @@ -defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange do - @moduledoc """ - Validate postgres_changes field of the join payload. - """ - use Ecto.Schema - import Ecto.Changeset - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join - - embedded_schema do - field :event, :string - field :schema, :string - field :table, :string - field :filter, :string - end - - def changeset(postgres_change, attrs) do - cast(postgres_change, attrs, [:event, :schema, :table, :filter], message: &Join.error_message/2) - end -end - -defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast do - @moduledoc """ - Validate broadcast field of the join payload. - """ - use Ecto.Schema - import Ecto.Changeset - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join - - embedded_schema do - field :ack, :boolean, default: false - field :self, :boolean, default: false - end - - def changeset(broadcast, attrs) do - cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2) - end -end - -defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence do - @moduledoc """ - Validate presence field of the join payload. - """ - use Ecto.Schema - import Ecto.Changeset - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join - - embedded_schema do - field :enabled, :boolean, default: true - field :key, :string, default: UUID.uuid1() - end - - def changeset(presence, attrs) do - cast(presence, attrs, [:enabled, :key], message: &Join.error_message/2) - end -end - -defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config do - @moduledoc """ - Validate config field of the join payload. - """ - use Ecto.Schema - import Ecto.Changeset - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange - - embedded_schema do - embeds_one :broadcast, Broadcast - embeds_one :presence, Presence - embeds_many :postgres_changes, PostgresChange - field :private, :boolean, default: false - end - - def changeset(config, attrs) do - config - |> cast(attrs, [:private], message: &Join.error_message/2) - |> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map") - |> cast_embed(:presence, invalid_message: "unable to parse, expected a map ") - |> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps") - end -end - -defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.Join do - @moduledoc """ - Payload validation for the phx_join event. - """ - use Ecto.Schema - import Ecto.Changeset - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence - - embedded_schema do - embeds_one :config, Config - field :access_token, :string - field :user_token, :string - end - - def changeset(join, attrs) do - join - |> cast(attrs, [:access_token, :user_token], message: &error_message/2) - |> cast_embed(:config, invalid_message: "unable to parse, expected a map") - end - - @spec validate(map()) :: {:ok, %__MODULE__{}} | {:error, :invalid_join_payload, map()} - def validate(params) do - case changeset(%__MODULE__{}, params) do - %Ecto.Changeset{valid?: true} = changeset -> - {:ok, Ecto.Changeset.apply_changes(changeset)} - - %Ecto.Changeset{valid?: false} = changeset -> - errors = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0)) - {:error, :invalid_join_payload, errors} - end - end - - def presence_enabled?(%__MODULE__{config: %Config{presence: %Presence{enabled: enabled}}}), do: enabled - def presence_enabled?(_), do: true - - def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: key}}}), do: key - def presence_key(_), do: UUID.uuid1() - - def ack_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{ack: ack}}}), do: ack - def ack_broadcast?(_), do: false - - def self_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{self: self}}}), do: self - def self_broadcast?(_), do: false - - def private?(%__MODULE__{config: %Config{private: private}}), do: private - def private?(_), do: false - - def error_message(_field, meta) do - type = Keyword.get(meta, :type) - - if type, - do: "unable to parse, expected #{type}", - else: "unable to parse" - end -end diff --git a/test/realtime_web/channels/realtime_channel/payloads/join_test.exs b/test/realtime_web/channels/payloads/join_test.exs similarity index 85% rename from test/realtime_web/channels/realtime_channel/payloads/join_test.exs rename to test/realtime_web/channels/payloads/join_test.exs index 37b6ac888..4fdcd9f42 100644 --- a/test/realtime_web/channels/realtime_channel/payloads/join_test.exs +++ b/test/realtime_web/channels/payloads/join_test.exs @@ -1,13 +1,13 @@ -defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.JoinTest do +defmodule RealtimeWeb.Channels.Payloads.JoinTest do use ExUnit.Case import Generators - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Broadcast - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.Presence - alias RealtimeWeb.Channels.RealtimeChannel.Payloads.Join.Config.PostgresChange + alias RealtimeWeb.Channels.Payloads.Join + alias RealtimeWeb.Channels.Payloads.Config + alias RealtimeWeb.Channels.Payloads.Broadcast + alias RealtimeWeb.Channels.Payloads.Presence + alias RealtimeWeb.Channels.Payloads.PostgresChange describe "validate/1" do test "valid payload allows join" do @@ -56,6 +56,12 @@ defmodule RealtimeWeb.Channels.RealtimeChannel.Payloads.JoinTest do assert is_binary(key) end + test "missing enabled presence defaults to true" do + config = %{"config" => %{"presence" => %{}}} + + assert {:ok, %Join{config: %Config{presence: %Presence{enabled: true}}}} = Join.validate(config) + end + test "invalid payload returns errors" do config = %{"config" => ["test"]} From 1d140af3134f620ec1fa454cd9e4c96548cf953a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Thu, 28 Aug 2025 00:26:33 +0100 Subject: [PATCH 4/5] only log in case of error --- lib/realtime_web/channels/realtime_channel.ex | 137 +++++++++--------- test/integration/rt_channel_test.exs | 24 ++- .../channels/realtime_channel_test.exs | 45 +----- 3 files changed, 93 insertions(+), 113 deletions(-) diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 256a4c96b..863305628 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -5,8 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel do use RealtimeWeb, :channel use RealtimeWeb.RealtimeChannel.Logging + alias RealtimeWeb.SocketDisconnect alias DBConnection.Backoff - alias Phoenix.Socket alias Realtime.Crypto alias Realtime.GenCounter @@ -22,14 +22,11 @@ defmodule RealtimeWeb.RealtimeChannel do alias Realtime.Tenants.Connect alias RealtimeWeb.Channels.Payloads.Join - alias RealtimeWeb.Channels.Payloads.Config - alias RealtimeWeb.Channels.Payloads.PostgresChange alias RealtimeWeb.ChannelsAuthorization alias RealtimeWeb.RealtimeChannel.BroadcastHandler alias RealtimeWeb.RealtimeChannel.MessageDispatcher alias RealtimeWeb.RealtimeChannel.PresenceHandler alias RealtimeWeb.RealtimeChannel.Tracker - alias RealtimeWeb.SocketDisconnect @confirm_token_ms_interval :timer.minutes(5) @@ -50,11 +47,20 @@ defmodule RealtimeWeb.RealtimeChannel do Logger.metadata(external_id: tenant_id, project: tenant_id) Logger.put_process_level(self(), log_level) - # We always need to assign the access token so we can get the logs metadata working as expected - socket = assign_access_token(socket, params) + socket = + socket + |> assign_access_token(params) + |> assign_counter() + |> assign_presence_counter() + |> assign(:private?, !!params["config"]["private"]) + |> assign(:policies, nil) + + case Join.validate(params) do + {:ok, _join} -> nil + {:error, :invalid_join_payload, errors} -> log_error(socket, "InvalidJoinPayload", errors) + end - with {:ok, %Socket{} = socket, %Join{} = configuration} <- configure_socket(socket, params), - :ok <- SignalHandler.shutdown_in_progress?(), + with :ok <- SignalHandler.shutdown_in_progress?(), :ok <- only_private?(tenant_id, socket), :ok <- limit_joins(socket), :ok <- limit_channels(socket), @@ -64,6 +70,7 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id), {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?) + # fastlane subscription metadata = MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id) @@ -72,11 +79,15 @@ defmodule RealtimeWeb.RealtimeChannel do Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id) - is_new_api = new_api?(configuration) - - presence_enabled? = Join.presence_enabled?(configuration) + is_new_api = new_api?(params) + # TODO: Default will be moved to false in the future + presence_enabled? = + case get_in(params, ["config", "presence", "enabled"]) do + enabled when is_boolean(enabled) -> enabled + _ -> true + end - pg_change_params = pg_change_params(is_new_api, configuration, channel_pid, claims, sub_topic) + pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic) opts = %{ is_new_api: is_new_api, @@ -93,13 +104,13 @@ defmodule RealtimeWeb.RealtimeChannel do state = %{postgres_changes: add_id_to_postgres_changes(pg_change_params)} assigns = %{ - ack_broadcast: Join.ack_broadcast?(configuration), + ack_broadcast: !!params["config"]["broadcast"]["ack"], confirm_token_ref: confirm_token_ref, is_new_api: is_new_api, pg_sub_ref: nil, pg_change_params: pg_change_params, - presence_key: Join.presence_key(configuration), - self_broadcast: Join.self_broadcast?(configuration), + presence_key: presence_key(params), + self_broadcast: !!params["config"]["broadcast"]["self"], tenant_topic: tenant_topic, channel_name: sub_topic, presence_enabled?: presence_enabled? @@ -113,9 +124,6 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, state, assign(socket, assigns)} else - {:error, :invalid_join_payload, errors, socket} -> - log_error(socket, "InvalidJoinPayload", errors) - {:error, :expired_token, msg} -> maybe_log_warning(socket, "InvalidJWTToken", msg) @@ -192,23 +200,6 @@ defmodule RealtimeWeb.RealtimeChannel do end end - defp configure_socket(socket, params) do - case Join.validate(params) do - {:ok, configuration} -> - socket = - socket - |> assign_counter() - |> assign_presence_counter() - |> assign(:private?, Join.private?(configuration)) - |> assign(:policies, nil) - - {:ok, socket, configuration} - - {:error, :invalid_join_payload, errors} -> - {:error, :invalid_join_payload, errors, socket} - end - end - @impl true def handle_info(:update_rate_counter, %{assigns: %{limits: %{max_events_per_second: max}}} = socket) do count(socket) @@ -546,24 +537,40 @@ defmodule RealtimeWeb.RealtimeChannel do defp count(%{assigns: %{rate_counter: counter}}), do: GenCounter.add(counter.id) - defp assign_access_token(socket, params) do - %{assigns: %{tenant_token: tenant_token, headers: headers}} = socket + defp presence_key(params) do + case params["config"]["presence"]["key"] do + key when is_binary(key) and key != "" -> key + _ -> UUID.uuid1() + end + end + + defp assign_access_token(%{assigns: %{headers: headers}} = socket, params) do + access_token = Map.get(params, "access_token") || Map.get(params, "user_token") {_, header} = Enum.find(headers, {nil, nil}, fn {k, _} -> k == "x-api-key" end) - access_token = Map.get(params, "access_token") - user_token = Map.get(params, "user_token") + case access_token do + nil -> assign(socket, :access_token, header) + "sb_" <> _ -> assign(socket, :access_token, header) + _ -> handle_access_token(socket, params) + end + end - access_token = - cond do - is_binary(access_token) and !String.starts_with?(access_token, "sb_") -> access_token - is_binary(user_token) and !String.starts_with?(user_token, "sb_") -> user_token - is_binary(tenant_token) and !String.starts_with?(tenant_token, "sb_") -> tenant_token - true -> header - end + defp assign_access_token(socket, params), do: handle_access_token(socket, params) + + defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"user_token" => user_token}) + when is_binary(user_token) do + assign(socket, :access_token, user_token) + end + defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"access_token" => access_token}) + when is_binary(access_token) do assign(socket, :access_token, access_token) end + defp handle_access_token(%{assigns: %{tenant_token: tenant_token}} = socket, _params) when is_binary(tenant_token) do + assign(socket, :access_token, tenant_token) + end + defp confirm_token(%{assigns: assigns}) do %{jwt_secret: jwt_secret, access_token: access_token} = assigns @@ -630,30 +637,28 @@ defmodule RealtimeWeb.RealtimeChannel do }) end - defp new_api?(%Join{config: config}) when not is_nil(config), do: true + defp new_api?(%{"config" => _}), do: true defp new_api?(_), do: false - defp pg_change_params(true, %Join{config: %Config{postgres_changes: postgres_changes}}, channel_pid, claims, _) - when not is_nil(postgres_changes) do - postgres_changes - |> Enum.reject(&is_nil/1) - |> Enum.map(fn %PostgresChange{table: table, event: event, schema: schema, filter: filter} -> - params = - %{"table" => table, "filter" => filter, "schema" => schema, "event" => event} - |> Enum.reject(fn {_, v} -> is_nil(v) end) - |> Map.new() - - %{ - id: UUID.uuid1(), - channel_pid: channel_pid, - claims: claims, - params: params - } - end) + defp pg_change_params(true, params, channel_pid, claims, _) do + case get_in(params, ["config", "postgres_changes"]) do + [_ | _] = params_list -> + params_list + |> Enum.reject(&is_nil/1) + |> Enum.map(fn params -> + %{ + id: UUID.uuid1(), + channel_pid: channel_pid, + claims: claims, + params: params + } + end) + + _ -> + [] + end end - defp pg_change_params(true, _, _, _, _), do: [] - defp pg_change_params(false, _, channel_pid, claims, sub_topic) do params = case String.split(sub_topic, ":", parts: 3) do diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 14f4da622..3eec0e18a 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -420,18 +420,28 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - test "nil postgres changes params identified as error", %{tenant: tenant} do + test "handle nil postgres changes params as empty param changes", %{tenant: tenant} do {socket, _} = get_connection(tenant) topic = "realtime:any" config = %{postgres_changes: [nil]} - log = - capture_log(fn -> - WebsocketClient.join(socket, topic, %{config: config}) - Process.sleep(500) - end) + WebsocketClient.join(socket, topic, %{config: config}) - assert log =~ "InvalidJoinPayload" + assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 200 + assert_receive %Phoenix.Socket.Message{event: "presence_state", payload: %{}, topic: ^topic}, 500 + + refute_receive %Message{ + event: "system", + payload: %{ + "channel" => "any", + "extension" => "postgres_changes", + "message" => "Subscribed to PostgreSQL", + "status" => "ok" + }, + ref: nil, + topic: ^topic + }, + 1000 end end diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index b7bf04049..558ad5eb6 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -503,9 +503,12 @@ defmodule RealtimeWeb.RealtimeChannelTest do test "expired jwt returns a error with sub data if available log_level=warning", %{tenant: tenant} do sub = random_string() + api_key = Generators.generate_jwt_token(tenant) - claims = %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub} - jwt = Generators.generate_jwt_token(tenant, claims) + + jwt = + Generators.generate_jwt_token(tenant, %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub}) + assert {:ok, socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, api_key)) log = @@ -672,44 +675,6 @@ defmodule RealtimeWeb.RealtimeChannelTest do end end - describe "join payload validations" do - test "valid payload allows join", %{tenant: tenant} do - jwt = Generators.generate_jwt_token(tenant) - {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) - - config = %{ - "config" => %{ - "private" => false, - "broadcast" => %{"ack" => false, "self" => false}, - "presence" => %{"enabled" => true, "key" => "potato"}, - "postgres_changes" => [ - %{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"}, - %{"event" => "DELETE", "schema" => "public", "table" => "users", "filter" => "id=eq.2"}, - %{"event" => "UPDATE", "schema" => "public", "table" => "users", "filter" => "id=eq.3"} - ] - }, - "access_token" => jwt - } - - assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", config) - end - - test "invalid payload returns error", %{tenant: tenant} do - jwt = Generators.generate_jwt_token(tenant) - {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) - - log = - capture_log(fn -> - assert {:error, %{reason: reason}} = - subscribe_and_join(socket, "realtime:test", %{"config" => "potato"}) - - assert reason =~ "unable to parse, expected a map" - end) - - assert log =~ "InvalidJoinPayload" - end - end - test "registers transport pid and channel pid per tenant", %{tenant: tenant} do jwt = Generators.generate_jwt_token(tenant) {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) From 6b3d992c0be5227259281e888f00b14656efab61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Thu, 28 Aug 2025 00:29:58 +0100 Subject: [PATCH 5/5] fix: on bad payload, block join --- lib/realtime_web/channels/realtime_channel.ex | 137 +++++++++--------- mix.exs | 2 +- test/integration/rt_channel_test.exs | 24 +-- .../channels/realtime_channel_test.exs | 45 +++++- 4 files changed, 114 insertions(+), 94 deletions(-) diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 863305628..256a4c96b 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -5,8 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel do use RealtimeWeb, :channel use RealtimeWeb.RealtimeChannel.Logging - alias RealtimeWeb.SocketDisconnect alias DBConnection.Backoff + alias Phoenix.Socket alias Realtime.Crypto alias Realtime.GenCounter @@ -22,11 +22,14 @@ defmodule RealtimeWeb.RealtimeChannel do alias Realtime.Tenants.Connect alias RealtimeWeb.Channels.Payloads.Join + alias RealtimeWeb.Channels.Payloads.Config + alias RealtimeWeb.Channels.Payloads.PostgresChange alias RealtimeWeb.ChannelsAuthorization alias RealtimeWeb.RealtimeChannel.BroadcastHandler alias RealtimeWeb.RealtimeChannel.MessageDispatcher alias RealtimeWeb.RealtimeChannel.PresenceHandler alias RealtimeWeb.RealtimeChannel.Tracker + alias RealtimeWeb.SocketDisconnect @confirm_token_ms_interval :timer.minutes(5) @@ -47,20 +50,11 @@ defmodule RealtimeWeb.RealtimeChannel do Logger.metadata(external_id: tenant_id, project: tenant_id) Logger.put_process_level(self(), log_level) - socket = - socket - |> assign_access_token(params) - |> assign_counter() - |> assign_presence_counter() - |> assign(:private?, !!params["config"]["private"]) - |> assign(:policies, nil) - - case Join.validate(params) do - {:ok, _join} -> nil - {:error, :invalid_join_payload, errors} -> log_error(socket, "InvalidJoinPayload", errors) - end + # We always need to assign the access token so we can get the logs metadata working as expected + socket = assign_access_token(socket, params) - with :ok <- SignalHandler.shutdown_in_progress?(), + with {:ok, %Socket{} = socket, %Join{} = configuration} <- configure_socket(socket, params), + :ok <- SignalHandler.shutdown_in_progress?(), :ok <- only_private?(tenant_id, socket), :ok <- limit_joins(socket), :ok <- limit_channels(socket), @@ -70,7 +64,6 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id), {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?) - # fastlane subscription metadata = MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id) @@ -79,15 +72,11 @@ defmodule RealtimeWeb.RealtimeChannel do Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id) - is_new_api = new_api?(params) - # TODO: Default will be moved to false in the future - presence_enabled? = - case get_in(params, ["config", "presence", "enabled"]) do - enabled when is_boolean(enabled) -> enabled - _ -> true - end + is_new_api = new_api?(configuration) - pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic) + presence_enabled? = Join.presence_enabled?(configuration) + + pg_change_params = pg_change_params(is_new_api, configuration, channel_pid, claims, sub_topic) opts = %{ is_new_api: is_new_api, @@ -104,13 +93,13 @@ defmodule RealtimeWeb.RealtimeChannel do state = %{postgres_changes: add_id_to_postgres_changes(pg_change_params)} assigns = %{ - ack_broadcast: !!params["config"]["broadcast"]["ack"], + ack_broadcast: Join.ack_broadcast?(configuration), confirm_token_ref: confirm_token_ref, is_new_api: is_new_api, pg_sub_ref: nil, pg_change_params: pg_change_params, - presence_key: presence_key(params), - self_broadcast: !!params["config"]["broadcast"]["self"], + presence_key: Join.presence_key(configuration), + self_broadcast: Join.self_broadcast?(configuration), tenant_topic: tenant_topic, channel_name: sub_topic, presence_enabled?: presence_enabled? @@ -124,6 +113,9 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, state, assign(socket, assigns)} else + {:error, :invalid_join_payload, errors, socket} -> + log_error(socket, "InvalidJoinPayload", errors) + {:error, :expired_token, msg} -> maybe_log_warning(socket, "InvalidJWTToken", msg) @@ -200,6 +192,23 @@ defmodule RealtimeWeb.RealtimeChannel do end end + defp configure_socket(socket, params) do + case Join.validate(params) do + {:ok, configuration} -> + socket = + socket + |> assign_counter() + |> assign_presence_counter() + |> assign(:private?, Join.private?(configuration)) + |> assign(:policies, nil) + + {:ok, socket, configuration} + + {:error, :invalid_join_payload, errors} -> + {:error, :invalid_join_payload, errors, socket} + end + end + @impl true def handle_info(:update_rate_counter, %{assigns: %{limits: %{max_events_per_second: max}}} = socket) do count(socket) @@ -537,40 +546,24 @@ defmodule RealtimeWeb.RealtimeChannel do defp count(%{assigns: %{rate_counter: counter}}), do: GenCounter.add(counter.id) - defp presence_key(params) do - case params["config"]["presence"]["key"] do - key when is_binary(key) and key != "" -> key - _ -> UUID.uuid1() - end - end - - defp assign_access_token(%{assigns: %{headers: headers}} = socket, params) do - access_token = Map.get(params, "access_token") || Map.get(params, "user_token") + defp assign_access_token(socket, params) do + %{assigns: %{tenant_token: tenant_token, headers: headers}} = socket {_, header} = Enum.find(headers, {nil, nil}, fn {k, _} -> k == "x-api-key" end) - case access_token do - nil -> assign(socket, :access_token, header) - "sb_" <> _ -> assign(socket, :access_token, header) - _ -> handle_access_token(socket, params) - end - end - - defp assign_access_token(socket, params), do: handle_access_token(socket, params) + access_token = Map.get(params, "access_token") + user_token = Map.get(params, "user_token") - defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"user_token" => user_token}) - when is_binary(user_token) do - assign(socket, :access_token, user_token) - end + access_token = + cond do + is_binary(access_token) and !String.starts_with?(access_token, "sb_") -> access_token + is_binary(user_token) and !String.starts_with?(user_token, "sb_") -> user_token + is_binary(tenant_token) and !String.starts_with?(tenant_token, "sb_") -> tenant_token + true -> header + end - defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"access_token" => access_token}) - when is_binary(access_token) do assign(socket, :access_token, access_token) end - defp handle_access_token(%{assigns: %{tenant_token: tenant_token}} = socket, _params) when is_binary(tenant_token) do - assign(socket, :access_token, tenant_token) - end - defp confirm_token(%{assigns: assigns}) do %{jwt_secret: jwt_secret, access_token: access_token} = assigns @@ -637,28 +630,30 @@ defmodule RealtimeWeb.RealtimeChannel do }) end - defp new_api?(%{"config" => _}), do: true + defp new_api?(%Join{config: config}) when not is_nil(config), do: true defp new_api?(_), do: false - defp pg_change_params(true, params, channel_pid, claims, _) do - case get_in(params, ["config", "postgres_changes"]) do - [_ | _] = params_list -> - params_list - |> Enum.reject(&is_nil/1) - |> Enum.map(fn params -> - %{ - id: UUID.uuid1(), - channel_pid: channel_pid, - claims: claims, - params: params - } - end) - - _ -> - [] - end + defp pg_change_params(true, %Join{config: %Config{postgres_changes: postgres_changes}}, channel_pid, claims, _) + when not is_nil(postgres_changes) do + postgres_changes + |> Enum.reject(&is_nil/1) + |> Enum.map(fn %PostgresChange{table: table, event: event, schema: schema, filter: filter} -> + params = + %{"table" => table, "filter" => filter, "schema" => schema, "event" => event} + |> Enum.reject(fn {_, v} -> is_nil(v) end) + |> Map.new() + + %{ + id: UUID.uuid1(), + channel_pid: channel_pid, + claims: claims, + params: params + } + end) end + defp pg_change_params(true, _, _, _, _), do: [] + defp pg_change_params(false, _, channel_pid, claims, sub_topic) do params = case String.split(sub_topic, ":", parts: 3) do diff --git a/mix.exs b/mix.exs index a30970c6f..ce1ee4e38 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.43.2", + version: "2.43.3", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 3eec0e18a..14f4da622 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -420,28 +420,18 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - test "handle nil postgres changes params as empty param changes", %{tenant: tenant} do + test "nil postgres changes params identified as error", %{tenant: tenant} do {socket, _} = get_connection(tenant) topic = "realtime:any" config = %{postgres_changes: [nil]} - WebsocketClient.join(socket, topic, %{config: config}) - - assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 200 - assert_receive %Phoenix.Socket.Message{event: "presence_state", payload: %{}, topic: ^topic}, 500 + log = + capture_log(fn -> + WebsocketClient.join(socket, topic, %{config: config}) + Process.sleep(500) + end) - refute_receive %Message{ - event: "system", - payload: %{ - "channel" => "any", - "extension" => "postgres_changes", - "message" => "Subscribed to PostgreSQL", - "status" => "ok" - }, - ref: nil, - topic: ^topic - }, - 1000 + assert log =~ "InvalidJoinPayload" end end diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 558ad5eb6..b7bf04049 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -503,12 +503,9 @@ defmodule RealtimeWeb.RealtimeChannelTest do test "expired jwt returns a error with sub data if available log_level=warning", %{tenant: tenant} do sub = random_string() - api_key = Generators.generate_jwt_token(tenant) - - jwt = - Generators.generate_jwt_token(tenant, %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub}) - + claims = %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub} + jwt = Generators.generate_jwt_token(tenant, claims) assert {:ok, socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, api_key)) log = @@ -675,6 +672,44 @@ defmodule RealtimeWeb.RealtimeChannelTest do end end + describe "join payload validations" do + test "valid payload allows join", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + config = %{ + "config" => %{ + "private" => false, + "broadcast" => %{"ack" => false, "self" => false}, + "presence" => %{"enabled" => true, "key" => "potato"}, + "postgres_changes" => [ + %{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"}, + %{"event" => "DELETE", "schema" => "public", "table" => "users", "filter" => "id=eq.2"}, + %{"event" => "UPDATE", "schema" => "public", "table" => "users", "filter" => "id=eq.3"} + ] + }, + "access_token" => jwt + } + + assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", config) + end + + test "invalid payload returns error", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + log = + capture_log(fn -> + assert {:error, %{reason: reason}} = + subscribe_and_join(socket, "realtime:test", %{"config" => "potato"}) + + assert reason =~ "unable to parse, expected a map" + end) + + assert log =~ "InvalidJoinPayload" + end + end + test "registers transport pid and channel pid per tenant", %{tenant: tenant} do jwt = Generators.generate_jwt_token(tenant) {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))