diff --git a/README.md b/README.md index 657b621f5..41592608c 100644 --- a/README.md +++ b/README.md @@ -220,6 +220,7 @@ This is the list of operational codes that can help you understand your deployme | Code | Description | | ---------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | TopicNameRequired | You are trying to use Realtime without a topic name set | +| InvalidJoinPayload | The payload provided to Realtime on connect is invalid | | RealtimeDisabledForConfiguration | The configuration provided to Realtime on connect will not be able to provide you any Postgres Changes | | TenantNotFound | The tenant you are trying to connect to does not exist | | ErrorConnectingToWebsocket | Error when trying to connect to the WebSocket server | diff --git a/lib/realtime_web/channels/payloads/broadcast.ex b/lib/realtime_web/channels/payloads/broadcast.ex new file mode 100644 index 000000000..7feddb043 --- /dev/null +++ b/lib/realtime_web/channels/payloads/broadcast.ex @@ -0,0 +1,17 @@ +defmodule RealtimeWeb.Channels.Payloads.Broadcast do + @moduledoc """ + Validate broadcast field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + + embedded_schema do + field :ack, :boolean, default: false + field :self, :boolean, default: false + end + + def changeset(broadcast, attrs) do + cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2) + end +end diff --git a/lib/realtime_web/channels/payloads/config.ex b/lib/realtime_web/channels/payloads/config.ex new file mode 100644 index 000000000..923020174 --- /dev/null +++ b/lib/realtime_web/channels/payloads/config.ex @@ -0,0 +1,26 @@ +defmodule RealtimeWeb.Channels.Payloads.Config do + @moduledoc """ + Validate config field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + alias RealtimeWeb.Channels.Payloads.Broadcast + alias RealtimeWeb.Channels.Payloads.Presence + alias RealtimeWeb.Channels.Payloads.PostgresChange + + embedded_schema do + embeds_one :broadcast, Broadcast + embeds_one :presence, Presence + embeds_many :postgres_changes, PostgresChange + field :private, :boolean, default: false + end + + def changeset(config, attrs) do + config + |> cast(attrs, [:private], message: &Join.error_message/2) + |> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map") + |> cast_embed(:presence, invalid_message: "unable to parse, expected a map") + |> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps") + end +end diff --git a/lib/realtime_web/channels/payloads/join.ex b/lib/realtime_web/channels/payloads/join.ex new file mode 100644 index 000000000..6f5e3ef11 --- /dev/null +++ b/lib/realtime_web/channels/payloads/join.ex @@ -0,0 +1,58 @@ +defmodule RealtimeWeb.Channels.Payloads.Join do + @moduledoc """ + Payload validation for the phx_join event. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Config + alias RealtimeWeb.Channels.Payloads.Broadcast + alias RealtimeWeb.Channels.Payloads.Presence + + embedded_schema do + embeds_one :config, Config + field :access_token, :string + field :user_token, :string + end + + def changeset(join, attrs) do + join + |> cast(attrs, [:access_token, :user_token], message: &error_message/2) + |> cast_embed(:config, invalid_message: "unable to parse, expected a map") + end + + @spec validate(map()) :: {:ok, %__MODULE__{}} | {:error, :invalid_join_payload, map()} + def validate(params) do + case changeset(%__MODULE__{}, params) do + %Ecto.Changeset{valid?: true} = changeset -> + {:ok, Ecto.Changeset.apply_changes(changeset)} + + %Ecto.Changeset{valid?: false} = changeset -> + errors = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0)) + {:error, :invalid_join_payload, errors} + end + end + + def presence_enabled?(%__MODULE__{config: %Config{presence: %Presence{enabled: enabled}}}), do: enabled + def presence_enabled?(_), do: true + + def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: ""}}}), do: UUID.uuid1() + def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: key}}}), do: key + def presence_key(_), do: UUID.uuid1() + + def ack_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{ack: ack}}}), do: ack + def ack_broadcast?(_), do: false + + def self_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{self: self}}}), do: self + def self_broadcast?(_), do: false + + def private?(%__MODULE__{config: %Config{private: private}}), do: private + def private?(_), do: false + + def error_message(_field, meta) do + type = Keyword.get(meta, :type) + + if type, + do: "unable to parse, expected #{type}", + else: "unable to parse" + end +end diff --git a/lib/realtime_web/channels/payloads/postgres_change.ex b/lib/realtime_web/channels/payloads/postgres_change.ex new file mode 100644 index 000000000..d1fa16b57 --- /dev/null +++ b/lib/realtime_web/channels/payloads/postgres_change.ex @@ -0,0 +1,19 @@ +defmodule RealtimeWeb.Channels.Payloads.PostgresChange do + @moduledoc """ + Validate postgres_changes field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + + embedded_schema do + field :event, :string + field :schema, :string + field :table, :string + field :filter, :string + end + + def changeset(postgres_change, attrs) do + cast(postgres_change, attrs, [:event, :schema, :table, :filter], message: &Join.error_message/2) + end +end diff --git a/lib/realtime_web/channels/payloads/presence.ex b/lib/realtime_web/channels/payloads/presence.ex new file mode 100644 index 000000000..53e09047d --- /dev/null +++ b/lib/realtime_web/channels/payloads/presence.ex @@ -0,0 +1,17 @@ +defmodule RealtimeWeb.Channels.Payloads.Presence do + @moduledoc """ + Validate presence field of the join payload. + """ + use Ecto.Schema + import Ecto.Changeset + alias RealtimeWeb.Channels.Payloads.Join + + embedded_schema do + field :enabled, :boolean, default: true + field :key, :string, default: UUID.uuid1() + end + + def changeset(presence, attrs) do + cast(presence, attrs, [:enabled, :key], message: &Join.error_message/2) + end +end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 35c080509..256a4c96b 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -5,8 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel do use RealtimeWeb, :channel use RealtimeWeb.RealtimeChannel.Logging - alias RealtimeWeb.SocketDisconnect alias DBConnection.Backoff + alias Phoenix.Socket alias Realtime.Crypto alias Realtime.GenCounter @@ -21,11 +21,15 @@ defmodule RealtimeWeb.RealtimeChannel do alias Realtime.Tenants.Authorization.Policies.PresencePolicies alias Realtime.Tenants.Connect + alias RealtimeWeb.Channels.Payloads.Join + alias RealtimeWeb.Channels.Payloads.Config + alias RealtimeWeb.Channels.Payloads.PostgresChange alias RealtimeWeb.ChannelsAuthorization alias RealtimeWeb.RealtimeChannel.BroadcastHandler alias RealtimeWeb.RealtimeChannel.MessageDispatcher alias RealtimeWeb.RealtimeChannel.PresenceHandler alias RealtimeWeb.RealtimeChannel.Tracker + alias RealtimeWeb.SocketDisconnect @confirm_token_ms_interval :timer.minutes(5) @@ -46,15 +50,11 @@ defmodule RealtimeWeb.RealtimeChannel do Logger.metadata(external_id: tenant_id, project: tenant_id) Logger.put_process_level(self(), log_level) - socket = - socket - |> assign_access_token(params) - |> assign_counter() - |> assign_presence_counter() - |> assign(:private?, !!params["config"]["private"]) - |> assign(:policies, nil) + # We always need to assign the access token so we can get the logs metadata working as expected + socket = assign_access_token(socket, params) - with :ok <- SignalHandler.shutdown_in_progress?(), + with {:ok, %Socket{} = socket, %Join{} = configuration} <- configure_socket(socket, params), + :ok <- SignalHandler.shutdown_in_progress?(), :ok <- only_private?(tenant_id, socket), :ok <- limit_joins(socket), :ok <- limit_channels(socket), @@ -64,7 +64,6 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id), {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?) - # fastlane subscription metadata = MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id) @@ -73,15 +72,11 @@ defmodule RealtimeWeb.RealtimeChannel do Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id) - is_new_api = new_api?(params) - # TODO: Default will be moved to false in the future - presence_enabled? = - case get_in(params, ["config", "presence", "enabled"]) do - enabled when is_boolean(enabled) -> enabled - _ -> true - end + is_new_api = new_api?(configuration) - pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic) + presence_enabled? = Join.presence_enabled?(configuration) + + pg_change_params = pg_change_params(is_new_api, configuration, channel_pid, claims, sub_topic) opts = %{ is_new_api: is_new_api, @@ -98,13 +93,13 @@ defmodule RealtimeWeb.RealtimeChannel do state = %{postgres_changes: add_id_to_postgres_changes(pg_change_params)} assigns = %{ - ack_broadcast: !!params["config"]["broadcast"]["ack"], + ack_broadcast: Join.ack_broadcast?(configuration), confirm_token_ref: confirm_token_ref, is_new_api: is_new_api, pg_sub_ref: nil, pg_change_params: pg_change_params, - presence_key: presence_key(params), - self_broadcast: !!params["config"]["broadcast"]["self"], + presence_key: Join.presence_key(configuration), + self_broadcast: Join.self_broadcast?(configuration), tenant_topic: tenant_topic, channel_name: sub_topic, presence_enabled?: presence_enabled? @@ -118,6 +113,9 @@ defmodule RealtimeWeb.RealtimeChannel do {:ok, state, assign(socket, assigns)} else + {:error, :invalid_join_payload, errors, socket} -> + log_error(socket, "InvalidJoinPayload", errors) + {:error, :expired_token, msg} -> maybe_log_warning(socket, "InvalidJWTToken", msg) @@ -194,6 +192,23 @@ defmodule RealtimeWeb.RealtimeChannel do end end + defp configure_socket(socket, params) do + case Join.validate(params) do + {:ok, configuration} -> + socket = + socket + |> assign_counter() + |> assign_presence_counter() + |> assign(:private?, Join.private?(configuration)) + |> assign(:policies, nil) + + {:ok, socket, configuration} + + {:error, :invalid_join_payload, errors} -> + {:error, :invalid_join_payload, errors, socket} + end + end + @impl true def handle_info(:update_rate_counter, %{assigns: %{limits: %{max_events_per_second: max}}} = socket) do count(socket) @@ -531,40 +546,24 @@ defmodule RealtimeWeb.RealtimeChannel do defp count(%{assigns: %{rate_counter: counter}}), do: GenCounter.add(counter.id) - defp presence_key(params) do - case params["config"]["presence"]["key"] do - key when is_binary(key) and key != "" -> key - _ -> UUID.uuid1() - end - end - - defp assign_access_token(%{assigns: %{headers: headers}} = socket, params) do - access_token = Map.get(params, "access_token") || Map.get(params, "user_token") + defp assign_access_token(socket, params) do + %{assigns: %{tenant_token: tenant_token, headers: headers}} = socket {_, header} = Enum.find(headers, {nil, nil}, fn {k, _} -> k == "x-api-key" end) - case access_token do - nil -> assign(socket, :access_token, header) - "sb_" <> _ -> assign(socket, :access_token, header) - _ -> handle_access_token(socket, params) - end - end + access_token = Map.get(params, "access_token") + user_token = Map.get(params, "user_token") - defp assign_access_token(socket, params), do: handle_access_token(socket, params) - - defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"user_token" => user_token}) - when is_binary(user_token) do - assign(socket, :access_token, user_token) - end + access_token = + cond do + is_binary(access_token) and !String.starts_with?(access_token, "sb_") -> access_token + is_binary(user_token) and !String.starts_with?(user_token, "sb_") -> user_token + is_binary(tenant_token) and !String.starts_with?(tenant_token, "sb_") -> tenant_token + true -> header + end - defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"access_token" => access_token}) - when is_binary(access_token) do assign(socket, :access_token, access_token) end - defp handle_access_token(%{assigns: %{tenant_token: tenant_token}} = socket, _params) when is_binary(tenant_token) do - assign(socket, :access_token, tenant_token) - end - defp confirm_token(%{assigns: assigns}) do %{jwt_secret: jwt_secret, access_token: access_token} = assigns @@ -631,28 +630,30 @@ defmodule RealtimeWeb.RealtimeChannel do }) end - defp new_api?(%{"config" => _}), do: true + defp new_api?(%Join{config: config}) when not is_nil(config), do: true defp new_api?(_), do: false - defp pg_change_params(true, params, channel_pid, claims, _) do - case get_in(params, ["config", "postgres_changes"]) do - [_ | _] = params_list -> - params_list - |> Enum.reject(&is_nil/1) - |> Enum.map(fn params -> - %{ - id: UUID.uuid1(), - channel_pid: channel_pid, - claims: claims, - params: params - } - end) - - _ -> - [] - end + defp pg_change_params(true, %Join{config: %Config{postgres_changes: postgres_changes}}, channel_pid, claims, _) + when not is_nil(postgres_changes) do + postgres_changes + |> Enum.reject(&is_nil/1) + |> Enum.map(fn %PostgresChange{table: table, event: event, schema: schema, filter: filter} -> + params = + %{"table" => table, "filter" => filter, "schema" => schema, "event" => event} + |> Enum.reject(fn {_, v} -> is_nil(v) end) + |> Map.new() + + %{ + id: UUID.uuid1(), + channel_pid: channel_pid, + claims: claims, + params: params + } + end) end + defp pg_change_params(true, _, _, _, _), do: [] + defp pg_change_params(false, _, channel_pid, claims, sub_topic) do params = case String.split(sub_topic, ":", parts: 3) do diff --git a/mix.exs b/mix.exs index 3177a742a..ce1ee4e38 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.43.1", + version: "2.43.3", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/phx_join.schema.json b/phx_join.schema.json new file mode 100644 index 000000000..02719026c --- /dev/null +++ b/phx_join.schema.json @@ -0,0 +1,139 @@ +{ + "type": "object", + "description": "", + "title": "phx_join", + "required": [ + "access_token", + "config" + ], + "properties": { + "config": { + "title": "config", + "$ref": "#/$defs/phx_join_config" + }, + "access_token": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "access_token" + } + }, + "additionalProperties": false, + "$defs": { + "phx_join_config": { + "type": "object", + "description": "", + "title": "phx_join_config", + "properties": { + "private": { + "type": "boolean", + "description": "Boolean, e.g. true", + "title": "private", + "default": false + }, + "broadcast": { + "title": "broadcast", + "$ref": "#/$defs/phx_join_broadcast", + "default": { + "self": false, + "ack": false + } + }, + "presence": { + "title": "presence", + "$ref": "#/$defs/phx_join_presence", + "default": { + "enabled": false, + "key": "" + } + }, + "postgres_changes": { + "type": "array", + "title": "phx_join_postgres_changes", + "items": { + "$ref": "#/$defs/phx_join_postgres_changes", + "default": { + "table": "", + "filter": "", + "schema": "", + "event": "" + } + } + } + }, + "additionalProperties": false + }, + "phx_join_broadcast": { + "type": "object", + "description": "", + "title": "phx_join_broadcast", + "properties": { + "self": { + "type": "boolean", + "description": "Boolean, e.g. true", + "title": "self", + "default": false + }, + "ack": { + "type": "boolean", + "description": "Boolean, e.g. true", + "title": "ack", + "default": false + } + }, + "additionalProperties": false + }, + "phx_join_postgres_changes": { + "type": "object", + "description": "", + "title": "phx_join_postgres_changes", + "required": [ + "event", + "filter", + "schema", + "table" + ], + "properties": { + "table": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "table" + }, + "filter": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "filter" + }, + "schema": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "schema" + }, + "event": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "event" + } + }, + "additionalProperties": false + }, + "phx_join_presence": { + "type": "object", + "description": "", + "title": "phx_join_presence", + "properties": { + "enabled": { + "type": "boolean", + "description": "Boolean, e.g. true", + "title": "enabled", + "default": false + }, + "key": { + "type": "string", + "description": "String, e.g. 'hello'", + "title": "key" + } + }, + "additionalProperties": false + } + } +} \ No newline at end of file diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 3eec0e18a..14f4da622 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -420,28 +420,18 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - test "handle nil postgres changes params as empty param changes", %{tenant: tenant} do + test "nil postgres changes params identified as error", %{tenant: tenant} do {socket, _} = get_connection(tenant) topic = "realtime:any" config = %{postgres_changes: [nil]} - WebsocketClient.join(socket, topic, %{config: config}) - - assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 200 - assert_receive %Phoenix.Socket.Message{event: "presence_state", payload: %{}, topic: ^topic}, 500 + log = + capture_log(fn -> + WebsocketClient.join(socket, topic, %{config: config}) + Process.sleep(500) + end) - refute_receive %Message{ - event: "system", - payload: %{ - "channel" => "any", - "extension" => "postgres_changes", - "message" => "Subscribed to PostgreSQL", - "status" => "ok" - }, - ref: nil, - topic: ^topic - }, - 1000 + assert log =~ "InvalidJoinPayload" end end diff --git a/test/realtime_web/channels/payloads/join_test.exs b/test/realtime_web/channels/payloads/join_test.exs new file mode 100644 index 000000000..4fdcd9f42 --- /dev/null +++ b/test/realtime_web/channels/payloads/join_test.exs @@ -0,0 +1,96 @@ +defmodule RealtimeWeb.Channels.Payloads.JoinTest do + use ExUnit.Case + + import Generators + + alias RealtimeWeb.Channels.Payloads.Join + alias RealtimeWeb.Channels.Payloads.Config + alias RealtimeWeb.Channels.Payloads.Broadcast + alias RealtimeWeb.Channels.Payloads.Presence + alias RealtimeWeb.Channels.Payloads.PostgresChange + + describe "validate/1" do + test "valid payload allows join" do + key = random_string() + access_token = random_string() + + config = %{ + "config" => %{ + "private" => false, + "broadcast" => %{"ack" => false, "self" => false}, + "presence" => %{"enabled" => true, "key" => key}, + "postgres_changes" => [ + %{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"}, + %{"event" => "DELETE", "schema" => "public", "table" => "users", "filter" => "id=eq.2"}, + %{"event" => "UPDATE", "schema" => "public", "table" => "users", "filter" => "id=eq.3"} + ] + }, + "access_token" => access_token + } + + assert {:ok, %Join{config: config, access_token: ^access_token}} = Join.validate(config) + + assert %Config{ + private: false, + broadcast: broadcast, + presence: presence, + postgres_changes: postgres_changes + } = config + + assert %Broadcast{ack: false, self: false} = broadcast + assert %Presence{enabled: true, key: ^key} = presence + + assert [ + %PostgresChange{event: "INSERT", schema: "public", table: "users", filter: "id=eq.1"}, + %PostgresChange{event: "DELETE", schema: "public", table: "users", filter: "id=eq.2"}, + %PostgresChange{event: "UPDATE", schema: "public", table: "users", filter: "id=eq.3"} + ] = postgres_changes + end + + test "presence key as default" do + config = %{"config" => %{"presence" => %{"enabled" => true}}} + + assert {:ok, %Join{config: %Config{presence: %Presence{key: key}}}} = Join.validate(config) + + assert key != "" + assert is_binary(key) + end + + test "missing enabled presence defaults to true" do + config = %{"config" => %{"presence" => %{}}} + + assert {:ok, %Join{config: %Config{presence: %Presence{enabled: true}}}} = Join.validate(config) + end + + test "invalid payload returns errors" do + config = %{"config" => ["test"]} + + assert {:error, :invalid_join_payload, %{config: error}} = Join.validate(config) + assert error == ["unable to parse, expected a map"] + end + + test "invalid nested configurations returns errors" do + config = %{ + "config" => %{ + "broadcast" => %{"ack" => "test"}, + "presence" => %{"enabled" => "test"}, + "postgres_changes" => %{"event" => "test"} + }, + "access_token" => true, + "user_token" => true + } + + assert {:error, :invalid_join_payload, errors} = Join.validate(config) + + assert errors == %{ + config: %{ + broadcast: %{ack: ["unable to parse, expected boolean"]}, + presence: %{enabled: ["unable to parse, expected boolean"]}, + postgres_changes: ["unable to parse, expected an array of maps"] + }, + access_token: ["unable to parse, expected string"], + user_token: ["unable to parse, expected string"] + } + end + end +end diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 558ad5eb6..b7bf04049 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -503,12 +503,9 @@ defmodule RealtimeWeb.RealtimeChannelTest do test "expired jwt returns a error with sub data if available log_level=warning", %{tenant: tenant} do sub = random_string() - api_key = Generators.generate_jwt_token(tenant) - - jwt = - Generators.generate_jwt_token(tenant, %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub}) - + claims = %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub} + jwt = Generators.generate_jwt_token(tenant, claims) assert {:ok, socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, api_key)) log = @@ -675,6 +672,44 @@ defmodule RealtimeWeb.RealtimeChannelTest do end end + describe "join payload validations" do + test "valid payload allows join", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + config = %{ + "config" => %{ + "private" => false, + "broadcast" => %{"ack" => false, "self" => false}, + "presence" => %{"enabled" => true, "key" => "potato"}, + "postgres_changes" => [ + %{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"}, + %{"event" => "DELETE", "schema" => "public", "table" => "users", "filter" => "id=eq.2"}, + %{"event" => "UPDATE", "schema" => "public", "table" => "users", "filter" => "id=eq.3"} + ] + }, + "access_token" => jwt + } + + assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", config) + end + + test "invalid payload returns error", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) + + log = + capture_log(fn -> + assert {:error, %{reason: reason}} = + subscribe_and_join(socket, "realtime:test", %{"config" => "potato"}) + + assert reason =~ "unable to parse, expected a map" + end) + + assert log =~ "InvalidJoinPayload" + end + end + test "registers transport pid and channel pid per tenant", %{tenant: tenant} do jwt = Generators.generate_jwt_token(tenant) {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))