diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml new file mode 100644 index 000000000..3981300df --- /dev/null +++ b/.github/workflows/integration_tests.yml @@ -0,0 +1,33 @@ +name: Integration Tests +on: + pull_request: + paths: + - "lib/**" + - "test/**" + - "config/**" + - "priv/**" + - "assets/**" + - "rel/**" + - "mix.exs" + - "Dockerfile" + - "run.sh" + - "docker-compose.test.yml" + + push: + branches: + - main + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + tests: + name: Tests + runs-on: blacksmith-8vcpu-ubuntu-2404 + + steps: + - uses: actions/checkout@v2 + - name: Run integration test + run: docker compose -f docker-compose.tests.yml up --abort-on-container-exit --exit-code-from test-runner + diff --git a/docker-compose.tests.yml b/docker-compose.tests.yml new file mode 100644 index 000000000..56f5466e8 --- /dev/null +++ b/docker-compose.tests.yml @@ -0,0 +1,83 @@ +services: + # Supabase Realtime service + test_db: + image: supabase/postgres:14.1.0.105 + container_name: test-realtime-db + ports: + - "5532:5432" + volumes: + - ./dev/postgres:/docker-entrypoint-initdb.d/ + command: postgres -c config_file=/etc/postgresql/postgresql.conf + environment: + POSTGRES_HOST: /var/run/postgresql + POSTGRES_PASSWORD: postgres + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 10s + timeout: 5s + retries: 5 + test_realtime: + depends_on: + - test_db + build: . + container_name: test-realtime-server + ports: + - "4100:4100" + extra_hosts: + - "host.docker.internal:host-gateway" + environment: + PORT: 4100 + DB_HOST: host.docker.internal + DB_PORT: 5532 + DB_USER: postgres + DB_PASSWORD: postgres + DB_NAME: postgres + DB_ENC_KEY: 1234567890123456 + DB_AFTER_CONNECT_QUERY: 'SET search_path TO _realtime' + API_JWT_SECRET: super-secret-jwt-token-with-at-least-32-characters-long + SECRET_KEY_BASE: UpNVntn3cDxHJpq99YMc1T1AQgQpc8kfYTuRgBiYa15BLrx8etQoXz3gZv1/u2oq + ERL_AFLAGS: -proto_dist inet_tcp + RLIMIT_NOFILE: 1000000 + DNS_NODES: "''" + APP_NAME: realtime + RUN_JANITOR: true + JANITOR_INTERVAL: 60000 + LOG_LEVEL: "info" + SEED_SELF_HOST: true + networks: + test-network: + aliases: + - realtime-dev.local + - realtime-dev.localhost + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4100/"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 5s + + # Deno test runner + test-runner: + image: denoland/deno:alpine-2.5.6 + container_name: deno-test-runner + depends_on: + test_realtime: + condition: service_healthy + test_db: + condition: service_healthy + volumes: + - ./test/integration/tests.ts:/app/tests.ts:ro + working_dir: /app + command: > + sh -c " + echo 'Running tests...' && + deno test tests.ts --allow-import --no-check --allow-read --allow-net --trace-leaks --allow-env=WS_NO_BUFFER_UTIL + " + networks: + - test-network + extra_hosts: + - "realtime-dev.localhost:host-gateway" + +networks: + test-network: + driver: bridge diff --git a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex index 0bbc0c157..4251f5787 100644 --- a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex +++ b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex @@ -15,11 +15,13 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do alias Realtime.Tenants.Authorization.Policies alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies + @type payload :: map | {String.t(), :json | :binary, binary} + @event_type "broadcast" - @spec handle(map(), Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()} + @spec handle(payload, Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()} def handle(payload, %{assigns: %{private?: false}} = socket), do: handle(payload, nil, socket) - @spec handle(map(), pid() | nil, Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()} + @spec handle(payload, pid() | nil, Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()} def handle(payload, db_conn, %{assigns: %{private?: true}} = socket) do %{ assigns: %{ @@ -101,7 +103,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do end defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do - broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload} + broadcast = build_broadcast(tenant_topic, payload) if self_broadcast do TenantBroadcaster.pubsub_broadcast( @@ -123,6 +125,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do end end + # No idea why Dialyzer is complaining here + @dialyzer {:nowarn_function, build_broadcast: 2} + + # Message payload was built by V2 Serializer which was originally UserBroadcastPush + defp build_broadcast(topic, {user_event, user_payload_encoding, user_payload}) do + %RealtimeWeb.Socket.UserBroadcast{ + topic: topic, + user_event: user_event, + user_payload_encoding: user_payload_encoding, + user_payload: user_payload + } + end + + defp build_broadcast(topic, payload) do + %Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload} + end + defp increment_rate_counter(%{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{write: false}}}} = socket) do socket end diff --git a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex index 6604eb2bd..29489ea66 100644 --- a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex +++ b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex @@ -4,92 +4,138 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do """ require Logger + alias Phoenix.Socket.Broadcast + alias RealtimeWeb.Socket.UserBroadcast def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) do {:rc_fastlane, fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids} end + @presence_diff "presence_diff" + @doc """ This dispatch function caches encoded messages if fastlane is used It also sends an :update_rate_counter to the subscriber and it can conditionally log - """ - @spec dispatch(list, pid, Phoenix.Socket.Broadcast.t()) :: :ok - def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{event: event} = msg) do - # fastlane_pid is the actual socket transport pid - # This reduce caches the serialization and bypasses the channel process going straight to the - # transport process - - message_id = message_id(msg.payload) + fastlane_pid is the actual socket transport pid + """ + @spec dispatch(list, pid, Broadcast.t() | UserBroadcast.t()) :: :ok + def dispatch(subscribers, from, %Broadcast{event: @presence_diff} = msg) do {_cache, count} = Enum.reduce(subscribers, {%{}, 0}, fn {pid, _}, {cache, count} when pid == from -> {cache, count} - {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}}, + {_pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, _replayed_message_ids}}, {cache, count} -> + maybe_log(log_level, join_topic, msg, tenant_id) + + cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache, tenant_id, log_level) + {cache, count + 1} + + {pid, _}, {cache, count} -> + send(pid, msg) + {cache, count} + end) + + tenant_id = tenant_id(subscribers) + increment_presence_counter(tenant_id, msg.event, count) + + :ok + end + + def dispatch(subscribers, from, msg) do + message_id = message_id(msg) + + _ = + Enum.reduce(subscribers, %{}, fn + {pid, _}, cache when pid == from -> + cache + + {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}}, + cache -> if already_replayed?(message_id, replayed_message_ids) do # skip already replayed message - {cache, count} + cache else - if event != "presence_diff", do: send(pid, :update_rate_counter) + send(pid, :update_rate_counter) maybe_log(log_level, join_topic, msg, tenant_id) - cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) - {cache, count + 1} + do_dispatch(msg, fastlane_pid, serializer, join_topic, cache, tenant_id, log_level) end - {pid, _}, {cache, count} -> + {pid, _}, cache -> send(pid, msg) - {cache, count} + cache end) - tenant_id = tenant_id(subscribers) - increment_presence_counter(tenant_id, event, count) - :ok end - defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do - tenant_id - |> Realtime.Tenants.presence_events_per_second_key() - |> Realtime.GenCounter.add(count) + defp maybe_log(:info, join_topic, msg, tenant_id) when is_struct(msg) do + log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}" + Logger.info(log, external_id: tenant_id, project: tenant_id) end - defp increment_presence_counter(_tenant_id, _event, _count), do: :ok - - defp maybe_log(:info, join_topic, msg, tenant_id) do - log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}" + defp maybe_log(:info, join_topic, msg, tenant_id) when is_binary(msg) do + log = "Received message on #{join_topic}. #{msg}" Logger.info(log, external_id: tenant_id, project: tenant_id) end defp maybe_log(_level, _join_topic, _msg, _tenant_id), do: :ok - defp message_id(%{"meta" => %{"id" => id}}), do: id - defp message_id(_), do: nil - - defp already_replayed?(nil, _replayed_message_ids), do: false - defp already_replayed?(message_id, replayed_message_ids), do: MapSet.member?(replayed_message_ids, message_id) - - defp do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) do + defp do_dispatch(msg, fastlane_pid, serializer, join_topic, cache, tenant_id, log_level) do case cache do - %{^serializer => encoded_msg} -> + %{^serializer => {:ok, encoded_msg}} -> send(fastlane_pid, encoded_msg) cache + %{^serializer => {:error, _reason}} -> + # We do nothing at this stage. It has been already logged depending on the log level + cache + %{} -> # Use the original topic that was joined without the external_id msg = %{msg | topic: join_topic} - encoded_msg = serializer.fastlane!(msg) - send(fastlane_pid, encoded_msg) - Map.put(cache, serializer, encoded_msg) + + result = + case fastlane!(serializer, msg) do + {:ok, encoded_msg} -> + send(fastlane_pid, encoded_msg) + {:ok, encoded_msg} + + {:error, reason} -> + maybe_log(log_level, join_topic, reason, tenant_id) + end + + Map.put(cache, serializer, result) end end - defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]) do - tenant_id + # We have to convert because V1 does not know how to process UserBroadcast + defp fastlane!(Phoenix.Socket.V1.JSONSerializer = serializer, %UserBroadcast{} = msg) do + with {:ok, msg} <- UserBroadcast.convert_to_json_broadcast(msg) do + {:ok, serializer.fastlane!(msg)} + end end + defp fastlane!(serializer, msg), do: {:ok, serializer.fastlane!(msg)} + + defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]), do: tenant_id defp tenant_id(_), do: nil + + defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do + tenant_id + |> Realtime.Tenants.presence_events_per_second_key() + |> Realtime.GenCounter.add(count) + end + + defp increment_presence_counter(_tenant_id, _event, _count), do: :ok + + defp message_id(%Broadcast{payload: %{"meta" => %{"id" => id}}}), do: id + defp message_id(_), do: nil + + defp already_replayed?(nil, _replayed_message_ids), do: false + defp already_replayed?(message_id, replayed_message_ids), do: MapSet.member?(replayed_message_ids, message_id) end diff --git a/lib/realtime_web/endpoint.ex b/lib/realtime_web/endpoint.ex index dd91e7664..ac8e58054 100644 --- a/lib/realtime_web/endpoint.ex +++ b/lib/realtime_web/endpoint.ex @@ -25,11 +25,11 @@ defmodule RealtimeWeb.Endpoint do # the expense of potentially higher memory being used. active_n: 100, # Skip validating UTF8 for faster frame processing. - # Currently all text frames as handled only with JSON which already requires UTF-8 + # Currently all text frames are handled only with JSON which already requires UTF-8 validate_utf8: false, serializer: [ {Phoenix.Socket.V1.JSONSerializer, "~> 1.0.0"}, - {Phoenix.Socket.V2.JSONSerializer, "~> 2.0.0"} + {RealtimeWeb.Socket.V2Serializer, "~> 2.0.0"} ] ], longpoll: [ diff --git a/lib/realtime_web/plugs/auth_tenant.ex b/lib/realtime_web/plugs/auth_tenant.ex index 11bf2e0bc..23c0581a8 100644 --- a/lib/realtime_web/plugs/auth_tenant.ex +++ b/lib/realtime_web/plugs/auth_tenant.ex @@ -42,6 +42,9 @@ defmodule RealtimeWeb.AuthTenant do [] -> nil + [""] -> + nil + [value | _] -> [bearer, token] = value |> String.split(" ") bearer = String.downcase(bearer) diff --git a/lib/realtime_web/socket/user_broadcast.ex b/lib/realtime_web/socket/user_broadcast.ex new file mode 100644 index 000000000..7caba33ce --- /dev/null +++ b/lib/realtime_web/socket/user_broadcast.ex @@ -0,0 +1,39 @@ +defmodule RealtimeWeb.Socket.UserBroadcast do + @moduledoc """ + Defines a message sent from pubsub to channels and vice-versa. + + The message format requires the following keys: + + * `:topic` - The string topic or topic:subtopic pair namespace, for example "messages", "messages:123" + * `:user_event`- The string user event name, for example "my-event" + * `:user_payload_encoding`- :json or :binary + * `:user_payload` - The actual message payload + + Optionally metadata which is a map to be JSON encoded + """ + + alias Phoenix.Socket.Broadcast + + @type t :: %__MODULE__{} + defstruct topic: nil, user_event: nil, user_payload: nil, user_payload_encoding: nil, metadata: nil + + @spec convert_to_json_broadcast(t) :: {:ok, Broadcast.t()} | {:error, String.t()} + def convert_to_json_broadcast(%__MODULE__{user_payload_encoding: :json} = user_broadcast) do + payload = %{ + "event" => user_broadcast.user_event, + "payload" => Jason.Fragment.new(user_broadcast.user_payload), + "type" => "broadcast" + } + + payload = + if user_broadcast.metadata do + Map.put(payload, "meta", user_broadcast.metadata) + else + payload + end + + {:ok, %Broadcast{event: "broadcast", payload: payload, topic: user_broadcast.topic}} + end + + def convert_to_json_broadcast(%__MODULE__{}), do: {:error, "User payload encoding is not JSON"} +end diff --git a/lib/realtime_web/socket/v2_serializer.ex b/lib/realtime_web/socket/v2_serializer.ex new file mode 100644 index 000000000..5fc02aa5b --- /dev/null +++ b/lib/realtime_web/socket/v2_serializer.ex @@ -0,0 +1,223 @@ +defmodule RealtimeWeb.Socket.V2Serializer do + @moduledoc """ + Custom serializer that is a superset of Phoenix's V2 JSONSerializer + that handles user broadcast and user broadcast push + """ + + @behaviour Phoenix.Socket.Serializer + + @push 0 + @reply 1 + @broadcast 2 + @user_broadcast_push 3 + @user_broadcast 4 + + alias Phoenix.Socket.{Message, Reply, Broadcast} + alias RealtimeWeb.Socket.UserBroadcast + + @impl true + def fastlane!(%UserBroadcast{} = msg) do + metadata = + if msg.metadata do + Phoenix.json_library().encode!(msg.metadata) + else + msg.metadata + end + + topic_size = byte_size!(msg.topic, :topic, 255) + user_event_size = byte_size!(msg.user_event, :user_event, 255) + metadata_size = byte_size!(metadata, :metadata, 255) + user_payload_encoding = if msg.user_payload_encoding == :json, do: 1, else: 0 + + bin = << + @user_broadcast::size(8), + topic_size::size(8), + user_event_size::size(8), + metadata_size::size(8), + user_payload_encoding::size(8), + msg.topic::binary-size(topic_size), + msg.user_event::binary-size(user_event_size), + metadata || <<>>::binary-size(metadata_size), + msg.user_payload::binary + >> + + {:socket_push, :binary, bin} + end + + def fastlane!(%Broadcast{payload: {:binary, data}} = msg) do + topic_size = byte_size!(msg.topic, :topic, 255) + event_size = byte_size!(msg.event, :event, 255) + + bin = << + @broadcast::size(8), + topic_size::size(8), + event_size::size(8), + msg.topic::binary-size(topic_size), + msg.event::binary-size(event_size), + data::binary + >> + + {:socket_push, :binary, bin} + end + + def fastlane!(%Broadcast{payload: %{}} = msg) do + data = Phoenix.json_library().encode_to_iodata!([nil, nil, msg.topic, msg.event, msg.payload]) + {:socket_push, :text, data} + end + + def fastlane!(%Broadcast{payload: invalid}) do + raise ArgumentError, "expected broadcasted payload to be a map, got: #{inspect(invalid)}" + end + + @impl true + def encode!(%Reply{payload: {:binary, data}} = reply) do + status = to_string(reply.status) + join_ref = to_string(reply.join_ref) + ref = to_string(reply.ref) + join_ref_size = byte_size!(join_ref, :join_ref, 255) + ref_size = byte_size!(ref, :ref, 255) + topic_size = byte_size!(reply.topic, :topic, 255) + status_size = byte_size!(status, :status, 255) + + bin = << + @reply::size(8), + join_ref_size::size(8), + ref_size::size(8), + topic_size::size(8), + status_size::size(8), + join_ref::binary-size(join_ref_size), + ref::binary-size(ref_size), + reply.topic::binary-size(topic_size), + status::binary-size(status_size), + data::binary + >> + + {:socket_push, :binary, bin} + end + + def encode!(%Reply{} = reply) do + data = [ + reply.join_ref, + reply.ref, + reply.topic, + "phx_reply", + %{status: reply.status, response: reply.payload} + ] + + {:socket_push, :text, Phoenix.json_library().encode_to_iodata!(data)} + end + + def encode!(%Message{payload: {:binary, data}} = msg) do + join_ref = to_string(msg.join_ref) + join_ref_size = byte_size!(join_ref, :join_ref, 255) + topic_size = byte_size!(msg.topic, :topic, 255) + event_size = byte_size!(msg.event, :event, 255) + + bin = << + @push::size(8), + join_ref_size::size(8), + topic_size::size(8), + event_size::size(8), + join_ref::binary-size(join_ref_size), + msg.topic::binary-size(topic_size), + msg.event::binary-size(event_size), + data::binary + >> + + {:socket_push, :binary, bin} + end + + def encode!(%Message{payload: %{}} = msg) do + data = [msg.join_ref, msg.ref, msg.topic, msg.event, msg.payload] + {:socket_push, :text, Phoenix.json_library().encode_to_iodata!(data)} + end + + def encode!(%Message{payload: invalid}) do + raise ArgumentError, "expected payload to be a map, got: #{inspect(invalid)}" + end + + @impl true + def decode!(raw_message, opts) do + case Keyword.fetch(opts, :opcode) do + {:ok, :text} -> decode_text(raw_message) + {:ok, :binary} -> decode_binary(raw_message) + end + end + + defp decode_text(raw_message) do + [join_ref, ref, topic, event, payload | _] = Phoenix.json_library().decode!(raw_message) + + %Message{ + topic: topic, + event: event, + payload: payload, + ref: ref, + join_ref: join_ref + } + end + + defp decode_binary(<< + @push::size(8), + join_ref_size::size(8), + ref_size::size(8), + topic_size::size(8), + event_size::size(8), + join_ref::binary-size(join_ref_size), + ref::binary-size(ref_size), + topic::binary-size(topic_size), + event::binary-size(event_size), + data::binary + >>) do + %Message{ + topic: topic, + event: event, + payload: {:binary, data}, + ref: ref, + join_ref: join_ref + } + end + + defp decode_binary(<< + @user_broadcast_push::size(8), + join_ref_size::size(8), + ref_size::size(8), + topic_size::size(8), + user_event_size::size(8), + user_payload_encoding::size(8), + join_ref::binary-size(join_ref_size), + ref::binary-size(ref_size), + topic::binary-size(topic_size), + user_event::binary-size(user_event_size), + user_payload::binary + >>) do + user_payload_encoding = if user_payload_encoding == 0, do: :binary, else: :json + + # Encoding as Message because that's how Phoenix Socket and Channel.Server expects things to show up + # Here we abuse the payload field to carry a tuple of (user_event, user payload encoding, user payload) + %Message{ + topic: topic, + event: "broadcast", + payload: {user_event, user_payload_encoding, user_payload}, + ref: ref, + join_ref: join_ref + } + end + + defp byte_size!(nil, _kind, _max), do: 0 + + defp byte_size!(bin, kind, max) do + case byte_size(bin) do + size when size <= max -> + size + + oversized -> + raise ArgumentError, """ + unable to convert #{kind} to binary. + + #{inspect(bin)} + + must be less than or equal to #{max} bytes, but is #{oversized} bytes. + """ + end + end +end diff --git a/mix.exs b/mix.exs index 2351cbb5c..ef593400a 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.62.0", + version: "2.63.0", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/distributed_realtime_channel_test.exs b/test/integration/distributed_realtime_channel_test.exs new file mode 100644 index 000000000..c588ce731 --- /dev/null +++ b/test/integration/distributed_realtime_channel_test.exs @@ -0,0 +1,61 @@ +defmodule Realtime.Integration.DistributedRealtimeChannelTest do + # Use of Clustered + use RealtimeWeb.ConnCase, + async: false, + parameterize: [%{serializer: Phoenix.Socket.V1.JSONSerializer}, %{serializer: RealtimeWeb.Socket.V2Serializer}] + + alias Phoenix.Socket.Message + + alias Realtime.Tenants.Connect + alias Realtime.Integration.WebsocketClient + + setup do + tenant = Realtime.Api.get_tenant_by_external_id("dev_tenant") + + Realtime.RateCounter.stop(tenant.external_id) + + Connect.shutdown(tenant.external_id) + # Sleeping so that syn can forget about this Connect process + Process.sleep(100) + + on_exit(fn -> + Connect.shutdown(tenant.external_id) + # Sleeping so that syn can forget about this Connect process + Process.sleep(100) + end) + + on_exit(fn -> Connect.shutdown(tenant.external_id) end) + {:ok, node} = Clustered.start() + region = Realtime.Tenants.region(tenant) + {:ok, db_conn} = :erpc.call(node, Connect, :connect, ["dev_tenant", region]) + assert Connect.ready?(tenant.external_id) + + assert node(db_conn) == node + %{tenant: tenant, topic: random_string()} + end + + describe "distributed broadcast" do + @tag mode: :distributed + test "it works", %{tenant: tenant, topic: topic, serializer: serializer} do + {:ok, token} = + generate_token(tenant, %{exp: System.system_time(:second) + 1000, role: "authenticated", sub: random_string()}) + + {:ok, remote_socket} = + WebsocketClient.connect(self(), uri(tenant, serializer, 4012), serializer, [{"x-api-key", token}]) + + {:ok, socket} = WebsocketClient.connect(self(), uri(tenant, serializer), serializer, [{"x-api-key", token}]) + + config = %{broadcast: %{self: false}, private: false} + topic = "realtime:#{topic}" + + :ok = WebsocketClient.join(remote_socket, topic, %{config: config}) + :ok = WebsocketClient.join(socket, topic, %{config: config}) + + # Send through one socket and receive through the other (self: false) + payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"} + :ok = WebsocketClient.send_event(remote_socket, topic, "broadcast", payload) + + assert_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 2000 + end + end +end diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 5c64bff0c..61b69787a 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -1,84 +1,37 @@ defmodule Realtime.Integration.RtChannelTest do - # async: false due to the fact that multiple operations against the same tenant and usage of mocks - # Also using dev_tenant due to distributed test - alias Realtime.Api - use RealtimeWeb.ConnCase, async: false - use Mimic + use RealtimeWeb.ConnCase, + async: true, + parameterize: [%{serializer: Phoenix.Socket.V1.JSONSerializer}, %{serializer: RealtimeWeb.Socket.V2Serializer}] + import ExUnit.CaptureLog import Generators - setup :set_mimic_global - require Logger alias Extensions.PostgresCdcRls - alias Phoenix.Socket.Message - alias Phoenix.Socket.V1 - alias Postgrex - alias Realtime.Api.Tenant alias Realtime.Database alias Realtime.Integration.WebsocketClient alias Realtime.RateCounter alias Realtime.Tenants - alias Realtime.Tenants.Authorization alias Realtime.Tenants.Connect alias Realtime.Tenants.ReplicationConnection - - alias RealtimeWeb.RealtimeChannel.Tracker alias RealtimeWeb.SocketDisconnect @moduletag :capture_log - @port 4003 - @serializer V1.JSONSerializer - - Application.put_env(:phoenix, TestEndpoint, - https: false, - http: [port: @port], - debug_errors: false, - server: true, - pubsub_server: __MODULE__, - secret_key_base: String.duplicate("a", 64) - ) - - setup_all do - capture_log(fn -> start_supervised!(TestEndpoint) end) - start_supervised!({Phoenix.PubSub, name: __MODULE__}) - :ok - end - setup [:mode] - - describe "postgres changes" do - setup %{tenant: tenant} do - {:ok, conn} = Database.connect(tenant, "realtime_test") - - Database.transaction(conn, fn db_conn -> - queries = [ - "drop table if exists public.test", - "drop publication if exists supabase_realtime_test", - "create sequence if not exists test_id_seq;", - """ - create table if not exists "public"."test" ( - "id" int4 not null default nextval('test_id_seq'::regclass), - "details" text, - primary key ("id")); - """, - "grant all on table public.test to anon;", - "grant all on table public.test to postgres;", - "grant all on table public.test to authenticated;", - "create publication supabase_realtime_test for all tables" - ] - - Enum.each(queries, &Postgrex.query!(db_conn, &1, [])) - end) + setup do + tenant = Containers.checkout_tenant(run_migrations: true) - :ok - end + {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) + assert Connect.ready?(tenant.external_id) + %{db_conn: db_conn, tenant: tenant} + end - test "error subscribing", %{tenant: tenant} do + describe "postgres changes" do + test "error subscribing", %{tenant: tenant, serializer: serializer} do {:ok, conn} = Database.connect(tenant, "realtime_test") # Let's drop the publication to cause an error @@ -86,7 +39,7 @@ defmodule Realtime.Integration.RtChannelTest do Postgrex.query!(db_conn, "drop publication if exists supabase_realtime_test") end) - {socket, _} = get_connection(tenant) + {socket, _} = get_connection(tenant, serializer) topic = "realtime:any" config = %{postgres_changes: [%{event: "INSERT", schema: "public"}]} @@ -113,8 +66,8 @@ defmodule Realtime.Integration.RtChannelTest do assert log =~ "Unable to subscribe to changes with given parameters" end - test "handle insert", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "handle insert", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) topic = "realtime:any" config = %{postgres_changes: [%{event: "INSERT", schema: "public"}]} @@ -176,8 +129,8 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - test "handle update", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "handle update", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) topic = "realtime:any" config = %{postgres_changes: [%{event: "UPDATE", schema: "public"}]} @@ -243,8 +196,8 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - test "handle delete", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "handle delete", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) topic = "realtime:any" config = %{postgres_changes: [%{event: "DELETE", schema: "public"}]} @@ -308,8 +261,8 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - test "handle wildcard", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "handle wildcard", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) topic = "realtime:any" config = %{postgres_changes: [%{event: "*", schema: "public"}]} @@ -421,8 +374,8 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - test "handle nil postgres changes params as empty param changes", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "handle nil postgres changes params as empty param changes", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) topic = "realtime:any" config = %{postgres_changes: [nil]} @@ -449,8 +402,8 @@ defmodule Realtime.Integration.RtChannelTest do describe "handle broadcast extension" do setup [:rls_context] - test "public broadcast", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "public broadcast", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) config = %{broadcast: %{self: true}, private: false} topic = "realtime:any" WebsocketClient.join(socket, topic, %{config: config}) @@ -464,15 +417,21 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 500 end - test "broadcast to another tenant does not get mixed up", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "broadcast to another tenant does not get mixed up", %{tenant: tenant, serializer: serializer} do + other_tenant = Containers.checkout_tenant(run_migrations: true) + + Cachex.put!( + Realtime.Tenants.Cache, + {{:get_tenant_by_external_id, 1}, [other_tenant.external_id]}, + {:cached, other_tenant} + ) + + {socket, _} = get_connection(tenant, serializer) config = %{broadcast: %{self: false}, private: false} topic = "realtime:any" WebsocketClient.join(socket, topic, %{config: config}) - other_tenant = Containers.checkout_tenant(run_migrations: true) - - {other_socket, _} = get_connection(other_tenant) + {other_socket, _} = get_connection(other_tenant, serializer) WebsocketClient.join(other_socket, topic, %{config: config}) # Both sockets joined @@ -489,8 +448,12 @@ defmodule Realtime.Integration.RtChannelTest do end @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] - test "private broadcast with valid channel with permissions sends message", %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + test "private broadcast with valid channel with permissions sends message", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: true} topic = "realtime:#{topic}" WebsocketClient.join(socket, topic, %{config: config}) @@ -504,61 +467,12 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic} end - @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence], - mode: :distributed - test "private broadcast with valid channel with permissions sends message using a remote node (phoenix adapter)", %{ - tenant: tenant, - topic: topic - } do - {:ok, token} = - generate_token(tenant, %{exp: System.system_time(:second) + 1000, role: "authenticated", sub: random_string()}) - - {:ok, remote_socket} = WebsocketClient.connect(self(), uri(tenant, 4012), @serializer, [{"x-api-key", token}]) - {:ok, socket} = WebsocketClient.connect(self(), uri(tenant), @serializer, [{"x-api-key", token}]) - - config = %{broadcast: %{self: false, replay: %{"limit" => 2, "since" => 0}}, private: true} - topic = "realtime:#{topic}" - - WebsocketClient.join(remote_socket, topic, %{config: config}) - WebsocketClient.join(socket, topic, %{config: config}) - - # Send through one socket and receive through the other (self: false) - payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"} - WebsocketClient.send_event(socket, topic, "broadcast", payload) - - assert_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 500 - end - - @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence], - mode: :distributed - test "private broadcast with valid channel with permissions sends message using a remote node", %{ - tenant: tenant, - topic: topic - } do - {:ok, token} = - generate_token(tenant, %{exp: System.system_time(:second) + 1000, role: "authenticated", sub: random_string()}) - - {:ok, remote_socket} = WebsocketClient.connect(self(), uri(tenant, 4012), @serializer, [{"x-api-key", token}]) - {:ok, socket} = WebsocketClient.connect(self(), uri(tenant), @serializer, [{"x-api-key", token}]) - - config = %{broadcast: %{self: false}, private: true} - topic = "realtime:#{topic}" - - WebsocketClient.join(remote_socket, topic, %{config: config}) - WebsocketClient.join(socket, topic, %{config: config}) - - # Send through one socket and receive through the other (self: false) - payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"} - WebsocketClient.send_event(socket, topic, "broadcast", payload) - assert_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 500 - end - @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence], topic: "topic" test "private broadcast with valid channel a colon character sends message and won't intercept in public channels", - %{topic: topic, tenant: tenant} do - {anon_socket, _} = get_connection(tenant, "anon") - {socket, _} = get_connection(tenant, "authenticated") + %{topic: topic, tenant: tenant, serializer: serializer} do + {anon_socket, _} = get_connection(tenant, serializer, role: "anon") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") valid_topic = "realtime:#{topic}" malicious_topic = "realtime:private:#{topic}" @@ -580,17 +494,18 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence] test "private broadcast with valid channel no write permissions won't send message but will receive message", %{ tenant: tenant, - topic: topic + topic: topic, + serializer: serializer } do config = %{broadcast: %{self: true}, private: true} topic = "realtime:#{topic}" - {service_role_socket, _} = get_connection(tenant, "service_role") + {service_role_socket, _} = get_connection(tenant, serializer, role: "service_role") WebsocketClient.join(service_role_socket, topic, %{config: config}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{event: "presence_state"} - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") WebsocketClient.join(socket, topic, %{config: config}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{event: "presence_state"} @@ -606,12 +521,16 @@ defmodule Realtime.Integration.RtChannelTest do end @tag policies: [] - test "private broadcast with valid channel and no read permissions won't join", %{tenant: tenant, topic: topic} do + test "private broadcast with valid channel and no read permissions won't join", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do config = %{private: true} expected = "Unauthorized: You do not have permissions to read from this Channel topic: #{topic}" topic = "realtime:#{topic}" - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") log = capture_log(fn -> @@ -637,14 +556,18 @@ defmodule Realtime.Integration.RtChannelTest do end @tag policies: [:authenticated_read_broadcast_and_presence] - test "handles lack of connection to database error on private channels", %{tenant: tenant, topic: topic} do + test "handles lack of connection to database error on private channels", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do topic = "realtime:#{topic}" - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") WebsocketClient.join(socket, topic, %{config: %{broadcast: %{self: true}, private: true}}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{event: "presence_state"} - {service_role_socket, _} = get_connection(tenant, "service_role") + {service_role_socket, _} = get_connection(tenant, serializer, role: "service_role") WebsocketClient.join(service_role_socket, topic, %{config: %{broadcast: %{self: false}, private: true}}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{event: "presence_state"} @@ -662,14 +585,18 @@ defmodule Realtime.Integration.RtChannelTest do end @tag policies: [] - test "lack of connection to database error does not impact public channels", %{tenant: tenant, topic: topic} do + test "lack of connection to database error does not impact public channels", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do topic = "realtime:#{topic}" - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") WebsocketClient.join(socket, topic, %{config: %{broadcast: %{self: true}, private: false}}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{event: "presence_state"} - {service_role_socket, _} = get_connection(tenant, "service_role") + {service_role_socket, _} = get_connection(tenant, serializer, role: "service_role") WebsocketClient.join(service_role_socket, topic, %{config: %{broadcast: %{self: false}, private: false}}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{event: "presence_state"} @@ -689,8 +616,8 @@ defmodule Realtime.Integration.RtChannelTest do describe "handle presence extension" do setup [:rls_context] - test "public presence", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "public presence", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) config = %{presence: %{key: "", enabled: true}, private: false} topic = "realtime:any" @@ -716,8 +643,8 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "private presence with read and write permissions will be able to track and receive presence changes", - %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{presence: %{key: "", enabled: true}, private: true} topic = "realtime:#{topic}" @@ -741,8 +668,8 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence], mode: :distributed test "private presence with read and write permissions will be able to track and receive presence changes using a remote node", - %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{presence: %{key: "", enabled: true}, private: true} topic = "realtime:#{topic}" @@ -765,9 +692,9 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence] test "private presence with read permissions will be able to receive presence changes but won't be able to track", - %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") - {secondary_socket, _} = get_connection(tenant, "service_role") + %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") + {secondary_socket, _} = get_connection(tenant, serializer, role: "service_role") config = fn key -> %{presence: %{key: key, enabled: true}, private: true} end topic = "realtime:#{topic}" @@ -817,9 +744,13 @@ defmodule Realtime.Integration.RtChannelTest do end @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] - test "handles lack of connection to database error on private channels", %{tenant: tenant, topic: topic} do + test "handles lack of connection to database error on private channels", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do topic = "realtime:#{topic}" - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") WebsocketClient.join(socket, topic, %{config: %{private: true, presence: %{enabled: true}}}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{event: "presence_state"} @@ -835,13 +766,17 @@ defmodule Realtime.Integration.RtChannelTest do refute_receive %Message{event: "phx_leave", topic: ^topic}, 16000 end) - assert log =~ "UnableToHandlePresence" + assert log =~ ~r/external_id=#{tenant.external_id}.*UnableToHandlePresence/ end @tag policies: [] - test "lack of connection to database error does not impact public channels", %{tenant: tenant, topic: topic} do + test "lack of connection to database error does not impact public channels", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do topic = "realtime:#{topic}" - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") WebsocketClient.join(socket, topic, %{config: %{private: false, presence: %{enabled: true}}}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{event: "presence_state"} @@ -856,16 +791,17 @@ defmodule Realtime.Integration.RtChannelTest do refute_receive %Message{event: "phx_leave", topic: ^topic} end) - refute log =~ "UnableToHandlePresence" + refute log =~ ~r/external_id=#{tenant.external_id}.*UnableToHandlePresence/ end @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "presence enabled if param enabled is set in configuration for private channels", %{ tenant: tenant, - topic: topic + topic: topic, + serializer: serializer } do - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") topic = "realtime:#{topic}" WebsocketClient.join(socket, topic, %{config: %{private: true, presence: %{enabled: true}}}) @@ -877,9 +813,10 @@ defmodule Realtime.Integration.RtChannelTest do test "presence disabled if param 'enabled' is set to false in configuration for private channels", %{ tenant: tenant, - topic: topic + topic: topic, + serializer: serializer } do - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") topic = "realtime:#{topic}" WebsocketClient.join(socket, topic, %{config: %{private: true, presence: %{enabled: false}}}) @@ -889,9 +826,10 @@ defmodule Realtime.Integration.RtChannelTest do test "presence enabled if param enabled is set in configuration for public channels", %{ tenant: tenant, - topic: topic + topic: topic, + serializer: serializer } do - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") topic = "realtime:#{topic}" WebsocketClient.join(socket, topic, %{config: %{private: false, presence: %{enabled: true}}}) @@ -901,9 +839,10 @@ defmodule Realtime.Integration.RtChannelTest do test "presence disabled if param 'enabled' is set to false in configuration for public channels", %{ tenant: tenant, - topic: topic + topic: topic, + serializer: serializer } do - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") topic = "realtime:#{topic}" WebsocketClient.join(socket, topic, %{config: %{private: false, presence: %{enabled: false}}}) @@ -911,8 +850,11 @@ defmodule Realtime.Integration.RtChannelTest do refute_receive %Message{event: "presence_state"}, 500 end - test "presence automatically enabled when user sends track message for public channel", %{tenant: tenant} do - {socket, _} = get_connection(tenant) + test "presence automatically enabled when user sends track message for public channel", %{ + tenant: tenant, + serializer: serializer + } do + {socket, _} = get_connection(tenant, serializer) config = %{presence: %{key: "", enabled: false}, private: false} topic = "realtime:any" @@ -938,8 +880,8 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "presence automatically enabled when user sends track message for private channel", - %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{presence: %{key: "", enabled: false}, private: true} topic = "realtime:#{topic}" @@ -969,32 +911,40 @@ defmodule Realtime.Integration.RtChannelTest do :authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence ] - test "badly formatted jwt token", %{tenant: tenant} do + test "badly formatted jwt token", %{tenant: tenant, serializer: serializer} do log = capture_log(fn -> - WebsocketClient.connect(self(), uri(tenant), @serializer, [{"x-api-key", "bad_token"}]) + WebsocketClient.connect(self(), uri(tenant, serializer), serializer, [{"x-api-key", "bad_token"}]) end) assert log =~ "MalformedJWT: The token provided is not a valid JWT" end - test "invalid JWT with expired token", %{tenant: tenant} do + test "invalid JWT with expired token", %{tenant: tenant, serializer: serializer} do log = capture_log(fn -> - get_connection(tenant, "authenticated", %{:exp => System.system_time(:second) - 1000}, %{log_level: :info}) + get_connection(tenant, serializer, + role: "authenticated", + claims: %{:exp => System.system_time(:second) - 1000}, + params: %{log_level: :info} + ) end) assert log =~ "InvalidJWTToken: Token has expired" end - test "token required the role key", %{tenant: tenant} do + test "token required the role key", %{tenant: tenant, serializer: serializer} do {:ok, token} = token_no_role(tenant) assert {:error, %{status_code: 403}} = - WebsocketClient.connect(self(), uri(tenant), @serializer, [{"x-api-key", token}]) + WebsocketClient.connect(self(), uri(tenant, serializer), serializer, [{"x-api-key", token}]) end - test "handles connection with valid api-header but ignorable access_token payload", %{tenant: tenant, topic: topic} do + test "handles connection with valid api-header but ignorable access_token payload", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do realtime_topic = "realtime:#{topic}" log = @@ -1006,7 +956,7 @@ defmodule Realtime.Integration.RtChannelTest do sub: random_string() }) - {:ok, socket} = WebsocketClient.connect(self(), uri(tenant), @serializer, [{"x-api-key", token}]) + {:ok, socket} = WebsocketClient.connect(self(), uri(tenant, serializer), serializer, [{"x-api-key", token}]) WebsocketClient.join(socket, realtime_topic, %{ config: %{broadcast: %{self: true}, private: false}, @@ -1022,8 +972,8 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "on new access_token and channel is private policies are reevaluated for read policy", - %{tenant: tenant, topic: topic} do - {socket, access_token} = get_connection(tenant, "authenticated") + %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") realtime_topic = "realtime:#{topic}" @@ -1053,9 +1003,10 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "on new access_token and channel is private policies are reevaluated for write policy", %{ topic: topic, - tenant: tenant + tenant: tenant, + serializer: serializer } do - {socket, access_token} = get_connection(tenant, "authenticated") + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") realtime_topic = "realtime:#{topic}" config = %{broadcast: %{self: true}, private: true} WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token}) @@ -1092,8 +1043,12 @@ defmodule Realtime.Integration.RtChannelTest do 1500 end - test "on new access_token and channel is public policies are not reevaluated", %{tenant: tenant, topic: topic} do - {socket, access_token} = get_connection(tenant, "authenticated") + test "on new access_token and channel is public policies are not reevaluated", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") {:ok, new_token} = token_valid(tenant, "anon") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1108,8 +1063,12 @@ defmodule Realtime.Integration.RtChannelTest do refute_receive %Message{} end - test "on empty string access_token the socket sends an error message", %{tenant: tenant, topic: topic} do - {socket, access_token} = get_connection(tenant, "authenticated") + test "on empty string access_token the socket sends an error message", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1134,10 +1093,14 @@ defmodule Realtime.Integration.RtChannelTest do assert msg =~ "The token provided is not a valid JWT" end - test "on expired access_token the socket sends an error message", %{tenant: tenant, topic: topic} do + test "on expired access_token the socket sends an error message", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do sub = random_string() - {socket, access_token} = get_connection(tenant, "authenticated", %{sub: sub}) + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated", claims: %{sub: sub}) config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1149,23 +1112,31 @@ defmodule Realtime.Integration.RtChannelTest do {:ok, token} = generate_token(tenant, %{:exp => System.system_time(:second) - 1000, sub: sub}) log = - capture_log([log_level: :warning], fn -> + capture_log(fn -> WebsocketClient.send_event(socket, realtime_topic, "access_token", %{"access_token" => token}) assert_receive %Message{ topic: ^realtime_topic, event: "system", - payload: %{"extension" => "system", "message" => "Token has expired 1000 seconds ago", "status" => "error"} + payload: %{"extension" => "system", "message" => "Token has expired " <> _, "status" => "error"} } + + assert_receive %Message{event: "phx_close", topic: ^realtime_topic} end) assert log =~ "ChannelShutdown: Token has expired" end - test "ChannelShutdown include sub if available in jwt claims", %{tenant: tenant, topic: topic} do + test "ChannelShutdown include sub if available in jwt claims", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do exp = System.system_time(:second) + 10_000 - {socket, access_token} = get_connection(tenant, "authenticated", %{exp: exp}, %{log_level: :warning}) + {socket, access_token} = + get_connection(tenant, serializer, role: "authenticated", claims: %{exp: exp}, params: %{log_level: :warning}) + config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" sub = random_string() @@ -1183,8 +1154,8 @@ defmodule Realtime.Integration.RtChannelTest do assert log =~ "sub=#{sub}" end - test "missing claims close connection", %{tenant: tenant, topic: topic} do - {socket, access_token} = get_connection(tenant, "authenticated") + test "missing claims close connection", %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1211,8 +1182,8 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_close"} end - test "checks token periodically", %{tenant: tenant, topic: topic} do - {socket, access_token} = get_connection(tenant, "authenticated") + test "checks token periodically", %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1222,7 +1193,8 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500 assert_receive %Message{event: "presence_state"}, 500 - {:ok, token} = generate_token(tenant, %{:exp => System.system_time(:second) + 2, role: "authenticated"}) + {:ok, token} = + generate_token(tenant, %{:exp => System.system_time(:second) + 2, role: "authenticated"}) # Update token to be a near expiring token WebsocketClient.send_event(socket, realtime_topic, "access_token", %{"access_token" => token}) @@ -1239,8 +1211,8 @@ defmodule Realtime.Integration.RtChannelTest do assert msg =~ "Token has expired" end - test "token expires in between joins", %{tenant: tenant, topic: topic} do - {socket, access_token} = get_connection(tenant, "authenticated") + test "token expires in between joins", %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1249,7 +1221,8 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500 assert_receive %Message{event: "presence_state"}, 500 - {:ok, access_token} = generate_token(tenant, %{:exp => System.system_time(:second) + 1, role: "authenticated"}) + {:ok, access_token} = + generate_token(tenant, %{:exp => System.system_time(:second) + 1, role: "authenticated"}) # token expires in between joins so it needs to be handled by the channel and not the socket Process.sleep(1000) @@ -1274,8 +1247,8 @@ defmodule Realtime.Integration.RtChannelTest do assert log =~ "#{tenant.external_id}" end - test "token loses claims in between joins", %{tenant: tenant, topic: topic} do - {socket, access_token} = get_connection(tenant, "authenticated") + test "token loses claims in between joins", %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1305,8 +1278,8 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_close"} end - test "token is badly formatted in between joins", %{tenant: tenant, topic: topic} do - {socket, access_token} = get_connection(tenant, "authenticated") + test "token is badly formatted in between joins", %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, access_token} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1333,56 +1306,18 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_close"} end - @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] - test "handles RPC error on token refreshed", %{tenant: tenant, topic: topic} do - Authorization - |> expect(:get_read_authorizations, fn conn, db_conn, context -> - call_original(Authorization, :get_read_authorizations, [conn, db_conn, context]) - end) - |> expect(:get_read_authorizations, fn _, _, _ -> {:error, "RPC Error"} end) - - {socket, access_token} = get_connection(tenant, "authenticated") - config = %{broadcast: %{self: true}, private: true} - realtime_topic = "realtime:#{topic}" - - WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token}) - - assert_receive %Phoenix.Socket.Message{event: "phx_reply"}, 500 - assert_receive %Phoenix.Socket.Message{event: "presence_state"}, 500 - - # Update token to force update - {:ok, access_token} = - generate_token(tenant, %{:exp => System.system_time(:second) + 1000, role: "authenticated"}) - - log = - capture_log([log_level: :warning], fn -> - WebsocketClient.send_event(socket, realtime_topic, "access_token", %{"access_token" => access_token}) - - assert_receive %Phoenix.Socket.Message{ - event: "system", - payload: %{ - "status" => "error", - "extension" => "system", - "message" => "Realtime was unable to connect to the project database" - }, - topic: ^realtime_topic - }, - 500 - - assert_receive %Phoenix.Socket.Message{event: "phx_close", topic: ^realtime_topic} - end) - - assert log =~ "Realtime was unable to connect to the project database" - end - test "on sb prefixed access_token the socket ignores the message and respects JWT expiry time", %{ tenant: tenant, - topic: topic + topic: topic, + serializer: serializer } do sub = random_string() {socket, access_token} = - get_connection(tenant, "authenticated", %{sub: sub, exp: System.system_time(:second) + 5}) + get_connection(tenant, serializer, + role: "authenticated", + claims: %{sub: sub, exp: System.system_time(:second) + 5} + ) config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1420,9 +1355,10 @@ defmodule Realtime.Integration.RtChannelTest do tenant: tenant, topic: topic, db_conn: db_conn, - table_name: table_name + table_name: table_name, + serializer: serializer } do - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: true} topic = "realtime:#{topic}" @@ -1460,10 +1396,11 @@ defmodule Realtime.Integration.RtChannelTest do tenant: tenant, topic: topic, db_conn: db_conn, - table_name: table_name + table_name: table_name, + serializer: serializer } do value = random_string() - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: true} topic = "realtime:#{topic}" @@ -1503,9 +1440,10 @@ defmodule Realtime.Integration.RtChannelTest do tenant: tenant, topic: topic, db_conn: db_conn, - table_name: table_name + table_name: table_name, + serializer: serializer } do - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: true} topic = "realtime:#{topic}" @@ -1543,9 +1481,10 @@ defmodule Realtime.Integration.RtChannelTest do test "broadcast event when function 'send' is called with private topic", %{ tenant: tenant, topic: topic, - db_conn: db_conn + db_conn: db_conn, + serializer: serializer } do - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: true} full_topic = "realtime:#{topic}" @@ -1580,9 +1519,10 @@ defmodule Realtime.Integration.RtChannelTest do test "broadcast event when function 'send' is called with public topic", %{ tenant: tenant, topic: topic, - db_conn: db_conn + db_conn: db_conn, + serializer: serializer } do - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} full_topic = "realtime:#{topic}" @@ -1619,11 +1559,11 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "user with only private channels enabled will not be able to join public channels", %{ tenant: tenant, - topic: topic + topic: topic, + serializer: serializer } do change_tenant_configuration(tenant, :private_only, true) - on_exit(fn -> change_tenant_configuration(tenant, :private_only, false) end) - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} topic = "realtime:#{topic}" @@ -1644,14 +1584,14 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "user with only private channels enabled will be able to join private channels", %{ tenant: tenant, - topic: topic + topic: topic, + serializer: serializer } do change_tenant_configuration(tenant, :private_only, true) - on_exit(fn -> change_tenant_configuration(tenant, :private_only, false) end) Process.sleep(100) - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: true} topic = "realtime:#{topic}" WebsocketClient.join(socket, topic, %{config: config}) @@ -1663,21 +1603,19 @@ defmodule Realtime.Integration.RtChannelTest do describe "socket disconnect" do setup [:rls_context] - test "tenant already suspended", %{topic: _topic} do - tenant = Containers.checkout_tenant(run_migrations: true) - + test "tenant already suspended", %{tenant: tenant, serializer: serializer} do log = capture_log(fn -> - {:ok, _} = Realtime.Api.update_tenant(tenant, %{suspend: true}) - {:error, %Mint.WebSocket.UpgradeFailureError{}} = get_connection(tenant, "anon") + change_tenant_configuration(tenant, :suspend, true) + {:error, %Mint.WebSocket.UpgradeFailureError{}} = get_connection(tenant, serializer, role: "anon") refute_receive _any end) assert log =~ "RealtimeDisabledForTenant" end - test "on jwks the socket closes and sends a system message", %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + test "on jwks the socket closes and sends a system message", %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1691,8 +1629,12 @@ defmodule Realtime.Integration.RtChannelTest do assert_process_down(socket) end - test "on jwt_secret the socket closes and sends a system message", %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + test "on jwt_secret the socket closes and sends a system message", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1707,8 +1649,12 @@ defmodule Realtime.Integration.RtChannelTest do assert_process_down(socket) end - test "on private_only the socket closes and sends a system message", %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + test "on private_only the socket closes and sends a system message", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1723,8 +1669,12 @@ defmodule Realtime.Integration.RtChannelTest do assert_process_down(socket) end - test "on other param changes the socket won't close and no message is sent", %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + test "on other param changes the socket won't close and no message is sent", %{ + tenant: tenant, + topic: topic, + serializer: serializer + } do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{topic}" @@ -1751,17 +1701,24 @@ defmodule Realtime.Integration.RtChannelTest do assert :ok = WebsocketClient.send_heartbeat(socket) end - test "invalid JWT with expired token", %{tenant: tenant} do + test "invalid JWT with expired token", %{tenant: tenant, serializer: serializer} do log = capture_log(fn -> - get_connection(tenant, "authenticated", %{:exp => System.system_time(:second) - 1000}, %{log_level: :info}) + get_connection(tenant, serializer, + role: "authenticated", + claims: %{:exp => System.system_time(:second) - 1000}, + params: %{log_level: :info} + ) end) assert log =~ "InvalidJWTToken: Token has expired" end - test "check registry of SocketDisconnect and on distribution called, kill socket", %{tenant: tenant} do - {socket, _} = get_connection(tenant, "authenticated") + test "check registry of SocketDisconnect and on distribution called, kill socket", %{ + tenant: tenant, + serializer: serializer + } do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} for _ <- 1..10 do @@ -1783,11 +1740,11 @@ defmodule Realtime.Integration.RtChannelTest do describe "rate limits" do setup [:rls_context] - test "max_concurrent_users limit respected", %{tenant: tenant} do + test "max_concurrent_users limit respected", %{tenant: tenant, serializer: serializer} do %{max_concurrent_users: max_concurrent_users} = Tenants.get_tenant_by_external_id(tenant.external_id) change_tenant_configuration(tenant, :max_concurrent_users, 1) - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{random_string()}" WebsocketClient.join(socket, realtime_topic, %{config: config}) @@ -1809,14 +1766,12 @@ defmodule Realtime.Integration.RtChannelTest do change_tenant_configuration(tenant, :max_concurrent_users, max_concurrent_users) end - test "max_events_per_second limit respected", %{tenant: tenant} do - %{max_events_per_second: max_events_per_second} = Tenants.get_tenant_by_external_id(tenant.external_id) - on_exit(fn -> change_tenant_configuration(tenant, :max_events_per_second, max_events_per_second) end) + test "max_events_per_second limit respected", %{tenant: tenant, serializer: serializer} do RateCounter.stop(tenant.external_id) log = capture_log(fn -> - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false, presence: %{enabled: false}} realtime_topic = "realtime:#{random_string()}" @@ -1836,11 +1791,10 @@ defmodule Realtime.Integration.RtChannelTest do assert log =~ "MessagePerSecondRateLimitReached" end - test "max_channels_per_client limit respected", %{tenant: tenant} do - %{max_events_per_second: max_concurrent_users} = Tenants.get_tenant_by_external_id(tenant.external_id) + test "max_channels_per_client limit respected", %{tenant: tenant, serializer: serializer} do change_tenant_configuration(tenant, :max_channels_per_client, 1) - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic_1 = "realtime:#{random_string()}" realtime_topic_2 = "realtime:#{random_string()}" @@ -1871,12 +1825,10 @@ defmodule Realtime.Integration.RtChannelTest do refute_receive %Message{event: "phx_reply", topic: ^realtime_topic_2}, 500 refute_receive %Message{event: "presence_state", topic: ^realtime_topic_2}, 500 - - change_tenant_configuration(tenant, :max_channels_per_client, max_concurrent_users) end - test "max_joins_per_second limit respected", %{tenant: tenant} do - {socket, _} = get_connection(tenant, "authenticated") + test "max_joins_per_second limit respected", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:#{random_string()}" @@ -1909,9 +1861,8 @@ defmodule Realtime.Integration.RtChannelTest do assert log =~ "project=#{tenant.external_id} external_id=#{tenant.external_id} [critical] ClientJoinRateLimitReached: Too many joins per second" - # Only one log message should be emitted - # Splitting by the error message returns the error message and the rest of the log only - assert length(String.split(log, "ClientJoinRateLimitReached")) == 2 + # Only one or two log messages should be emitted + assert length(String.split(log, "ClientJoinRateLimitReached")) <= 3 end end @@ -1919,8 +1870,8 @@ defmodule Realtime.Integration.RtChannelTest do setup [:rls_context] @tag policies: [:read_matching_user_role, :write_matching_user_role], role: "anon" - test "role policies are respected when accessing the channel", %{tenant: tenant} do - {socket, _} = get_connection(tenant, "anon") + test "role policies are respected when accessing the channel", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "anon") config = %{broadcast: %{self: true}, private: true, presence: %{enabled: false}} topic = random_string() realtime_topic = "realtime:#{topic}" @@ -1929,7 +1880,7 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^realtime_topic}, 500 - {socket, _} = get_connection(tenant, "potato") + {socket, _} = get_connection(tenant, serializer, role: "potato") topic = random_string() realtime_topic = "realtime:#{topic}" @@ -1939,8 +1890,8 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [:authenticated_read_matching_user_sub, :authenticated_write_matching_user_sub], sub: Ecto.UUID.generate() - test "sub policies are respected when accessing the channel", %{tenant: tenant, sub: sub} do - {socket, _} = get_connection(tenant, "authenticated", %{sub: sub}) + test "sub policies are respected when accessing the channel", %{tenant: tenant, sub: sub, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated", claims: %{sub: sub}) config = %{broadcast: %{self: true}, private: true, presence: %{enabled: false}} topic = random_string() realtime_topic = "realtime:#{topic}" @@ -1949,7 +1900,7 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^realtime_topic}, 500 - {socket, _} = get_connection(tenant, "authenticated", %{sub: Ecto.UUID.generate()}) + {socket, _} = get_connection(tenant, serializer, role: "authenticated", claims: %{sub: Ecto.UUID.generate()}) topic = random_string() realtime_topic = "realtime:#{topic}" @@ -1957,11 +1908,9 @@ defmodule Realtime.Integration.RtChannelTest do refute_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^realtime_topic}, 500 end - @tag role: "authenticated", - policies: [:broken_read_presence, :broken_write_presence] - - test "handle failing rls policy", %{tenant: tenant} do - {socket, _} = get_connection(tenant, "authenticated") + @tag role: "authenticated", policies: [:broken_read_presence, :broken_write_presence] + test "handle failing rls policy", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: true} topic = random_string() realtime_topic = "realtime:#{topic}" @@ -1991,8 +1940,8 @@ defmodule Realtime.Integration.RtChannelTest do end end - test "handle empty topic by closing the socket", %{tenant: tenant} do - {socket, _} = get_connection(tenant, "authenticated") + test "handle empty topic by closing the socket", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} realtime_topic = "realtime:" @@ -2013,11 +1962,11 @@ defmodule Realtime.Integration.RtChannelTest do refute_receive %Message{event: "presence_state"} end - def handle_telemetry(event, %{sum: sum}, metadata, _) do + def handle_telemetry(event, %{sum: sum}, metadata, name) do tenant = metadata[:tenant] [key] = Enum.take(event, -1) - Agent.update(TestCounter, fn state -> + Agent.update(name, fn state -> state = Map.put_new(state, tenant, %{joins: 0, events: 0, db_events: 0, presence_events: 0}) update_in(state, [metadata[:tenant], key], fn v -> (v || 0) + sum end) end) @@ -2026,7 +1975,8 @@ defmodule Realtime.Integration.RtChannelTest do defp get_count(event, tenant) do [key] = Enum.take(event, -1) - Agent.get(TestCounter, fn state -> get_in(state, [tenant, key]) || 0 end) + :"TestCounter_#{tenant}" + |> Agent.get(fn state -> get_in(state, [tenant, key]) || 0 end) end describe "billable events" do @@ -2038,45 +1988,24 @@ defmodule Realtime.Integration.RtChannelTest do [:realtime, :rate_counter, :channel, :presence_events] ] + name = :"TestCounter_#{tenant.external_id}" + {:ok, _} = start_supervised(%{ id: 1, - start: {Agent, :start_link, [fn -> %{} end, [name: TestCounter]]} + start: {Agent, :start_link, [fn -> %{} end, [name: name]]} }) RateCounter.stop(tenant.external_id) - on_exit(fn -> :telemetry.detach(__MODULE__) end) - :telemetry.attach_many(__MODULE__, events, &__MODULE__.handle_telemetry/4, []) - - {:ok, conn} = Database.connect(tenant, "realtime_test") - - # Setup for postgres changes - Database.transaction(conn, fn db_conn -> - queries = [ - "drop table if exists public.test", - "drop publication if exists supabase_realtime_test", - "create sequence if not exists test_id_seq;", - """ - create table if not exists "public"."test" ( - "id" int4 not null default nextval('test_id_seq'::regclass), - "details" text, - primary key ("id")); - """, - "grant all on table public.test to anon;", - "grant all on table public.test to postgres;", - "grant all on table public.test to authenticated;", - "create publication supabase_realtime_test for all tables" - ] - - Enum.each(queries, &Postgrex.query!(db_conn, &1, [])) - end) + on_exit(fn -> :telemetry.detach({__MODULE__, tenant.external_id}) end) + :telemetry.attach_many({__MODULE__, tenant.external_id}, events, &__MODULE__.handle_telemetry/4, name) :ok end - test "join events", %{tenant: tenant} do + test "join events", %{tenant: tenant, serializer: serializer} do external_id = tenant.external_id - {socket, _} = get_connection(tenant) + {socket, _} = get_connection(tenant, serializer) config = %{broadcast: %{self: true}, postgres_changes: [%{event: "*", schema: "public"}]} topic = "realtime:any" @@ -2101,9 +2030,9 @@ defmodule Realtime.Integration.RtChannelTest do assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id) end - test "broadcast events", %{tenant: tenant} do + test "broadcast events", %{tenant: tenant, serializer: serializer} do external_id = tenant.external_id - {socket, _} = get_connection(tenant) + {socket, _} = get_connection(tenant, serializer) config = %{broadcast: %{self: true}} topic = "realtime:any" @@ -2114,7 +2043,7 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{topic: ^topic, event: "presence_state"} # Add second client so we can test the "multiplication" of billable events - {socket, _} = get_connection(tenant) + {socket, _} = get_connection(tenant, serializer) WebsocketClient.join(socket, topic, %{config: config}) # Join events @@ -2143,9 +2072,9 @@ defmodule Realtime.Integration.RtChannelTest do assert 15 = get_count([:realtime, :rate_counter, :channel, :events], external_id) end - test "presence events", %{tenant: tenant} do + test "presence events", %{tenant: tenant, serializer: serializer} do external_id = tenant.external_id - {socket, _} = get_connection(tenant) + {socket, _} = get_connection(tenant, serializer) config = %{broadcast: %{self: true}, presence: %{enabled: true}} topic = "realtime:any" @@ -2165,7 +2094,7 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "presence_diff", payload: %{"joins" => _, "leaves" => %{}}, topic: ^topic} # Presence events - {socket, _} = get_connection(tenant, "authenticated") + {socket, _} = get_connection(tenant, serializer, role: "authenticated") WebsocketClient.join(socket, topic, %{config: config}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 @@ -2195,9 +2124,9 @@ defmodule Realtime.Integration.RtChannelTest do assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id) end - test "postgres changes events", %{tenant: tenant} do + test "postgres changes events", %{tenant: tenant, serializer: serializer} do external_id = tenant.external_id - {socket, _} = get_connection(tenant) + {socket, _} = get_connection(tenant, serializer) config = %{broadcast: %{self: true}, postgres_changes: [%{event: "*", schema: "public"}]} topic = "realtime:any" @@ -2209,7 +2138,7 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{topic: ^topic, event: "system"}, 5000 # Add second user to test the "multiplication" of billable events - {socket, _} = get_connection(tenant) + {socket, _} = get_connection(tenant, serializer) WebsocketClient.join(socket, topic, %{config: config}) assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300 assert_receive %Message{topic: ^topic, event: "presence_state"}, 500 @@ -2245,9 +2174,9 @@ defmodule Realtime.Integration.RtChannelTest do assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id) end - test "postgres changes error events", %{tenant: tenant} do + test "postgres changes error events", %{tenant: tenant, serializer: serializer} do external_id = tenant.external_id - {socket, _} = get_connection(tenant) + {socket, _} = get_connection(tenant, serializer) config = %{broadcast: %{self: true}, postgres_changes: [%{event: "*", schema: "none"}]} topic = "realtime:any" @@ -2273,90 +2202,6 @@ defmodule Realtime.Integration.RtChannelTest do end end - test "tracks and untracks properly channels", %{tenant: tenant} do - assert [] = Tracker.list_pids() - - {socket, _} = get_connection(tenant) - config = %{broadcast: %{self: true}, private: false, presence: %{enabled: false}} - - topics = - for _ <- 1..10 do - topic = "realtime:#{random_string()}" - :ok = WebsocketClient.join(socket, topic, %{config: config}) - assert_receive %Message{topic: ^topic, event: "phx_reply"}, 500 - topic - end - - assert [{_pid, count}] = Tracker.list_pids() - assert count == length(topics) - - for topic <- topics do - :ok = WebsocketClient.leave(socket, topic, %{}) - assert_receive %Message{topic: ^topic, event: "phx_close"}, 500 - end - - start_supervised!({Tracker, check_interval_in_ms: 100}) - # wait to trigger tracker - assert_process_down(socket, 1000) - assert [] = Tracker.list_pids() - end - - test "failed connections are present in tracker with counter counter lower than 0 so they are actioned on by tracker", - %{tenant: tenant} do - assert [] = Tracker.list_pids() - - {socket, _} = get_connection(tenant) - config = %{broadcast: %{self: true}, private: true, presence: %{enabled: false}} - - for _ <- 1..10 do - topic = "realtime:#{random_string()}" - :ok = WebsocketClient.join(socket, topic, %{config: config}) - assert_receive %Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "error"}}, 500 - end - - assert [{_pid, count}] = Tracker.list_pids() - assert count == 0 - end - - test "failed connections but one succeeds properly tracks", - %{tenant: tenant} do - assert [] = Tracker.list_pids() - - {socket, _} = get_connection(tenant) - topic = "realtime:#{random_string()}" - - :ok = - WebsocketClient.join(socket, topic, %{ - config: %{broadcast: %{self: true}, private: false, presence: %{enabled: false}} - }) - - assert_receive %Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "ok"}}, 500 - assert [{_pid, count}] = Tracker.list_pids() - assert count == 1 - - for _ <- 1..10 do - topic = "realtime:#{random_string()}" - - :ok = - WebsocketClient.join(socket, topic, %{ - config: %{broadcast: %{self: true}, private: true, presence: %{enabled: false}} - }) - - assert_receive %Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "error"}}, 500 - end - - topic = "realtime:#{random_string()}" - - :ok = - WebsocketClient.join(socket, topic, %{ - config: %{broadcast: %{self: true}, private: false, presence: %{enabled: false}} - }) - - assert_receive %Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "ok"}}, 500 - assert [{_pid, count}] = Tracker.list_pids() - assert count == 2 - end - describe "WAL bloat handling" do setup %{tenant: tenant} do topic = random_string() @@ -2409,8 +2254,8 @@ defmodule Realtime.Integration.RtChannelTest do %{topic: topic} end - test "track PID changes during WAL bloat creation", %{tenant: tenant, topic: topic} do - {socket, _} = get_connection(tenant, "authenticated") + test "track PID changes during WAL bloat creation", %{tenant: tenant, topic: topic, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer, role: "authenticated") config = %{broadcast: %{self: true}, private: false} full_topic = "realtime:#{topic}" @@ -2471,7 +2316,7 @@ defmodule Realtime.Integration.RtChannelTest do # Check if we are receiving the message from replication connection Postgrex.query!(db_conn, "INSERT INTO wal_test VALUES (1, 'test')", []) - assert_receive %Phoenix.Socket.Message{ + assert_receive %Message{ event: "broadcast", payload: %{ "event" => "test", @@ -2486,45 +2331,6 @@ defmodule Realtime.Integration.RtChannelTest do end end - defp mode(%{mode: :distributed}) do - tenant = Api.get_tenant_by_external_id("dev_tenant") - - RateCounter.stop(tenant.external_id) - :ets.delete_all_objects(Tracker.table_name()) - - Connect.shutdown(tenant.external_id) - # Sleeping so that syn can forget about this Connect process - Process.sleep(100) - - on_exit(fn -> - Connect.shutdown(tenant.external_id) - # Sleeping so that syn can forget about this Connect process - Process.sleep(100) - end) - - on_exit(fn -> Connect.shutdown(tenant.external_id) end) - {:ok, node} = Clustered.start() - region = Tenants.region(tenant) - {:ok, db_conn} = :erpc.call(node, Connect, :connect, ["dev_tenant", region]) - assert Connect.ready?(tenant.external_id) - - assert node(db_conn) == node - %{db_conn: db_conn, node: node, tenant: tenant} - end - - defp mode(_) do - tenant = Containers.checkout_tenant(run_migrations: true) - RateCounter.stop(tenant.external_id) - - :ets.delete_all_objects(Tracker.table_name()) - Realtime.Tenants.Connect.shutdown(tenant.external_id) - # Sleeping so that syn can forget about this Connect process - Process.sleep(100) - {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) - assert Connect.ready?(tenant.external_id) - %{db_conn: db_conn, tenant: tenant} - end - defp rls_context(%{tenant: tenant} = context) do {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) clean_table(db_conn, "realtime", "messages") @@ -2581,12 +2387,13 @@ defmodule Realtime.Integration.RtChannelTest do end defp change_tenant_configuration(%Tenant{external_id: external_id}, limit, value) do - external_id - |> Realtime.Tenants.get_tenant_by_external_id() - |> Realtime.Api.Tenant.changeset(%{limit => value}) - |> Realtime.Repo.update!() + tenant = + external_id + |> Realtime.Tenants.get_tenant_by_external_id() + |> Realtime.Api.Tenant.changeset(%{limit => value}) + |> Realtime.Repo.update!() - Realtime.Tenants.Cache.invalidate_tenant_cache(external_id) + Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant}) end defp assert_process_down(pid, timeout \\ 1000) do diff --git a/test/integration/tests.ts b/test/integration/tests.ts new file mode 100644 index 000000000..bf3f2e6d4 --- /dev/null +++ b/test/integration/tests.ts @@ -0,0 +1,169 @@ +import { RealtimeClient } from "npm:@supabase/supabase-js@2.80.1-canary.3"; +import { sleep } from "https://deno.land/x/sleep/mod.ts"; +import { describe, it } from "jsr:@std/testing/bdd"; +import { assertEquals } from "jsr:@std/assert"; +import { deadline } from "jsr:@std/async/deadline"; + +const withDeadline = Promise>(fn: Fn, ms: number): Fn => + ((...args) => deadline(fn(...args), ms)) as Fn; + +const url = "http://realtime-dev.localhost:4100/socket"; +const serviceRoleKey = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIwNzU3NzYzODIsInJlZiI6IjEyNy4wLjAuMSIsInJvbGUiOiJzZXJ2aWNlX3JvbGUiLCJpYXQiOjE3NjA3NzYzODJ9.nupH8pnrOTgK9Xaq8-D4Ry-yQ-PnlXEagTVywQUJVIE" +const apiKey = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIwNzU2NjE3MjEsInJlZiI6IjEyNy4wLjAuMSIsInJvbGUiOiJhdXRoZW50aWNhdGVkIiwiaWF0IjoxNzYwNjYxNzIxfQ.PxpBoelC9vWQ2OVhmwKBUDEIKgX7MpgSdsnmXw7UdYk"; + +const realtimeV1 = { vsn: '1.0.0', params: { apikey: apiKey } , heartbeatIntervalMs: 5000, timeout: 5000 }; +const realtimeV2 = { vsn: '2.0.0', params: { apikey: apiKey } , heartbeatIntervalMs: 5000, timeout: 5000 }; +const realtimeServiceRole = { vsn: '2.0.0', logger: console.log, params: { apikey: serviceRoleKey } , heartbeatIntervalMs: 5000, timeout: 5000 }; + +let clientV1: RealtimeClient | null; +let clientV2: RealtimeClient | null; + +describe("broadcast extension", { sanitizeOps: false, sanitizeResources: false }, () => { + it("users with different versions can receive self broadcast", withDeadline(async () => { + clientV1 = new RealtimeClient(url, realtimeV1) + clientV2 = new RealtimeClient(url, realtimeV2) + let resultV1 = null; + let resultV2 = null; + let event = crypto.randomUUID(); + let topic = "topic:" + crypto.randomUUID(); + let expectedPayload = { message: crypto.randomUUID() }; + const config = { config: { broadcast: { ack: true, self: true } } }; + + const channelV1 = clientV1 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (resultV1 = payload)) + .subscribe(); + + const channelV2 = clientV2 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (resultV2 = payload)) + .subscribe(); + + while (channelV1.state != "joined" || channelV2.state != "joined") await sleep(0.2); + + // Send from V1 client - both should receive + await channelV1.send({ + type: "broadcast", + event, + payload: expectedPayload, + }); + + while (resultV1 == null || resultV2 == null) await sleep(0.2); + + assertEquals(resultV1, expectedPayload); + assertEquals(resultV2, expectedPayload); + + // Reset results for second test + resultV1 = null; + resultV2 = null; + let expectedPayload2 = { message: crypto.randomUUID() }; + + // Send from V2 client - both should receive + await channelV2.send({ + type: "broadcast", + event, + payload: expectedPayload2, + }); + + while (resultV1 == null || resultV2 == null) await sleep(0.2); + + assertEquals(resultV1, expectedPayload2); + assertEquals(resultV2, expectedPayload2); + + await channelV1.unsubscribe(); + await channelV2.unsubscribe(); + + await stopClient(clientV1); + await stopClient(clientV2); + clientV1 = null; + clientV2 = null; + }, 5000)); + + it("users with different versions can receive broadcasts from endpoint", withDeadline(async () => { + clientV1 = new RealtimeClient(url, realtimeV1) + clientV2 = new RealtimeClient(url, realtimeV2) + let resultV1 = null; + let resultV2 = null; + let event = crypto.randomUUID(); + let topic = "topic:" + crypto.randomUUID(); + let expectedPayload = { message: crypto.randomUUID() }; + const config = { config: { broadcast: { ack: true, self: true } } }; + + const channelV1 = clientV1 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (resultV1 = payload)) + .subscribe(); + + const channelV2 = clientV2 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (resultV2 = payload)) + .subscribe(); + + while (channelV1.state != "joined" || channelV2.state != "joined") await sleep(0.2); + + // Send from unsubscribed channel - both should receive + new RealtimeClient(url, realtimeServiceRole).channel(topic, config).httpSend(event, expectedPayload); + + while (resultV1 == null || resultV2 == null) await sleep(0.2); + + assertEquals(resultV1, expectedPayload); + assertEquals(resultV2, expectedPayload); + + await channelV1.unsubscribe(); + await channelV2.unsubscribe(); + + await stopClient(clientV1); + await stopClient(clientV2); + clientV1 = null; + clientV2 = null; + }, 5000)); +}); + +// describe("presence extension", () => { +// it("user is able to receive presence updates", async () => { +// let result: any = []; +// let error = null; +// let topic = "topic:" + crypto.randomUUID(); +// let keyV1 = "key V1"; +// let keyV2 = "key V2"; +// +// const configV1 = { config: { presence: { keyV1 } } }; +// const configV2 = { config: { presence: { keyV1 } } }; +// +// const channelV1 = clientV1 +// .channel(topic, configV1) +// .on("presence", { event: "join" }, ({ key, newPresences }) => +// result.push({ key, newPresences }) +// ) +// .subscribe(); +// +// const channelV2 = clientV2 +// .channel(topic, configV2) +// .on("presence", { event: "join" }, ({ key, newPresences }) => +// result.push({ key, newPresences }) +// ) +// .subscribe(); +// +// while (channelV1.state != "joined" || channelV2.state != "joined") await sleep(0.2); +// +// const resV1 = await channelV1.track({ key: keyV1 }); +// const resV2 = await channelV2.track({ key: keyV2 }); +// +// if (resV1 == "timed out" || resV2 == "timed out") error = resV1 || resV2; +// +// sleep(2.2); +// +// // FIXME write assertions +// console.log(result) +// let presences = result[0].newPresences[0]; +// assertEquals(result[0].key, keyV1); +// assertEquals(presences.message, message); +// assertEquals(error, null); +// }); +// }); + +async function stopClient(client: RealtimeClient | null) { + if (client) { + await client.removeAllChannels(); + } +} diff --git a/test/integration/tracker_test.exs b/test/integration/tracker_test.exs new file mode 100644 index 000000000..32b73f65a --- /dev/null +++ b/test/integration/tracker_test.exs @@ -0,0 +1,101 @@ +defmodule Integration.TrackerTest do + # Changing the Tracker ETS table + use RealtimeWeb.ConnCase, async: false + + alias RealtimeWeb.RealtimeChannel.Tracker + alias Phoenix.Socket.Message + alias Realtime.Tenants.Connect + alias Realtime.Integration.WebsocketClient + + setup do + tenant = Containers.checkout_tenant(run_migrations: true) + :ets.delete_all_objects(Tracker.table_name()) + + {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) + assert Connect.ready?(tenant.external_id) + %{db_conn: db_conn, tenant: tenant} + end + + test "tracks and untracks properly channels", %{tenant: tenant} do + {socket, _} = get_connection(tenant) + config = %{broadcast: %{self: true}, private: false, presence: %{enabled: false}} + + topics = + for _ <- 1..10 do + topic = "realtime:#{random_string()}" + :ok = WebsocketClient.join(socket, topic, %{config: config}) + assert_receive %Message{topic: ^topic, event: "phx_reply"}, 500 + topic + end + + for topic <- topics do + :ok = WebsocketClient.leave(socket, topic, %{}) + assert_receive %Message{topic: ^topic, event: "phx_close"}, 500 + end + + start_supervised!({Tracker, check_interval_in_ms: 100}) + # wait to trigger tracker + assert_process_down(socket, 1000) + end + + test "failed connections are present in tracker with counter lower than 0 so they are actioned on by tracker", %{ + tenant: tenant + } do + assert [] = Tracker.list_pids() + + {socket, _} = get_connection(tenant) + config = %{broadcast: %{self: true}, private: true, presence: %{enabled: false}} + + for _ <- 1..10 do + topic = "realtime:#{random_string()}" + :ok = WebsocketClient.join(socket, topic, %{config: config}) + assert_receive %Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "error"}}, 500 + end + + assert [{_pid, count}] = Tracker.list_pids() + assert count == 0 + end + + test "failed connections but one succeeds properly tracks", %{tenant: tenant} do + assert [] = Tracker.list_pids() + + {socket, _} = get_connection(tenant) + topic = "realtime:#{random_string()}" + + :ok = + WebsocketClient.join(socket, topic, %{ + config: %{broadcast: %{self: true}, private: false, presence: %{enabled: false}} + }) + + assert_receive %Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "ok"}}, 500 + assert [{_pid, count}] = Tracker.list_pids() + assert count == 1 + + for _ <- 1..10 do + topic = "realtime:#{random_string()}" + + :ok = + WebsocketClient.join(socket, topic, %{ + config: %{broadcast: %{self: true}, private: true, presence: %{enabled: false}} + }) + + assert_receive %Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "error"}}, 500 + end + + topic = "realtime:#{random_string()}" + + :ok = + WebsocketClient.join(socket, topic, %{ + config: %{broadcast: %{self: true}, private: false, presence: %{enabled: false}} + }) + + assert_receive %Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "ok"}}, 500 + assert [{_pid, count}] = Tracker.list_pids() + assert count == 2 + end + + defp assert_process_down(pid, timeout) do + ref = Process.monitor(pid) + assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout + end +end diff --git a/test/realtime/gen_rpc_pub_sub_test.exs b/test/realtime/gen_rpc_pub_sub_test.exs index 677a354ef..25e8154e7 100644 --- a/test/realtime/gen_rpc_pub_sub_test.exs +++ b/test/realtime/gen_rpc_pub_sub_test.exs @@ -23,7 +23,7 @@ defmodule Realtime.GenRpcPubSubTest do def subscribe(subscriber, topic) do spawn(fn -> RealtimeWeb.Endpoint.subscribe(topic) - send(subscriber, :ready) + send(subscriber, {:ready, Application.get_env(:realtime, :region)}) loop = fn f -> receive do @@ -97,9 +97,12 @@ defmodule Realtime.GenRpcPubSubTest do # Ensuring that syn had enough time to propagate to all nodes the group information Process.sleep(3000) - assert_receive :ready - assert_receive :ready - assert_receive :ready + assert length(Realtime.Nodes.region_nodes("us-east-1")) == 2 + assert length(Realtime.Nodes.region_nodes("ap-southeast-2")) == 2 + + assert_receive {:ready, "us-east-1"} + assert_receive {:ready, "ap-southeast-2"} + assert_receive {:ready, "ap-southeast-2"} message = %Phoenix.Socket.Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]} Phoenix.PubSub.broadcast(Realtime.PubSub, @topic, message) diff --git a/test/realtime/monitoring/prom_ex/plugins/phoenix_test.exs b/test/realtime/monitoring/prom_ex/plugins/phoenix_test.exs index ad9198c97..e2879880f 100644 --- a/test/realtime/monitoring/prom_ex/plugins/phoenix_test.exs +++ b/test/realtime/monitoring/prom_ex/plugins/phoenix_test.exs @@ -21,10 +21,20 @@ defmodule Realtime.PromEx.Plugins.PhoenixTest do {:ok, token} = token_valid(tenant, "anon", %{}) {:ok, _} = - WebsocketClient.connect(self(), uri(tenant, 4002), Phoenix.Socket.V1.JSONSerializer, [{"x-api-key", token}]) + WebsocketClient.connect( + self(), + uri(tenant, Phoenix.Socket.V1.JSONSerializer, 4002), + Phoenix.Socket.V1.JSONSerializer, + [{"x-api-key", token}] + ) {:ok, _} = - WebsocketClient.connect(self(), uri(tenant, 4002), Phoenix.Socket.V1.JSONSerializer, [{"x-api-key", token}]) + WebsocketClient.connect( + self(), + uri(tenant, Phoenix.Socket.V1.JSONSerializer, 4002), + Phoenix.Socket.V1.JSONSerializer, + [{"x-api-key", token}] + ) Process.sleep(200) assert metric_value() >= 2 diff --git a/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs b/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs index c6e8ba21e..9b6298666 100644 --- a/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs +++ b/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs @@ -1,5 +1,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do - use Realtime.DataCase, async: true + use Realtime.DataCase, + async: true, + parameterize: [%{serializer: Phoenix.Socket.V1.JSONSerializer}, %{serializer: RealtimeWeb.Socket.V2Serializer}] + use Mimic import Generators @@ -17,21 +20,24 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do setup [:initiate_tenant] + @payload %{"a" => "b"} + describe "handle/3" do - test "with write true policy, user is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do + test "with write true policy, user is able to send message", + %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do socket = socket_fixture(tenant, topic, policies: %Policies{broadcast: %BroadcastPolicies{write: true}}) for _ <- 1..100, reduce: socket do socket -> - {:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket) + {:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket) socket end for _ <- 1..100 do topic = "realtime:#{topic}" assert_receive {:socket_push, :text, data} - message = data |> IO.iodata_to_binary() |> Jason.decode!() - assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} + + assert Jason.decode!(data) == message(serializer, topic, @payload) end Process.sleep(120) @@ -57,20 +63,20 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do end @tag policies: [:authenticated_read_broadcast, :authenticated_write_broadcast] - test "with nil policy but valid user, is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do + test "with nil policy but valid user, is able to send message", + %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do socket = socket_fixture(tenant, topic) for _ <- 1..100, reduce: socket do socket -> - {:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket) + {:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket) socket end for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} - message = data |> IO.iodata_to_binary() |> Jason.decode!() - assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} + assert Jason.decode!(data) == message(serializer, topic, @payload) end Process.sleep(120) @@ -80,7 +86,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do end @tag policies: [:authenticated_read_matching_user_sub, :authenticated_write_matching_user_sub], sub: UUID.generate() - test "with valid sub, is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn, sub: sub} do + test "with valid sub, is able to send message", + %{topic: topic, tenant: tenant, db_conn: db_conn, sub: sub, serializer: serializer} do socket = socket_fixture(tenant, topic, policies: %Policies{broadcast: %BroadcastPolicies{write: nil, read: true}}, @@ -89,15 +96,14 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do for _ <- 1..100, reduce: socket do socket -> - {:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket) + {:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket) socket end for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} - message = data |> IO.iodata_to_binary() |> Jason.decode!() - assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} + assert Jason.decode!(data) == message(serializer, topic, @payload) end end @@ -119,7 +125,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do end @tag policies: [:read_matching_user_role, :write_matching_user_role], role: "anon" - test "with valid role, is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do + test "with valid role, is able to send message", + %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do socket = socket_fixture(tenant, topic, policies: %Policies{broadcast: %BroadcastPolicies{write: nil, read: true}}, @@ -128,15 +135,14 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do for _ <- 1..100, reduce: socket do socket -> - {:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket) + {:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket) socket end for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} - message = data |> IO.iodata_to_binary() |> Jason.decode!() - assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} + assert Jason.decode!(data) == message(serializer, topic, @payload) end end @@ -173,7 +179,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do end @tag policies: [:authenticated_read_broadcast, :authenticated_write_broadcast] - test "validation only runs once on nil and valid policies", %{topic: topic, tenant: tenant, db_conn: db_conn} do + test "validation only runs once on nil and valid policies", + %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do socket = socket_fixture(tenant, topic) expect(Authorization, :get_write_authorizations, 1, fn conn, db_conn, auth_context -> @@ -184,15 +191,14 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do for _ <- 1..100, reduce: socket do socket -> - {:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket) + {:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket) socket end for _ <- 1..100 do topic = "realtime:#{topic}" assert_receive {:socket_push, :text, data} - message = data |> IO.iodata_to_binary() |> Jason.decode!() - assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} + assert Jason.decode!(data) == message(serializer, topic, @payload) end end @@ -212,7 +218,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do refute_receive _, 100 end - test "no ack still sends message", %{topic: topic, tenant: tenant, db_conn: db_conn} do + test "no ack still sends message", %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do socket = socket_fixture(tenant, topic, policies: %Policies{broadcast: %BroadcastPolicies{write: true}}, @@ -221,7 +227,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do for _ <- 1..100, reduce: socket do socket -> - {:noreply, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket) + {:noreply, socket} = BroadcastHandler.handle(@payload, db_conn, socket) socket end @@ -230,25 +236,24 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} - message = data |> IO.iodata_to_binary() |> Jason.decode!() - assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} + assert Jason.decode!(data) == message(serializer, topic, @payload) end end - test "public channels are able to send messages", %{topic: topic, tenant: tenant, db_conn: db_conn} do + test "public channels are able to send messages", + %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do socket = socket_fixture(tenant, topic, private?: false, policies: nil) for _ <- 1..100, reduce: socket do socket -> - {:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket) + {:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket) socket end for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} - message = data |> IO.iodata_to_binary() |> Jason.decode!() - assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} + assert Jason.decode!(data) == message(serializer, topic, @payload) end Process.sleep(120) @@ -257,20 +262,20 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do assert avg > 0.0 end - test "public channels are able to send messages and ack", %{topic: topic, tenant: tenant, db_conn: db_conn} do + test "public channels are able to send messages and ack", + %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do socket = socket_fixture(tenant, topic, private?: false, policies: nil) for _ <- 1..100, reduce: socket do socket -> - {:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket) + {:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket) socket end for _ <- 1..100 do topic = "realtime:#{topic}" assert_receive {:socket_push, :text, data} - message = data |> IO.iodata_to_binary() |> Jason.decode!() - assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} + assert Jason.decode!(data) == message(serializer, topic, @payload) end Process.sleep(120) @@ -280,6 +285,82 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do assert avg > 0.0 end + test "V2 json UserBroadcastPush", %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do + socket = socket_fixture(tenant, topic, private?: false, policies: nil) + + user_broadcast_payload = %{"a" => "b"} + json_encoded_user_broadcast_payload = Jason.encode!(user_broadcast_payload) + + {:reply, :ok, _socket} = + BroadcastHandler.handle({"event123", :json, json_encoded_user_broadcast_payload}, db_conn, socket) + + topic = "realtime:#{topic}" + assert_receive {:socket_push, code, data} + + if serializer == RealtimeWeb.Socket.V2Serializer do + assert code == :binary + + assert data == + << + # user broadcast = 4 + 4::size(8), + # topic_size + byte_size(topic), + # user_event_size + byte_size("event123"), + # metadata_size + 0, + # json encoding + 1::size(8), + topic::binary, + "event123" + >> <> json_encoded_user_broadcast_payload + else + assert code == :text + + assert Jason.decode!(data) == + message(serializer, topic, %{ + "event" => "event123", + "payload" => user_broadcast_payload, + "type" => "broadcast" + }) + end + end + + test "V2 binary UserBroadcastPush", %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do + socket = socket_fixture(tenant, topic, private?: false, policies: nil) + + user_broadcast_payload = <<123, 456, 789>> + + {:reply, :ok, _socket} = + BroadcastHandler.handle({"event123", :binary, user_broadcast_payload}, db_conn, socket) + + topic = "realtime:#{topic}" + + if serializer == RealtimeWeb.Socket.V2Serializer do + assert_receive {:socket_push, :binary, data} + + assert data == + << + # user broadcast = 4 + 4::size(8), + # topic_size + byte_size(topic), + # user_event_size + byte_size("event123"), + # metadata_size + 0, + # binary encoding + 0::size(8), + topic::binary, + "event123" + >> <> user_broadcast_payload + else + # Can't receive binary payloads on V1 serializer + refute_receive {:socket_push, _code, _data} + end + end + @tag policies: [:broken_write_presence] test "handle failing rls policy", %{topic: topic, tenant: tenant, db_conn: db_conn} do socket = socket_fixture(tenant, topic) @@ -384,7 +465,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do fastlane = RealtimeWeb.RealtimeChannel.MessageDispatcher.fastlane_metadata( self(), - Phoenix.Socket.V1.JSONSerializer, + context.serializer, "realtime:#{topic}", :warning, "tenant_id" @@ -442,4 +523,10 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do } } end + + defp message(RealtimeWeb.Socket.V2Serializer, topic, payload), do: [nil, nil, topic, "broadcast", payload] + + defp message(Phoenix.Socket.V1.JSONSerializer, topic, payload) do + %{"event" => "broadcast", "payload" => payload, "ref" => nil, "topic" => topic} + end end diff --git a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs index 1bc74520c..2670394a1 100644 --- a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs +++ b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs @@ -4,7 +4,10 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do import ExUnit.CaptureLog alias Phoenix.Socket.Broadcast + alias Phoenix.Socket.V1 alias RealtimeWeb.RealtimeChannel.MessageDispatcher + alias RealtimeWeb.Socket.UserBroadcast + alias RealtimeWeb.Socket.V2Serializer defmodule TestSerializer do def fastlane!(msg) do @@ -233,5 +236,233 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do # TestSerializer is not called assert Agent.get(TestSerializer, & &1) == 0 end + + test "dispatches Broadcast to V1 & V2 Serializers" do + parent = self() + + subscriber_pid = + spawn(fn -> + loop = fn loop -> + receive do + msg -> + send(parent, {:subscriber, msg}) + loop.(loop) + end + end + + loop.(loop) + end) + + from_pid = :erlang.list_to_pid(~c'<0.2.1>') + + subscribers = [ + {subscriber_pid, {:rc_fastlane, self(), V1.JSONSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V1.JSONSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V2Serializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V2Serializer, "realtime:topic", :info, "tenant123", MapSet.new()}} + ] + + msg = %Broadcast{topic: "some:other:topic", event: "event", payload: %{data: "test"}} + + log = + capture_log(fn -> + assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok + end) + + assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}" + + # Receive 2 messages using V1 + assert_receive {:socket_push, :text, message_v1} + assert_receive {:socket_push, :text, ^message_v1} + + assert Jason.decode!(message_v1) == %{ + "event" => "event", + "payload" => %{"data" => "test"}, + "ref" => nil, + "topic" => "realtime:topic" + } + + # Receive 2 messages using V2 + assert_receive {:socket_push, :text, message_v2} + assert_receive {:socket_push, :text, ^message_v2} + + # V2 is an array format + assert Jason.decode!(message_v2) == [nil, nil, "realtime:topic", "event", %{"data" => "test"}] + + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + + refute_receive _any + end + + test "dispatches json UserBroadcast to V1 & V2 Serializers" do + parent = self() + + subscriber_pid = + spawn(fn -> + loop = fn loop -> + receive do + msg -> + send(parent, {:subscriber, msg}) + loop.(loop) + end + end + + loop.(loop) + end) + + from_pid = :erlang.list_to_pid(~c'<0.2.1>') + + subscribers = [ + {subscriber_pid, {:rc_fastlane, self(), V1.JSONSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V1.JSONSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V2Serializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V2Serializer, "realtime:topic", :info, "tenant123", MapSet.new()}} + ] + + user_payload = Jason.encode!(%{data: "test"}) + + msg = %UserBroadcast{ + topic: "some:other:topic", + user_event: "event123", + user_payload: user_payload, + user_payload_encoding: :json, + metadata: %{"id" => "123", "replayed" => true} + } + + log = + capture_log(fn -> + assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok + end) + + assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}" + + # Receive 2 messages using V1 + assert_receive {:socket_push, :text, message_v1} + assert_receive {:socket_push, :text, ^message_v1} + + assert Jason.decode!(message_v1) == %{ + "event" => "broadcast", + "payload" => %{ + "event" => "event123", + "meta" => %{"id" => "123", "replayed" => true}, + "payload" => %{"data" => "test"}, + "type" => "broadcast" + }, + "ref" => nil, + "topic" => "realtime:topic" + } + + # Receive 2 messages using V2 + assert_receive {:socket_push, :binary, message_v2} + assert_receive {:socket_push, :binary, ^message_v2} + + encoded_metadata = Jason.encode!(%{"id" => "123", "replayed" => true}) + metadata_size = byte_size(encoded_metadata) + + # binary payload structure + assert message_v2 == + << + # user broadcast = 4 + 4::size(8), + # topic_size + 14, + # user_event_size + 8, + # metadata_size + metadata_size, + # json encoding + 1::size(8), + "realtime:topic", + "event123" + >> <> encoded_metadata <> user_payload + + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + + refute_receive _any + end + + test "dispatches binary UserBroadcast to V1 & V2 Serializers" do + parent = self() + + subscriber_pid = + spawn(fn -> + loop = fn loop -> + receive do + msg -> + send(parent, {:subscriber, msg}) + loop.(loop) + end + end + + loop.(loop) + end) + + from_pid = :erlang.list_to_pid(~c'<0.2.1>') + + subscribers = [ + {subscriber_pid, {:rc_fastlane, self(), V1.JSONSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V1.JSONSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V2Serializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), V2Serializer, "realtime:topic", :info, "tenant123", MapSet.new()}} + ] + + user_payload = <<123, 456, 789>> + + msg = %UserBroadcast{ + topic: "some:other:topic", + user_event: "event123", + user_payload: user_payload, + user_payload_encoding: :binary, + metadata: %{"id" => "123", "replayed" => true} + } + + log = + capture_log(fn -> + assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok + end) + + assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}" + assert log =~ "User payload encoding is not JSON" + + # No V1 message received as binary payloads are not supported + refute_receive {:socket_push, :text, _message_v1} + + # Receive 2 messages using V2 + assert_receive {:socket_push, :binary, message_v2} + assert_receive {:socket_push, :binary, ^message_v2} + + encoded_metadata = Jason.encode!(%{"id" => "123", "replayed" => true}) + metadata_size = byte_size(encoded_metadata) + + # binary payload structure + assert message_v2 == + << + # user broadcast = 4 + 4::size(8), + # topic_size + 14, + # user_event_size + 8, + # metadata_size + metadata_size, + # binary encoding + 0::size(8), + "realtime:topic", + "event123" + >> <> encoded_metadata <> user_payload + + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + assert_receive {:subscriber, :update_rate_counter} + + refute_receive _any + end end end diff --git a/test/realtime_web/socket/v2_serializer_test.exs b/test/realtime_web/socket/v2_serializer_test.exs new file mode 100644 index 000000000..3bebb337d --- /dev/null +++ b/test/realtime_web/socket/v2_serializer_test.exs @@ -0,0 +1,513 @@ +defmodule RealtimeWeb.Socket.V2SerializerTest do + use ExUnit.Case, async: true + + alias Phoenix.Socket.{Broadcast, Message, Reply} + alias RealtimeWeb.Socket.UserBroadcast + alias RealtimeWeb.Socket.V2Serializer + + @serializer V2Serializer + @v2_fastlane_json "[null,null,\"t\",\"e\",{\"m\":1}]" + @v2_msg_json "[null,null,\"t\",\"e\",{\"m\":1}]" + + @client_push << + # push + 0::size(8), + # join_ref_size + 2, + # ref_size + 3, + # topic_size + 5, + # event_size + 5, + "12", + "123", + "topic", + "event", + 101, + 102, + 103 + >> + + @client_binary_user_broadcast_push << + # user broadcast push + 3::size(8), + # join_ref_size + 2, + # ref_size + 3, + # topic_size + 5, + # user_event_size + 10, + # binary encoding + 0::size(8), + "12", + "123", + "topic", + "user_event", + 101, + 102, + 103 + >> + + @client_json_user_broadcast_push << + # user broadcast push + 3::size(8), + # join_ref_size + 2, + # ref_size + 3, + # topic_size + 5, + # user_event_size + 10, + # json encoding + 1::size(8), + "12", + "123", + "topic", + "user_event", + 123, + 34, + 97, + 34, + 58, + 34, + 98, + 34, + 125 + >> + + @reply << + # reply + 1::size(8), + # join_ref_size + 2, + # ref_size + 3, + # topic_size + 5, + # status_size + 2, + "12", + "123", + "topic", + "ok", + 101, + 102, + 103 + >> + + @broadcast << + # broadcast + 2::size(8), + # topic_size + 5, + # event_size + 5, + "topic", + "event", + 101, + 102, + 103 + >> + + @binary_user_broadcast << + # user broadcast + 4::size(8), + # topic_size + 5, + # user_event_size + 10, + # metadata_size + 17, + # binary encoding + 0::size(8), + "topic", + "user_event", + # metadata + 123, + 34, + 114, + 101, + 112, + 108, + 97, + 121, + 101, + 100, + 34, + 58, + 116, + 114, + 117, + 101, + 125, + # payload + 101, + 102, + 103 + >> + + @binary_user_broadcast_no_metadata << + # user broadcast + 4::size(8), + # topic_size + 5, + # user_event_size + 10, + # metadata_size + 0, + # binary encoding + 0::size(8), + "topic", + "user_event", + # metadata + # payload + 101, + 102, + 103 + >> + + @json_user_broadcast << + # user broadcast + 4::size(8), + # topic_size + 5, + # user_event_size + 10, + # metadata_size + 17, + # json encoding + 1::size(8), + "topic", + "user_event", + # metadata + 123, + 34, + 114, + 101, + 112, + 108, + 97, + 121, + 101, + 100, + 34, + 58, + 116, + 114, + 117, + 101, + 125, + # payload + 123, + 34, + 97, + 34, + 58, + 34, + 98, + 34, + 125 + >> + + @json_user_broadcast_no_metadata << + # broadcast + 4::size(8), + # topic_size + 5, + # user_event_size + 10, + # metadata_size + 0, + # json encoding + 1::size(8), + "topic", + "user_event", + # metadata + # payload + 123, + 34, + 97, + 34, + 58, + 34, + 98, + 34, + 125 + >> + + defp encode!(serializer, msg) do + case serializer.encode!(msg) do + {:socket_push, :text, encoded} -> + assert is_list(encoded) + IO.iodata_to_binary(encoded) + + {:socket_push, :binary, encoded} -> + assert is_binary(encoded) + encoded + end + end + + defp decode!(serializer, msg, opts), do: serializer.decode!(msg, opts) + + defp fastlane!(serializer, msg) do + case serializer.fastlane!(msg) do + {:socket_push, :text, encoded} -> + assert is_list(encoded) + IO.iodata_to_binary(encoded) + + {:socket_push, :binary, encoded} -> + assert is_binary(encoded) + encoded + end + end + + test "encode!/1 encodes `Phoenix.Socket.Message` as JSON" do + msg = %Message{topic: "t", event: "e", payload: %{m: 1}} + assert encode!(@serializer, msg) == @v2_msg_json + end + + test "encode!/1 raises when payload is not a map" do + msg = %Message{topic: "t", event: "e", payload: "invalid"} + assert_raise ArgumentError, fn -> encode!(@serializer, msg) end + end + + test "encode!/1 encodes `Phoenix.Socket.Reply` as JSON" do + msg = %Reply{topic: "t", payload: %{m: 1}} + encoded = encode!(@serializer, msg) + + assert Jason.decode!(encoded) == [ + nil, + nil, + "t", + "phx_reply", + %{"response" => %{"m" => 1}, "status" => nil} + ] + end + + test "decode!/2 decodes `Phoenix.Socket.Message` from JSON" do + assert %Message{topic: "t", event: "e", payload: %{"m" => 1}} == + decode!(@serializer, @v2_msg_json, opcode: :text) + end + + test "fastlane!/1 encodes a broadcast into a message as JSON" do + msg = %Broadcast{topic: "t", event: "e", payload: %{m: 1}} + assert fastlane!(@serializer, msg) == @v2_fastlane_json + end + + test "fastlane!/1 raises when payload is not a map" do + msg = %Broadcast{topic: "t", event: "e", payload: "invalid"} + assert_raise ArgumentError, fn -> fastlane!(@serializer, msg) end + end + + describe "binary encode" do + test "general pushed message" do + push = << + # push + 0::size(8), + # join_ref_size + 2, + # topic_size + 5, + # event_size + 5, + "12", + "topic", + "event", + 101, + 102, + 103 + >> + + assert encode!(@serializer, %Phoenix.Socket.Message{ + join_ref: "12", + ref: nil, + topic: "topic", + event: "event", + payload: {:binary, <<101, 102, 103>>} + }) == push + end + + test "encode with oversized headers" do + assert_raise ArgumentError, ~r/unable to convert topic to binary/, fn -> + encode!(@serializer, %Phoenix.Socket.Message{ + join_ref: "12", + ref: nil, + topic: String.duplicate("t", 256), + event: "event", + payload: {:binary, <<101, 102, 103>>} + }) + end + + assert_raise ArgumentError, ~r/unable to convert event to binary/, fn -> + encode!(@serializer, %Phoenix.Socket.Message{ + join_ref: "12", + ref: nil, + topic: "topic", + event: String.duplicate("e", 256), + payload: {:binary, <<101, 102, 103>>} + }) + end + + assert_raise ArgumentError, ~r/unable to convert join_ref to binary/, fn -> + encode!(@serializer, %Phoenix.Socket.Message{ + join_ref: String.duplicate("j", 256), + ref: nil, + topic: "topic", + event: "event", + payload: {:binary, <<101, 102, 103>>} + }) + end + end + + test "reply" do + assert encode!(@serializer, %Phoenix.Socket.Reply{ + join_ref: "12", + ref: "123", + topic: "topic", + status: :ok, + payload: {:binary, <<101, 102, 103>>} + }) == @reply + end + + test "reply with oversized headers" do + assert_raise ArgumentError, ~r/unable to convert ref to binary/, fn -> + encode!(@serializer, %Phoenix.Socket.Reply{ + join_ref: "12", + ref: String.duplicate("r", 256), + topic: "topic", + status: :ok, + payload: {:binary, <<101, 102, 103>>} + }) + end + end + + test "fastlane binary Broadcast" do + assert fastlane!(@serializer, %Broadcast{ + topic: "topic", + event: "event", + payload: {:binary, <<101, 102, 103>>} + }) == @broadcast + end + + test "fastlane binary UserBroadcast" do + assert fastlane!(@serializer, %UserBroadcast{ + topic: "topic", + user_event: "user_event", + metadata: %{"replayed" => true}, + user_payload_encoding: :binary, + user_payload: <<101, 102, 103>> + }) == @binary_user_broadcast + end + + test "fastlane binary UserBroadcast no metadata" do + assert fastlane!(@serializer, %UserBroadcast{ + topic: "topic", + user_event: "user_event", + metadata: nil, + user_payload_encoding: :binary, + user_payload: <<101, 102, 103>> + }) == @binary_user_broadcast_no_metadata + end + + test "fastlane json UserBroadcast" do + assert fastlane!(@serializer, %UserBroadcast{ + topic: "topic", + user_event: "user_event", + metadata: %{"replayed" => true}, + user_payload_encoding: :json, + user_payload: "{\"a\":\"b\"}" + }) == @json_user_broadcast + end + + test "fastlane json UserBroadcast no metadata" do + assert fastlane!(@serializer, %UserBroadcast{ + topic: "topic", + user_event: "user_event", + user_payload_encoding: :json, + user_payload: "{\"a\":\"b\"}" + }) == @json_user_broadcast_no_metadata + end + + test "fastlane with oversized headers" do + assert_raise ArgumentError, ~r/unable to convert topic to binary/, fn -> + fastlane!(@serializer, %Broadcast{ + topic: String.duplicate("t", 256), + event: "event", + payload: {:binary, <<101, 102, 103>>} + }) + end + + assert_raise ArgumentError, ~r/unable to convert event to binary/, fn -> + fastlane!(@serializer, %Broadcast{ + topic: "topic", + event: String.duplicate("e", 256), + payload: {:binary, <<101, 102, 103>>} + }) + end + + assert_raise ArgumentError, ~r/unable to convert topic to binary/, fn -> + fastlane!(@serializer, %UserBroadcast{ + topic: String.duplicate("t", 256), + user_event: "user_event", + user_payload_encoding: :json, + user_payload: "{\"a\":\"b\"}" + }) + end + + assert_raise ArgumentError, ~r/unable to convert user_event to binary/, fn -> + fastlane!(@serializer, %UserBroadcast{ + topic: "topic", + user_event: String.duplicate("e", 256), + user_payload_encoding: :json, + user_payload: "{\"a\":\"b\"}" + }) + end + + assert_raise ArgumentError, ~r/unable to convert metadata to binary/, fn -> + fastlane!(@serializer, %UserBroadcast{ + topic: "topic", + user_event: "user_event", + metadata: %{k: String.duplicate("e", 256)}, + user_payload_encoding: :json, + user_payload: "{\"a\":\"b\"}" + }) + end + end + end + + describe "binary decode" do + test "pushed message" do + assert decode!(@serializer, @client_push, opcode: :binary) == %Phoenix.Socket.Message{ + join_ref: "12", + ref: "123", + topic: "topic", + event: "event", + payload: {:binary, <<101, 102, 103>>} + } + end + + test "binary user pushed message" do + assert decode!(@serializer, @client_binary_user_broadcast_push, opcode: :binary) == %Phoenix.Socket.Message{ + join_ref: "12", + ref: "123", + topic: "topic", + event: "broadcast", + payload: {"user_event", :binary, <<101, 102, 103>>} + } + end + + test "json binary user pushed message" do + assert decode!(@serializer, @client_json_user_broadcast_push, opcode: :binary) == %Phoenix.Socket.Message{ + join_ref: "12", + ref: "123", + topic: "topic", + event: "broadcast", + payload: {"user_event", :json, "{\"a\":\"b\"}"} + } + end + end +end diff --git a/test/support/containers.ex b/test/support/containers.ex index 7ce0ec5d9..97456b95e 100644 --- a/test/support/containers.ex +++ b/test/support/containers.ex @@ -141,8 +141,6 @@ defmodule Containers do Postgrex.query!(db_conn, "CREATE SCHEMA IF NOT EXISTS realtime", []) end) - Process.exit(conn, :normal) - RateCounter.stop(tenant.external_id) # Automatically checkin the container at the end of the test @@ -203,6 +201,8 @@ defmodule Containers do tenant end + GenServer.stop(conn) + tenant else _ -> {:error, "failed to checkout a container"} diff --git a/test/support/generators.ex b/test/support/generators.ex index 768e3823b..481944772 100644 --- a/test/support/generators.ex +++ b/test/support/generators.ex @@ -283,25 +283,28 @@ defmodule Generators do jwt end - @port 4003 - @serializer Phoenix.Socket.V1.JSONSerializer - - def get_connection( - tenant, - role \\ "anon", - claims \\ %{}, - params \\ %{vsn: "1.0.0", log_level: :warning} - ) do + # default test port + @port 4002 + + def get_connection(tenant, serializer \\ Phoenix.Socket.V1.JSONSerializer, opts \\ []) do + params = Keyword.get(opts, :params, %{log_level: :warning}) + claims = Keyword.get(opts, :claims, %{}) + role = Keyword.get(opts, :role, "anon") + params = Enum.reduce(params, "", fn {k, v}, acc -> "#{acc}&#{k}=#{v}" end) - uri = "#{uri(tenant)}?#{params}" + uri = "#{uri(tenant, serializer)}&#{params}" with {:ok, token} <- token_valid(tenant, role, claims), - {:ok, socket} <- WebsocketClient.connect(self(), uri, @serializer, [{"x-api-key", token}]) do + {:ok, socket} <- WebsocketClient.connect(self(), uri, serializer, [{"x-api-key", token}]) do {socket, token} end end - def uri(tenant, port \\ @port), do: "ws://#{tenant.external_id}.localhost:#{port}/socket/websocket" + def uri(tenant, serializer, port \\ @port), + do: "ws://#{tenant.external_id}.localhost:#{port}/socket/websocket?vsn=#{vsn(serializer)}" + + defp vsn(Phoenix.Socket.V1.JSONSerializer), do: "1.0.0" + defp vsn(RealtimeWeb.Socket.V2Serializer), do: "2.0.0" @spec token_valid(Tenant.t(), binary(), map()) :: {:ok, binary()} def token_valid(tenant, role, claims \\ %{}), do: generate_token(tenant, Map.put(claims, :role, role)) diff --git a/test/support/test_endpoint.ex b/test/support/test_endpoint.ex deleted file mode 100644 index 67c477153..000000000 --- a/test/support/test_endpoint.ex +++ /dev/null @@ -1,26 +0,0 @@ -defmodule TestEndpoint do - use Phoenix.Endpoint, otp_app: :phoenix - - @session_config store: :cookie, - key: "_hello_key", - signing_salt: "change_me" - - socket("/socket", RealtimeWeb.UserSocket, - websocket: [ - connect_info: [:peer_data, :uri, :x_headers], - fullsweep_after: 20, - max_frame_size: 8_000_000 - ] - ) - - plug(Plug.Session, @session_config) - plug(:fetch_session) - plug(Plug.CSRFProtection) - plug(:put_session) - - defp put_session(conn, _) do - conn - |> put_session(:from_session, "123") - |> send_resp(200, Plug.CSRFProtection.get_csrf_token()) - end -end