diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
new file mode 100644
index 000000000..b27a4e9f3
--- /dev/null
+++ b/.github/workflows/lint.yml
@@ -0,0 +1,78 @@
+name: Lint
+on:
+ pull_request:
+ paths:
+ - "lib/**"
+ - "test/**"
+ - "config/**"
+ - "priv/**"
+ - "assets/**"
+ - "rel/**"
+ - "mix.exs"
+ - "Dockerfile"
+ - "run.sh"
+
+ push:
+ branches:
+ - main
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ tests:
+ name: Lint
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Setup elixir
+ id: beam
+ uses: erlef/setup-beam@v1
+ with:
+ otp-version: 27.x # Define the OTP version [required]
+ elixir-version: 1.17.x # Define the elixir version [required]
+ - name: Cache Mix
+ uses: actions/cache@v4
+ with:
+ path: |
+ deps
+ _build
+ key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-${{ hashFiles('**/mix.lock') }}
+ restore-keys: |
+ ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-
+
+ - name: Install dependencies
+ run: mix deps.get
+ - name: Set up Postgres
+ run: docker compose -f docker-compose.dbs.yml up -d
+ - name: Run main database migrations
+ run: mix ecto.migrate --log-migrator-sql
+ - name: Run database tenant migrations
+ run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
+ - name: Run format check
+ run: mix format --check-formatted
+ - name: Credo checks
+ run: mix credo
+ - name: Run hex audit
+ run: mix hex.audit
+ - name: Run mix_audit
+ run: mix deps.audit
+ - name: Run sobelow
+ run: mix sobelow --config .sobelow-conf
+ - name: Retrieve PLT Cache
+ uses: actions/cache@v4
+ id: plt-cache
+ with:
+ path: priv/plts
+ key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
+ - name: Create PLTs
+ if: steps.plt-cache.outputs.cache-hit != 'true'
+ run: |
+ mkdir -p priv/plts
+ mix dialyzer.build
+ - name: Run dialyzer
+ run: mix dialyzer
+ - name: Run dev seeds
+ run: DB_ENC_KEY="1234567890123456" mix ecto.setup
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 5d3818814..45d27634a 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -16,6 +16,13 @@ on:
branches:
- main
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+ cancel-in-progress: true
+
+env:
+ MIX_ENV: test
+
jobs:
tests:
name: Tests
@@ -32,44 +39,19 @@ jobs:
- name: Cache Mix
uses: actions/cache@v4
with:
- path: deps
- key: ${{ runner.os }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
+ path: |
+ deps
+ _build
+ key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
- ${{ runner.os }}-mix-
+ ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-
+ - name: Pull postgres image quietly in background (used by test/support/containers.ex)
+ run: docker pull supabase/postgres:15.8.1.040 > /dev/null 2>&1 &
- name: Install dependencies
run: mix deps.get
- name: Set up Postgres
run: docker compose -f docker-compose.dbs.yml up -d
- - name: Run main database migrations
- run: mix ecto.migrate --log-migrator-sql
- - name: Run database tenant migrations
- run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
- - name: Run format check
- run: mix format --check-formatted
- - name: Credo checks
- run: mix credo
- - name: Run hex audit
- run: mix hex.audit
- - name: Run mix_audit
- run: mix deps.audit
- - name: Run sobelow
- run: mix sobelow --config .sobelow-conf
- - name: Retrieve PLT Cache
- uses: actions/cache@v4
- id: plt-cache
- with:
- path: priv/plts
- key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
- - name: Create PLTs
- if: steps.plt-cache.outputs.cache-hit != 'true'
- run: |
- mkdir -p priv/plts
- mix dialyzer.build
- - name: Run dialyzer
- run: mix dialyzer
- - name: Run dev seeds
- run: DB_ENC_KEY="1234567890123456" mix ecto.setup
- name: Start epmd
run: epmd -daemon
- name: Run tests
diff --git a/Makefile b/Makefile
index fd7f0f7fd..1259a1335 100644
--- a/Makefile
+++ b/Makefile
@@ -9,10 +9,10 @@ PORT ?= 4000
# Common commands
dev: ## Start a dev server
- ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server
+ ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=us-east-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server
dev.orange: ## Start another dev server (orange) on port 4001
- ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server
+ ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=eu-west-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server
seed: ## Seed the database
DB_ENC_KEY="1234567890123456" FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 mix run priv/repo/dev_seeds.exs
diff --git a/config/dev.exs b/config/dev.exs
index a438f8ea4..0eff300d8 100644
--- a/config/dev.exs
+++ b/config/dev.exs
@@ -97,6 +97,8 @@ config :phoenix, :plug_init_mode, :runtime
# Disable caching to ensure the rendered spec is refreshed
config :open_api_spex, :cache_adapter, OpenApiSpex.Plug.NoneCache
-config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
+# Disabled but can print to stdout with:
+# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
+config :opentelemetry, traces_exporter: :none
config :mix_test_watch, clear: true
diff --git a/config/runtime.exs b/config/runtime.exs
index 47961f98a..f09d22846 100644
--- a/config/runtime.exs
+++ b/config/runtime.exs
@@ -68,8 +68,9 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
-pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
+pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
+users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
no_channel_timeout_in_ms =
if config_env() == :test,
@@ -126,7 +127,8 @@ config :realtime,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
platform: platform,
pubsub_adapter: pubsub_adapter,
- broadcast_pool_size: broadcast_pool_size
+ broadcast_pool_size: broadcast_pool_size,
+ users_scope_shards: users_scope_shards
if config_env() != :test && run_janitor? do
config :realtime,
diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex
index 65f4a33f1..34697572c 100644
--- a/lib/extensions/postgres_cdc_rls/replication_poller.ex
+++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex
@@ -18,6 +18,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
alias Realtime.Adapters.Changes.NewRecord
alias Realtime.Adapters.Changes.UpdatedRecord
alias Realtime.Database
+ alias Realtime.RateCounter
+ alias Realtime.Tenants
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
@@ -26,6 +28,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
tenant_id = args["id"]
Logger.metadata(external_id: tenant_id, project: tenant_id)
+ %Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id)
+
+ rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000)
+
+ RateCounter.new(rate_counter_args)
+
state = %{
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
db_host: args["db_host"],
@@ -41,7 +49,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
retry_ref: nil,
retry_count: 0,
slot_name: args["slot_name"] <> slot_name_suffix(),
- tenant_id: tenant_id
+ tenant_id: tenant_id,
+ rate_counter_args: rate_counter_args
}
{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
@@ -74,7 +83,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
max_record_bytes: max_record_bytes,
max_changes: max_changes,
conn: conn,
- tenant_id: tenant_id
+ tenant_id: tenant_id,
+ rate_counter_args: rate_counter_args
} = state
) do
cancel_timer(poll_ref)
@@ -84,7 +94,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
record_list_changes_telemetry(time, tenant_id)
- case handle_list_changes_result(list_changes, tenant_id) do
+ case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do
{:ok, row_count} ->
Backoff.reset(backoff)
@@ -177,20 +187,29 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
rows: [_ | _] = rows,
num_rows: rows_count
}},
- tenant_id
+ tenant_id,
+ rate_counter_args
) do
- for row <- rows,
- change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
- topic = "realtime:postgres:" <> tenant_id
+ case RateCounter.get(rate_counter_args) do
+ {:ok, %{limit: %{triggered: true}}} ->
+ :ok
- RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher)
+ _ ->
+ Realtime.GenCounter.add(rate_counter_args.id, rows_count)
+
+ for row <- rows,
+ change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
+ topic = "realtime:postgres:" <> tenant_id
+
+ RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
+ end
end
{:ok, rows_count}
end
- defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0}
- defp handle_list_changes_result({:error, reason}, _), do: {:error, reason}
+ defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0}
+ defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason}
def generate_record([
{"wal",
diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex
index 99096edfb..45cc0271e 100644
--- a/lib/realtime/application.ex
+++ b/lib/realtime/application.ex
@@ -46,8 +46,7 @@ defmodule Realtime.Application do
Realtime.PromEx.set_metrics_tags()
:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
:syn.set_event_handler(Realtime.SynHandler)
-
- :ok = :syn.add_node_to_scopes([:users, RegionNodes, Realtime.Tenants.Connect])
+ :ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()])
region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())
diff --git a/lib/realtime/gen_rpc/pub_sub.ex b/lib/realtime/gen_rpc/pub_sub.ex
index b2a90b165..3ba9e053a 100644
--- a/lib/realtime/gen_rpc/pub_sub.ex
+++ b/lib/realtime/gen_rpc/pub_sub.ex
@@ -65,7 +65,11 @@ defmodule Realtime.GenRpcPubSub.Worker do
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)
@impl true
- def init(pubsub), do: {:ok, pubsub}
+ def init(pubsub) do
+ Process.flag(:message_queue_data, :off_heap)
+ Process.flag(:fullsweep_after, 100)
+ {:ok, pubsub}
+ end
@impl true
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
diff --git a/lib/realtime/monitoring/latency.ex b/lib/realtime/monitoring/latency.ex
index 52c46adb4..d9ddd0d9a 100644
--- a/lib/realtime/monitoring/latency.ex
+++ b/lib/realtime/monitoring/latency.ex
@@ -7,7 +7,7 @@ defmodule Realtime.Latency do
use Realtime.Logs
alias Realtime.Nodes
- alias Realtime.Rpc
+ alias Realtime.GenRpc
defmodule Payload do
@moduledoc false
@@ -33,7 +33,7 @@ defmodule Realtime.Latency do
}
end
- @every 5_000
+ @every 15_000
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
@@ -76,7 +76,7 @@ defmodule Realtime.Latency do
Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
{latency, response} =
:timer.tc(fn ->
- Rpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
+ GenRpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
end)
latency_ms = latency / 1_000
@@ -85,7 +85,7 @@ defmodule Realtime.Latency do
from_node = Nodes.short_node_id_from_name(Node.self())
case response do
- {:badrpc, reason} ->
+ {:error, :rpc_error, reason} ->
log_error(
"RealtimeNodeDisconnected",
"Unable to connect to #{short_name} from #{region}: #{reason}"
diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex
index 1bd324624..a3019a68a 100644
--- a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex
+++ b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex
@@ -36,10 +36,10 @@ defmodule Realtime.PromEx.Plugins.Tenant do
event_name: [:realtime, :tenants, :payload, :size],
measurement: :size,
description: "Tenant payload size",
- tags: [:tenant],
+ tags: [:tenant, :message_type],
unit: :byte,
reporter_options: [
- buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
+ buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
]
),
distribution(
@@ -47,9 +47,10 @@ defmodule Realtime.PromEx.Plugins.Tenant do
event_name: [:realtime, :tenants, :payload, :size],
measurement: :size,
description: "Payload size",
+ tags: [:message_type],
unit: :byte,
reporter_options: [
- buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
+ buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
]
)
]
@@ -157,6 +158,12 @@ defmodule Realtime.PromEx.Plugins.Tenant do
description: "Sum of messages sent on a Realtime Channel.",
tags: [:tenant]
),
+ sum(
+ [:realtime, :channel, :global, :events],
+ event_name: [:realtime, :rate_counter, :channel, :events],
+ measurement: :sum,
+ description: "Global sum of messages sent on a Realtime Channel."
+ ),
sum(
[:realtime, :channel, :presence_events],
event_name: [:realtime, :rate_counter, :channel, :presence_events],
@@ -164,6 +171,12 @@ defmodule Realtime.PromEx.Plugins.Tenant do
description: "Sum of presence messages sent on a Realtime Channel.",
tags: [:tenant]
),
+ sum(
+ [:realtime, :channel, :global, :presence_events],
+ event_name: [:realtime, :rate_counter, :channel, :presence_events],
+ measurement: :sum,
+ description: "Global sum of presence messages sent on a Realtime Channel."
+ ),
sum(
[:realtime, :channel, :db_events],
event_name: [:realtime, :rate_counter, :channel, :db_events],
@@ -171,6 +184,12 @@ defmodule Realtime.PromEx.Plugins.Tenant do
description: "Sum of db messages sent on a Realtime Channel.",
tags: [:tenant]
),
+ sum(
+ [:realtime, :channel, :global, :db_events],
+ event_name: [:realtime, :rate_counter, :channel, :db_events],
+ measurement: :sum,
+ description: "Global sum of db messages sent on a Realtime Channel."
+ ),
sum(
[:realtime, :channel, :joins],
event_name: [:realtime, :rate_counter, :channel, :joins],
diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenants.ex b/lib/realtime/monitoring/prom_ex/plugins/tenants.ex
index 0035e9594..e8106df58 100644
--- a/lib/realtime/monitoring/prom_ex/plugins/tenants.ex
+++ b/lib/realtime/monitoring/prom_ex/plugins/tenants.ex
@@ -21,6 +21,15 @@ defmodule Realtime.PromEx.Plugins.Tenants do
unit: {:microsecond, :millisecond},
tags: [:success, :tenant, :mechanism],
reporter_options: [buckets: [10, 250, 5000, 15_000]]
+ ),
+ distribution(
+ [:realtime, :global, :rpc],
+ event_name: [:realtime, :rpc],
+ description: "Global Latency of rpc calls",
+ measurement: :latency,
+ unit: {:microsecond, :millisecond},
+ tags: [:success, :mechanism],
+ reporter_options: [buckets: [10, 250, 5000, 15_000]]
)
])
end
diff --git a/lib/realtime/nodes.ex b/lib/realtime/nodes.ex
index ae237eb5f..34c9f3cfb 100644
--- a/lib/realtime/nodes.ex
+++ b/lib/realtime/nodes.ex
@@ -105,7 +105,7 @@ defmodule Realtime.Nodes do
iex> node = :"pink@127.0.0.1"
iex> Realtime.Helpers.short_node_id_from_name(node)
- "127.0.0.1"
+ "pink@127.0.0.1"
iex> node = :"pink@10.0.1.1"
iex> Realtime.Helpers.short_node_id_from_name(node)
@@ -124,6 +124,9 @@ defmodule Realtime.Nodes do
[_, _, _, _, _, one, two, _] ->
one <> two
+ ["127.0.0.1"] ->
+ Atom.to_string(name)
+
_other ->
host
end
diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex
index db2a02cc4..9e53e18f1 100644
--- a/lib/realtime/tenants.ex
+++ b/lib/realtime/tenants.ex
@@ -21,7 +21,8 @@ defmodule Realtime.Tenants do
"""
@spec list_connected_tenants(atom()) :: [String.t()]
def list_connected_tenants(node) do
- :syn.group_names(:users, node)
+ UsersCounter.scopes()
+ |> Enum.flat_map(fn scope -> :syn.group_names(scope, node) end)
end
@doc """
@@ -247,6 +248,31 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
end
+ @doc "RateCounter arguments for counting database events per second with a limit."
+ @spec db_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
+ def db_events_per_second_rate(tenant_id, max_events_per_second) when is_binary(tenant_id) do
+ opts = [
+ telemetry: %{
+ event_name: [:channel, :db_events],
+ measurements: %{},
+ metadata: %{tenant: tenant_id}
+ },
+ limit: [
+ value: max_events_per_second,
+ measurement: :avg,
+ log: true,
+ log_fn: fn ->
+ Logger.error("MessagePerSecondRateLimitReached: Too many postgres changes messages per second",
+ external_id: tenant_id,
+ project: tenant_id
+ )
+ end
+ ]
+ ]
+
+ %RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
+ end
+
@doc """
The GenCounter key to use when counting events for RealtimeChannel events.
iex> Realtime.Tenants.db_events_per_second_key("tenant_id")
@@ -328,18 +354,18 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
end
- @connect_per_second_default 10
+ @connect_errors_per_second_default 10
@doc "RateCounter arguments for counting connect per second."
- @spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
- def connect_per_second_rate(%Tenant{external_id: external_id}) do
- connect_per_second_rate(external_id)
+ @spec connect_errors_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
+ def connect_errors_per_second_rate(%Tenant{external_id: external_id}) do
+ connect_errors_per_second_rate(external_id)
end
- def connect_per_second_rate(tenant_id) do
+ def connect_errors_per_second_rate(tenant_id) do
opts = [
- max_bucket_len: 10,
+ max_bucket_len: 30,
limit: [
- value: @connect_per_second_default,
+ value: @connect_errors_per_second_default,
measurement: :sum,
log_fn: fn ->
Logger.critical(
diff --git a/lib/realtime/tenants/batch_broadcast.ex b/lib/realtime/tenants/batch_broadcast.ex
index 98427621b..9e4ed4c3c 100644
--- a/lib/realtime/tenants/batch_broadcast.ex
+++ b/lib/realtime/tenants/batch_broadcast.ex
@@ -129,7 +129,14 @@ defmodule Realtime.Tenants.BatchBroadcast do
broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}
GenCounter.add(events_per_second_rate.id)
- TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
+
+ TenantBroadcaster.pubsub_broadcast(
+ tenant.external_id,
+ tenant_topic,
+ broadcast,
+ RealtimeChannel.MessageDispatcher,
+ :broadcast
+ )
end
defp permissions_for_message(_, nil, _), do: nil
diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex
index 0ee43f161..caf49cc57 100644
--- a/lib/realtime/tenants/connect.ex
+++ b/lib/realtime/tenants/connect.ex
@@ -57,7 +57,7 @@ defmodule Realtime.Tenants.Connect do
| {:error, :connect_rate_limit_reached}
| {:error, :rpc_error, term()}
def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
- rate_args = Tenants.connect_per_second_rate(tenant_id)
+ rate_args = Tenants.connect_errors_per_second_rate(tenant_id)
RateCounter.new(rate_args)
with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args),
@@ -68,8 +68,14 @@ defmodule Realtime.Tenants.Connect do
{:error, :connect_rate_limit_reached}
{:error, :tenant_database_connection_initializing} ->
- GenCounter.add(rate_args.id)
- call_external_node(tenant_id, opts)
+ case call_external_node(tenant_id, opts) do
+ {:ok, pid} ->
+ {:ok, pid}
+
+ error ->
+ GenCounter.add(rate_args.id)
+ error
+ end
{:error, :initializing} ->
{:error, :tenant_database_unavailable}
diff --git a/lib/realtime/user_counter.ex b/lib/realtime/user_counter.ex
index 6190030d9..9ea38c780 100644
--- a/lib/realtime/user_counter.ex
+++ b/lib/realtime/user_counter.ex
@@ -8,17 +8,32 @@ defmodule Realtime.UsersCounter do
Adds a RealtimeChannel pid to the `:users` scope for a tenant so we can keep track of all connected clients for a tenant.
"""
@spec add(pid(), String.t()) :: :ok
- def add(pid, tenant), do: :syn.join(:users, tenant, pid)
+ def add(pid, tenant_id), do: tenant_id |> scope() |> :syn.join(tenant_id, pid)
@doc """
Returns the count of all connected clients for a tenant for the cluster.
"""
@spec tenant_users(String.t()) :: non_neg_integer()
- def tenant_users(tenant), do: :syn.member_count(:users, tenant)
+ def tenant_users(tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id)
@doc """
Returns the count of all connected clients for a tenant for a single node.
"""
@spec tenant_users(atom, String.t()) :: non_neg_integer()
- def tenant_users(node_name, tenant), do: :syn.member_count(:users, tenant, node_name)
+ def tenant_users(node_name, tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id, node_name)
+
+ @doc """
+ Returns the scope for a given tenant id.
+ """
+ @spec scope(String.t()) :: atom()
+ def scope(tenant_id) do
+ shards = Application.get_env(:realtime, :users_scope_shards)
+ shard = :erlang.phash2(tenant_id, shards)
+ :"users_#{shard}"
+ end
+
+ def scopes() do
+ shards = Application.get_env(:realtime, :users_scope_shards)
+ Enum.map(0..(shards - 1), fn shard -> :"users_#{shard}" end)
+ end
end
diff --git a/lib/realtime_web/channels/payloads/config.ex b/lib/realtime_web/channels/payloads/config.ex
index 923020174..029aa93b5 100644
--- a/lib/realtime_web/channels/payloads/config.ex
+++ b/lib/realtime_web/channels/payloads/config.ex
@@ -17,6 +17,14 @@ defmodule RealtimeWeb.Channels.Payloads.Config do
end
def changeset(config, attrs) do
+ attrs =
+ attrs
+ |> Enum.map(fn
+ {k, v} when is_list(v) -> {k, Enum.filter(v, fn v -> v != nil end)}
+ {k, v} -> {k, v}
+ end)
+ |> Map.new()
+
config
|> cast(attrs, [:private], message: &Join.error_message/2)
|> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map")
diff --git a/lib/realtime_web/channels/payloads/presence.ex b/lib/realtime_web/channels/payloads/presence.ex
index 53e09047d..785df9222 100644
--- a/lib/realtime_web/channels/payloads/presence.ex
+++ b/lib/realtime_web/channels/payloads/presence.ex
@@ -8,7 +8,7 @@ defmodule RealtimeWeb.Channels.Payloads.Presence do
embedded_schema do
field :enabled, :boolean, default: true
- field :key, :string, default: UUID.uuid1()
+ field :key, :any, default: UUID.uuid1(), virtual: true
end
def changeset(presence, attrs) do
diff --git a/lib/realtime_web/channels/presence.ex b/lib/realtime_web/channels/presence.ex
index f4d378b92..9e173febe 100644
--- a/lib/realtime_web/channels/presence.ex
+++ b/lib/realtime_web/channels/presence.ex
@@ -8,5 +8,6 @@ defmodule RealtimeWeb.Presence do
use Phoenix.Presence,
otp_app: :realtime,
pubsub_server: Realtime.PubSub,
+ dispatcher: RealtimeWeb.RealtimeChannel.MessageDispatcher,
pool_size: 10
end
diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex
index 91a417c21..104d9a077 100644
--- a/lib/realtime_web/channels/realtime_channel.ex
+++ b/lib/realtime_web/channels/realtime_channel.ex
@@ -18,7 +18,6 @@ defmodule RealtimeWeb.RealtimeChannel do
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
- alias Realtime.Tenants.Authorization.Policies.PresencePolicies
alias Realtime.Tenants.Connect
alias RealtimeWeb.Channels.Payloads.Join
@@ -259,27 +258,11 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end
- def handle_info(
- %{event: "presence_diff"},
- %{assigns: %{policies: %Policies{presence: %PresencePolicies{read: false}}}} = socket
- ) do
- Logger.warning("Presence message ignored")
- {:noreply, socket}
- end
-
def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do
Logger.warning("Broadcast message ignored")
{:noreply, socket}
end
- def handle_info(%{event: "presence_diff", payload: payload} = msg, socket) do
- %{presence_rate_counter: presence_rate_counter} = socket.assigns
- GenCounter.add(presence_rate_counter.id)
- maybe_log_info(socket, msg)
- push(socket, "presence_diff", payload)
- {:noreply, socket}
- end
-
def handle_info(%{event: type, payload: payload} = msg, socket) do
count(socket)
maybe_log_info(socket, msg)
diff --git a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
index f8e736c2e..036ad9159 100644
--- a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
+++ b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
@@ -76,14 +76,21 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload}
if self_broadcast do
- TenantBroadcaster.pubsub_broadcast(tenant_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
+ TenantBroadcaster.pubsub_broadcast(
+ tenant_id,
+ tenant_topic,
+ broadcast,
+ RealtimeChannel.MessageDispatcher,
+ :broadcast
+ )
else
TenantBroadcaster.pubsub_broadcast_from(
tenant_id,
self(),
tenant_topic,
broadcast,
- RealtimeChannel.MessageDispatcher
+ RealtimeChannel.MessageDispatcher,
+ :broadcast
)
end
end
diff --git a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex
index 32e1528f3..6604eb2bd 100644
--- a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex
+++ b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex
@@ -5,14 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
require Logger
- def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new())
-
- def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id, replayed_message_ids) do
- {:rc_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}, replayed_message_ids}
- end
-
- def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id, replayed_message_ids) do
- {:rc_fastlane, fastlane_pid, serializer, topic, replayed_message_ids}
+ def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) do
+ {:rc_fastlane, fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids}
end
@doc """
@@ -20,48 +14,58 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
It also sends an :update_rate_counter to the subscriber and it can conditionally log
"""
@spec dispatch(list, pid, Phoenix.Socket.Broadcast.t()) :: :ok
- def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{} = msg) do
+ def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{event: event} = msg) do
# fastlane_pid is the actual socket transport pid
# This reduce caches the serialization and bypasses the channel process going straight to the
# transport process
message_id = message_id(msg.payload)
- # Credo doesn't like that we don't use the result aggregation
- _ =
- Enum.reduce(subscribers, %{}, fn
- {pid, _}, cache when pid == from ->
- cache
+ {_cache, count} =
+ Enum.reduce(subscribers, {%{}, 0}, fn
+ {pid, _}, {cache, count} when pid == from ->
+ {cache, count}
- {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, replayed_message_ids}}, cache ->
+ {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}},
+ {cache, count} ->
if already_replayed?(message_id, replayed_message_ids) do
# skip already replayed message
- cache
+ {cache, count}
else
- send(pid, :update_rate_counter)
- do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
- end
+ if event != "presence_diff", do: send(pid, :update_rate_counter)
- {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}, replayed_message_ids}}, cache ->
- if already_replayed?(message_id, replayed_message_ids) do
- # skip already replayed message
- cache
- else
- send(pid, :update_rate_counter)
- log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
- Logger.info(log, external_id: tenant_id, project: tenant_id)
+ maybe_log(log_level, join_topic, msg, tenant_id)
- do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
+ cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
+ {cache, count + 1}
end
- {pid, _}, cache ->
+ {pid, _}, {cache, count} ->
send(pid, msg)
- cache
+ {cache, count}
end)
+ tenant_id = tenant_id(subscribers)
+ increment_presence_counter(tenant_id, event, count)
+
:ok
end
+ defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do
+ tenant_id
+ |> Realtime.Tenants.presence_events_per_second_key()
+ |> Realtime.GenCounter.add(count)
+ end
+
+ defp increment_presence_counter(_tenant_id, _event, _count), do: :ok
+
+ defp maybe_log(:info, join_topic, msg, tenant_id) do
+ log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
+ Logger.info(log, external_id: tenant_id, project: tenant_id)
+ end
+
+ defp maybe_log(_level, _join_topic, _msg, _tenant_id), do: :ok
+
defp message_id(%{"meta" => %{"id" => id}}), do: id
defp message_id(_), do: nil
@@ -82,4 +86,10 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
Map.put(cache, serializer, encoded_msg)
end
end
+
+ defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]) do
+ tenant_id
+ end
+
+ defp tenant_id(_), do: nil
end
diff --git a/lib/realtime_web/channels/realtime_channel/presence_handler.ex b/lib/realtime_web/channels/realtime_channel/presence_handler.ex
index 9dc23d219..ec16c7b16 100644
--- a/lib/realtime_web/channels/realtime_channel/presence_handler.ex
+++ b/lib/realtime_web/channels/realtime_channel/presence_handler.ex
@@ -11,7 +11,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
alias Phoenix.Tracker.Shard
alias Realtime.GenCounter
alias Realtime.RateCounter
- alias Realtime.Tenants
+ # alias Realtime.Tenants
alias Realtime.Tenants.Authorization
alias RealtimeWeb.Presence
alias RealtimeWeb.RealtimeChannel.Logging
@@ -109,6 +109,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
%{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket
payload = Map.get(payload, "payload", %{})
+ RealtimeWeb.TenantBroadcaster.collect_payload_size(socket.assigns.tenant, payload, :presence)
with :ok <- limit_presence_event(socket),
{:ok, _} <- Presence.track(self(), tenant_topic, presence_key, payload) do
@@ -138,13 +139,14 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
|> Phoenix.Presence.group()
end
+ @presence_limit 100
defp limit_presence_event(socket) do
- %{assigns: %{presence_rate_counter: presence_counter, tenant: tenant_id}} = socket
+ %{assigns: %{presence_rate_counter: presence_counter, tenant: _tenant_id}} = socket
{:ok, rate_counter} = RateCounter.get(presence_counter)
- tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id)
+ # tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id)
- if rate_counter.avg > tenant.max_presence_events_per_second do
+ if rate_counter.avg > @presence_limit do
{:error, :rate_limit_exceeded}
else
GenCounter.add(presence_counter.id)
diff --git a/lib/realtime_web/channels/tenant_rate_limiters.ex b/lib/realtime_web/channels/tenant_rate_limiters.ex
new file mode 100644
index 000000000..2101ac945
--- /dev/null
+++ b/lib/realtime_web/channels/tenant_rate_limiters.ex
@@ -0,0 +1,43 @@
+defmodule RealtimeWeb.TenantRateLimiters do
+ @moduledoc """
+ Rate limiters for tenants.
+ """
+ require Logger
+ alias Realtime.UsersCounter
+ alias Realtime.Tenants
+ alias Realtime.RateCounter
+ alias Realtime.Api.Tenant
+
+ @spec check_tenant(Realtime.Api.Tenant.t()) :: :ok | {:error, :too_many_connections | :too_many_joins}
+ def check_tenant(tenant) do
+ with :ok <- max_concurrent_users_check(tenant) do
+ max_joins_per_second_check(tenant)
+ end
+ end
+
+ defp max_concurrent_users_check(%Tenant{max_concurrent_users: max_conn_users, external_id: external_id}) do
+ total_conn_users = UsersCounter.tenant_users(external_id)
+
+ if total_conn_users < max_conn_users,
+ do: :ok,
+ else: {:error, :too_many_connections}
+ end
+
+ defp max_joins_per_second_check(%Tenant{max_joins_per_second: max_joins_per_second} = tenant) do
+ rate_args = Tenants.joins_per_second_rate(tenant.external_id, max_joins_per_second)
+
+ RateCounter.new(rate_args)
+
+ case RateCounter.get(rate_args) do
+ {:ok, %{limit: %{triggered: false}}} ->
+ :ok
+
+ {:ok, %{limit: %{triggered: true}}} ->
+ {:error, :too_many_joins}
+
+ error ->
+ Logger.error("UnknownErrorOnCounter: #{inspect(error)}")
+ {:error, error}
+ end
+ end
+end
diff --git a/lib/realtime_web/channels/user_socket.ex b/lib/realtime_web/channels/user_socket.ex
index 849aa052d..6d4bf9017 100644
--- a/lib/realtime_web/channels/user_socket.ex
+++ b/lib/realtime_web/channels/user_socket.ex
@@ -16,6 +16,7 @@ defmodule RealtimeWeb.UserSocket do
alias Realtime.PostgresCdc
alias Realtime.Tenants
+ alias RealtimeWeb.TenantRateLimiters
alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.RealtimeChannel
alias RealtimeWeb.RealtimeChannel.Logging
@@ -56,6 +57,7 @@ defmodule RealtimeWeb.UserSocket do
token when is_binary(token) <- token,
jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
{:ok, claims} <- ChannelsAuthorization.authorize_conn(token, jwt_secret_dec, jwt_jwks),
+ :ok <- TenantRateLimiters.check_tenant(tenant),
{:ok, postgres_cdc_module} <- PostgresCdc.driver(postgres_cdc_default) do
%Tenant{
extensions: extensions,
@@ -111,6 +113,16 @@ defmodule RealtimeWeb.UserSocket do
log_error("MalformedJWT", "The token provided is not a valid JWT")
{:error, :token_malformed}
+ {:error, :too_many_connections} ->
+ msg = "Too many connected users"
+ Logging.log_error(socket, "ConnectionRateLimitReached", msg)
+ {:error, :too_many_connections}
+
+ {:error, :too_many_joins} ->
+ msg = "Too many joins per second"
+ Logging.log_error(socket, "JoinsRateLimitReached", msg)
+ {:error, :too_many_joins}
+
error ->
log_error("ErrorConnectingToWebsocket", error)
error
diff --git a/lib/realtime_web/endpoint.ex b/lib/realtime_web/endpoint.ex
index 190e1a917..894911803 100644
--- a/lib/realtime_web/endpoint.ex
+++ b/lib/realtime_web/endpoint.ex
@@ -15,7 +15,7 @@ defmodule RealtimeWeb.Endpoint do
websocket: [
connect_info: [:peer_data, :uri, :x_headers],
fullsweep_after: 20,
- max_frame_size: 8_000_000,
+ max_frame_size: 5_000_000,
# https://github.com/ninenines/cowboy/blob/24d32de931a0c985ff7939077463fc8be939f0e9/doc/src/manual/cowboy_websocket.asciidoc#L228
# active_n: The number of packets Cowboy will request from the socket at once.
# This can be used to tweak the performance of the server. Higher values reduce
diff --git a/lib/realtime_web/live/status_live/index.ex b/lib/realtime_web/live/status_live/index.ex
index 8a2d32054..f55eddfa5 100644
--- a/lib/realtime_web/live/status_live/index.ex
+++ b/lib/realtime_web/live/status_live/index.ex
@@ -3,11 +3,18 @@ defmodule RealtimeWeb.StatusLive.Index do
alias Realtime.Latency.Payload
alias Realtime.Nodes
+ alias RealtimeWeb.Endpoint
@impl true
def mount(_params, _session, socket) do
- if connected?(socket), do: RealtimeWeb.Endpoint.subscribe("admin:cluster")
- {:ok, assign(socket, pings: default_pings(), nodes: Enum.count(all_nodes()))}
+ if connected?(socket), do: Endpoint.subscribe("admin:cluster")
+
+ socket =
+ socket
+ |> assign(nodes: Enum.count(all_nodes()))
+ |> stream(:pings, default_pings())
+
+ {:ok, socket}
end
@impl true
@@ -17,17 +24,14 @@ defmodule RealtimeWeb.StatusLive.Index do
@impl true
def handle_info(%Phoenix.Socket.Broadcast{payload: %Payload{} = payload}, socket) do
- pair = payload.from_node <> "_" <> payload.node
- payload = %{pair => payload}
-
- pings = Map.merge(socket.assigns.pings, payload)
+ pair = pair_id(payload.from_node, payload.node)
- {:noreply, assign(socket, pings: pings)}
+ {:noreply, stream(socket, :pings, [%{id: pair, payload: payload}])}
end
defp apply_action(socket, :index, _params) do
socket
- |> assign(:page_title, "Status - Supabase Realtime")
+ |> assign(:page_title, "Realtime Status")
end
defp all_nodes do
@@ -35,9 +39,14 @@ defmodule RealtimeWeb.StatusLive.Index do
end
defp default_pings do
- for n <- all_nodes(), f <- all_nodes(), into: %{} do
- pair = n <> "_" <> f
- {pair, %Payload{from_node: f, latency: "Loading...", node: n, timestamp: "Loading..."}}
+ for n <- all_nodes(), f <- all_nodes() do
+ pair = pair_id(f, n)
+
+ %{id: pair, payload: %Payload{from_node: f, latency: "Loading...", node: n, timestamp: "Loading..."}}
end
end
+
+ defp pair_id(from, to) do
+ from <> "_" <> to
+ end
end
diff --git a/lib/realtime_web/live/status_live/index.html.heex b/lib/realtime_web/live/status_live/index.html.heex
index 645001714..63ea4fc0d 100644
--- a/lib/realtime_web/live/status_live/index.html.heex
+++ b/lib/realtime_web/live/status_live/index.html.heex
@@ -1,16 +1,16 @@
<.h1>Supabase Realtime: Multiplayer Edition
+
<.h2>Cluster Status
+
Understand the latency between nodes across the Realtime cluster.
-
- <%= for {_pair, p} <- @pings do %>
-
-
From: <%= p.from_region %> - <%= p.from_node %>
-
To: <%= p.region %> - <%= p.node %>
-
<%= p.latency %> ms
-
<%= p.timestamp %>
-
- <% end %>
+
+
+
From: <%= p.payload.from_region %> - <%= p.payload.from_node %>
+
To: <%= p.payload.region %> - <%= p.payload.node %>
+
<%= p.payload.latency %> ms
+
<%= p.payload.timestamp %>
+
diff --git a/lib/realtime_web/tenant_broadcaster.ex b/lib/realtime_web/tenant_broadcaster.ex
index da02df79e..f8b739a0b 100644
--- a/lib/realtime_web/tenant_broadcaster.ex
+++ b/lib/realtime_web/tenant_broadcaster.ex
@@ -5,9 +5,12 @@ defmodule RealtimeWeb.TenantBroadcaster do
alias Phoenix.PubSub
- @spec pubsub_broadcast(tenant_id :: String.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher()) :: :ok
- def pubsub_broadcast(tenant_id, topic, message, dispatcher) do
- collect_payload_size(tenant_id, message)
+ @type message_type :: :broadcast | :presence | :postgres_changes
+
+ @spec pubsub_broadcast(tenant_id :: String.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher(), message_type) ::
+ :ok
+ def pubsub_broadcast(tenant_id, topic, message, dispatcher, message_type) do
+ collect_payload_size(tenant_id, message, message_type)
if pubsub_adapter() == :gen_rpc do
PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
@@ -23,11 +26,12 @@ defmodule RealtimeWeb.TenantBroadcaster do
from :: pid,
PubSub.topic(),
PubSub.message(),
- PubSub.dispatcher()
+ PubSub.dispatcher(),
+ message_type
) ::
:ok
- def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do
- collect_payload_size(tenant_id, message)
+ def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher, message_type) do
+ collect_payload_size(tenant_id, message, message_type)
if pubsub_adapter() == :gen_rpc do
PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
@@ -45,16 +49,18 @@ defmodule RealtimeWeb.TenantBroadcaster do
@payload_size_event [:realtime, :tenants, :payload, :size]
- defp collect_payload_size(tenant_id, payload) when is_struct(payload) do
+ @spec collect_payload_size(tenant_id :: String.t(), payload :: term, message_type :: message_type) :: :ok
+ def collect_payload_size(tenant_id, payload, message_type) when is_struct(payload) do
# Extracting from struct so the __struct__ bit is not calculated as part of the payload
- collect_payload_size(tenant_id, Map.from_struct(payload))
+ collect_payload_size(tenant_id, Map.from_struct(payload), message_type)
end
- defp collect_payload_size(tenant_id, payload) do
- :telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{tenant: tenant_id})
+ def collect_payload_size(tenant_id, payload, message_type) do
+ :telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{
+ tenant: tenant_id,
+ message_type: message_type
+ })
end
- defp pubsub_adapter do
- Application.fetch_env!(:realtime, :pubsub_adapter)
- end
+ defp pubsub_adapter, do: Application.fetch_env!(:realtime, :pubsub_adapter)
end
diff --git a/mix.exs b/mix.exs
index 4b0b1f40c..e98ac608f 100644
--- a/mix.exs
+++ b/mix.exs
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
- version: "2.51.5",
+ version: "2.53.4",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
@@ -53,7 +53,7 @@ defmodule Realtime.MixProject do
# Type `mix help deps` for examples and options.
defp deps do
[
- {:phoenix, "~> 1.7.0"},
+ {:phoenix, override: true, github: "supabase/phoenix", branch: "feat/presence-custom-dispatcher-1.7.19"},
{:phoenix_ecto, "~> 4.4.0"},
{:ecto_sql, "~> 3.11"},
{:ecto_psql_extras, "~> 0.8"},
diff --git a/mix.lock b/mix.lock
index c5fce6022..ba6f47328 100644
--- a/mix.lock
+++ b/mix.lock
@@ -66,7 +66,7 @@
"opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"},
"opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"},
"otel_http": {:hex, :otel_http, "0.2.0", "b17385986c7f1b862f5d577f72614ecaa29de40392b7618869999326b9a61d8a", [:rebar3], [], "hexpm", "f2beadf922c8cfeb0965488dd736c95cc6ea8b9efce89466b3904d317d7cc717"},
- "phoenix": {:hex, :phoenix, "1.7.19", "36617efe5afbd821099a8b994ff4618a340a5bfb25531a1802c4d4c634017a57", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "ba4dc14458278773f905f8ae6c2ec743d52c3a35b6b353733f64f02dfe096cd6"},
+ "phoenix": {:git, "https://github.com/supabase/phoenix.git", "7b884cc0cc1a49ad2bc272acda2e622b3e11c139", [branch: "feat/presence-custom-dispatcher-1.7.19"]},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.4.3", "86e9878f833829c3f66da03d75254c155d91d72a201eb56ae83482328dc7ca93", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d36c401206f3011fefd63d04e8ef626ec8791975d9d107f9a0817d426f61ac07"},
"phoenix_html": {:hex, :phoenix_html, "3.3.4", "42a09fc443bbc1da37e372a5c8e6755d046f22b9b11343bf885067357da21cb3", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0249d3abec3714aff3415e7ee3d9786cb325be3151e6c4b3021502c585bf53fb"},
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.6", "7b1f0327f54c9eb69845fd09a77accf922f488c549a7e7b8618775eb603a62c7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "1681ab813ec26ca6915beb3414aa138f298e17721dc6a2bde9e6eb8a62360ff6"},
diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs
index 23b1a3a7f..8f4607b01 100644
--- a/test/integration/rt_channel_test.exs
+++ b/test/integration/rt_channel_test.exs
@@ -1159,7 +1159,7 @@ defmodule Realtime.Integration.RtChannelTest do
}
end)
- assert log =~ "ChannelShutdown: Token has expired 1000 seconds ago"
+ assert log =~ "ChannelShutdown: Token has expired"
end
test "ChannelShutdown include sub if available in jwt claims", %{tenant: tenant, topic: topic} do
@@ -2240,7 +2240,8 @@ defmodule Realtime.Integration.RtChannelTest do
# 0 events as no broadcast used
assert 2 = get_count([:realtime, :rate_counter, :channel, :joins], external_id)
assert 2 = get_count([:realtime, :rate_counter, :channel, :presence_events], external_id)
- assert 10 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
+ # 5 + 5 + 5 (5 for each websocket and 5 while publishing)
+ assert 15 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id)
end
@@ -2587,7 +2588,7 @@ defmodule Realtime.Integration.RtChannelTest do
Realtime.Tenants.Cache.invalidate_tenant_cache(external_id)
end
- defp assert_process_down(pid, timeout \\ 100) do
+ defp assert_process_down(pid, timeout \\ 300) do
ref = Process.monitor(pid)
assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout
end
diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs
index 5f341c134..704346aab 100644
--- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs
+++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs
@@ -235,6 +235,25 @@ defmodule Realtime.Extensions.CdcRlsTest do
end)
RateCounter.stop(tenant.external_id)
+ on_exit(fn -> RateCounter.stop(tenant.external_id) end)
+
+ on_exit(fn -> :telemetry.detach(__MODULE__) end)
+
+ :telemetry.attach(
+ __MODULE__,
+ [:realtime, :tenants, :payload, :size],
+ &__MODULE__.handle_telemetry/4,
+ pid: self()
+ )
+
+ on_exit(fn -> :telemetry.detach(__MODULE__) end)
+
+ :telemetry.attach(
+ __MODULE__,
+ [:realtime, :tenants, :payload, :size],
+ &__MODULE__.handle_telemetry/4,
+ pid: self()
+ )
%{tenant: tenant, conn: conn}
end
@@ -315,8 +334,91 @@ defmodule Realtime.Extensions.CdcRlsTest do
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
- assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
- assert 1 in bucket
+ assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} =
+ RateCounter.get(rate)
+
+ # 1 from ReplicationPoller and 1 from MessageDispatcher
+ assert Enum.sum(bucket) == 2
+
+ assert_receive {
+ :telemetry,
+ [:realtime, :tenants, :payload, :size],
+ %{size: 341},
+ %{tenant: "dev_tenant", message_type: :postgres_changes}
+ }
+ end
+
+ test "rate limit works", %{tenant: tenant, conn: conn} do
+ on_exit(fn -> PostgresCdcRls.handle_stop(tenant.external_id, 10_000) end)
+
+ %Tenant{extensions: extensions, external_id: external_id} = tenant
+ postgres_extension = PostgresCdc.filter_settings("postgres_cdc_rls", extensions)
+ args = Map.put(postgres_extension, "id", external_id)
+
+ pg_change_params = [
+ %{
+ id: UUID.uuid1(),
+ params: %{"event" => "*", "schema" => "public"},
+ channel_pid: self(),
+ claims: %{
+ "exp" => System.system_time(:second) + 100_000,
+ "iat" => 0,
+ "ref" => "127.0.0.1",
+ "role" => "anon"
+ }
+ }
+ ]
+
+ ids =
+ Enum.map(pg_change_params, fn %{id: id, params: params} ->
+ {UUID.string_to_binary!(id), :erlang.phash2(params)}
+ end)
+
+ topic = "realtime:test"
+ serializer = Phoenix.Socket.V1.JSONSerializer
+
+ subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
+ metadata = [metadata: subscription_metadata]
+ :ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)
+
+ # First time it will return nil
+ PostgresCdcRls.handle_connect(args)
+ # Wait for it to start
+ Process.sleep(3000)
+ {:ok, response} = PostgresCdcRls.handle_connect(args)
+
+ # Now subscribe to the Postgres Changes
+ {:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)
+ assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
+
+ log =
+ capture_log(fn ->
+ # increment artifically the counter to reach the limit
+ tenant.external_id
+ |> Realtime.Tenants.db_events_per_second_key()
+ |> Realtime.GenCounter.add(100_000_000)
+
+ # Wait for RateCounter to update
+ Process.sleep(1500)
+ end)
+
+ assert log =~ "MessagePerSecondRateLimitReached: Too many postgres changes messages per second"
+
+ # Insert a record
+ %{rows: [[_id]]} = Postgrex.query!(conn, "insert into test (details) values ('test') returning id", [])
+
+ refute_receive {:socket_push, :text, _}, 5000
+
+ # Wait for RateCounter to update
+ Process.sleep(2000)
+
+ rate = Realtime.Tenants.db_events_per_second_rate(tenant)
+
+ assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket, limit: %{triggered: true}}} =
+ RateCounter.get(rate)
+
+ # Nothing has changed
+ assert Enum.sum(bucket) == 100_000_000
end
@aux_mod (quote do
@@ -414,4 +516,6 @@ defmodule Realtime.Extensions.CdcRlsTest do
:erpc.call(node, PostgresCdcRls, :handle_stop, [tenant.external_id, 10_000])
end
end
+
+ def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata})
end
diff --git a/test/realtime/gen_rpc_pub_sub_test.exs b/test/realtime/gen_rpc_pub_sub_test.exs
index 0013c2e7b..517c6c369 100644
--- a/test/realtime/gen_rpc_pub_sub_test.exs
+++ b/test/realtime/gen_rpc_pub_sub_test.exs
@@ -1,2 +1,18 @@
Application.put_env(:phoenix_pubsub, :test_adapter, {Realtime.GenRpcPubSub, []})
Code.require_file("../../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__)
+
+defmodule Realtime.GenRpcPubSubTest do
+ use ExUnit.Case, async: true
+
+ test "it sets off_heap message_queue_data flag on the workers" do
+ assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1
+ |> Process.whereis()
+ |> Process.info(:message_queue_data) == {:message_queue_data, :off_heap}
+ end
+
+ test "it sets fullsweep_after flag on the workers" do
+ assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1
+ |> Process.whereis()
+ |> Process.info(:fullsweep_after) == {:fullsweep_after, 100}
+ end
+end
diff --git a/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs b/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs
index 164c8d2eb..77c1dc7cf 100644
--- a/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs
+++ b/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs
@@ -129,6 +129,17 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
assert metric_value(pattern) == metric_value + 1
end
+ test "global event exists after counter added", %{tenant: %{external_id: external_id}} do
+ pattern =
+ ~r/realtime_channel_global_events\s(?
\d+)/
+
+ metric_value = metric_value(pattern)
+ FakeUserCounter.fake_event(external_id)
+
+ Process.sleep(200)
+ assert metric_value(pattern) == metric_value + 1
+ end
+
test "db_event exists after counter added", %{tenant: %{external_id: external_id}} do
pattern =
~r/realtime_channel_db_events{tenant="#{external_id}"}\s(?\d+)/
@@ -139,6 +150,16 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
assert metric_value(pattern) == metric_value + 1
end
+ test "global db_event exists after counter added", %{tenant: %{external_id: external_id}} do
+ pattern =
+ ~r/realtime_channel_global_db_events\s(?\d+)/
+
+ metric_value = metric_value(pattern)
+ FakeUserCounter.fake_db_event(external_id)
+ Process.sleep(200)
+ assert metric_value(pattern) == metric_value + 1
+ end
+
test "presence_event exists after counter added", %{tenant: %{external_id: external_id}} do
pattern =
~r/realtime_channel_presence_events{tenant="#{external_id}"}\s(?\d+)/
@@ -149,6 +170,16 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
assert metric_value(pattern) == metric_value + 1
end
+ test "global presence_event exists after counter added", %{tenant: %{external_id: external_id}} do
+ pattern =
+ ~r/realtime_channel_global_presence_events\s(?\d+)/
+
+ metric_value = metric_value(pattern)
+ FakeUserCounter.fake_presence_event(external_id)
+ Process.sleep(200)
+ assert metric_value(pattern) == metric_value + 1
+ end
+
test "metric read_authorization_check exists after check", context do
pattern =
~r/realtime_tenants_read_authorization_check_count{tenant="#{context.tenant.external_id}"}\s(?\d+)/
@@ -231,18 +262,18 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
external_id = context.tenant.external_id
pattern =
- ~r/realtime_tenants_payload_size_count{tenant="#{external_id}"}\s(?\d+)/
+ ~r/realtime_tenants_payload_size_count{message_type=\"presence\",tenant="#{external_id}"}\s(?\d+)/
metric_value = metric_value(pattern)
message = %{topic: "a topic", event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
- RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub)
+ RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub, :presence)
Process.sleep(200)
assert metric_value(pattern) == metric_value + 1
bucket_pattern =
- ~r/realtime_tenants_payload_size_bucket{tenant="#{external_id}",le="100"}\s(?\d+)/
+ ~r/realtime_tenants_payload_size_bucket{message_type=\"presence\",tenant="#{external_id}",le="250"}\s(?\d+)/
assert metric_value(bucket_pattern) > 0
end
@@ -250,17 +281,17 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
test "global metric payload size", context do
external_id = context.tenant.external_id
- pattern = ~r/realtime_payload_size_count\s(?\d+)/
+ pattern = ~r/realtime_payload_size_count{message_type=\"broadcast\"}\s(?\d+)/
metric_value = metric_value(pattern)
message = %{topic: "a topic", event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
- RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub)
+ RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub, :broadcast)
Process.sleep(200)
assert metric_value(pattern) == metric_value + 1
- bucket_pattern = ~r/realtime_payload_size_bucket{le="100"}\s(?\d+)/
+ bucket_pattern = ~r/realtime_payload_size_bucket{message_type=\"broadcast\",le="250"}\s(?\d+)/
assert metric_value(bucket_pattern) > 0
end
diff --git a/test/realtime/monitoring/prom_ex/plugins/tenants_test.exs b/test/realtime/monitoring/prom_ex/plugins/tenants_test.exs
index 080fd3cfb..ded087c74 100644
--- a/test/realtime/monitoring/prom_ex/plugins/tenants_test.exs
+++ b/test/realtime/monitoring/prom_ex/plugins/tenants_test.exs
@@ -37,6 +37,16 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do
assert metric_value(pattern) == previous_value + 1
end
+ test "global success" do
+ pattern = ~r/realtime_global_rpc_count{mechanism=\"erpc\",success="true"}\s(?\d+)/
+ # Enough time for the poll rate to be triggered at least once
+ Process.sleep(200)
+ previous_value = metric_value(pattern)
+ assert {:ok, "success"} = Rpc.enhanced_call(node(), Test, :success, [], tenant_id: "123")
+ Process.sleep(200)
+ assert metric_value(pattern) == previous_value + 1
+ end
+
test "failure" do
pattern = ~r/realtime_rpc_count{mechanism=\"erpc\",success="false",tenant="123"}\s(?\d+)/
# Enough time for the poll rate to be triggered at least once
@@ -47,6 +57,16 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do
assert metric_value(pattern) == previous_value + 1
end
+ test "global failure" do
+ pattern = ~r/realtime_global_rpc_count{mechanism=\"erpc\",success="false"}\s(?\d+)/
+ # Enough time for the poll rate to be triggered at least once
+ Process.sleep(200)
+ previous_value = metric_value(pattern)
+ assert {:error, "failure"} = Rpc.enhanced_call(node(), Test, :failure, [], tenant_id: "123")
+ Process.sleep(200)
+ assert metric_value(pattern) == previous_value + 1
+ end
+
test "exception" do
pattern = ~r/realtime_rpc_count{mechanism=\"erpc\",success="false",tenant="123"}\s(?\d+)/
# Enough time for the poll rate to be triggered at least once
@@ -59,6 +79,19 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do
Process.sleep(200)
assert metric_value(pattern) == previous_value + 1
end
+
+ test "global exception" do
+ pattern = ~r/realtime_global_rpc_count{mechanism=\"erpc\",success="false"}\s(?\d+)/
+ # Enough time for the poll rate to be triggered at least once
+ Process.sleep(200)
+ previous_value = metric_value(pattern)
+
+ assert {:error, :rpc_error, %RuntimeError{message: "runtime error"}} =
+ Rpc.enhanced_call(node(), Test, :exception, [], tenant_id: "123")
+
+ Process.sleep(200)
+ assert metric_value(pattern) == previous_value + 1
+ end
end
test "event_metrics rpc" do
diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs
index a52973d53..804b3018f 100644
--- a/test/realtime/tenants/connect_test.exs
+++ b/test/realtime/tenants/connect_test.exs
@@ -51,6 +51,36 @@ defmodule Realtime.Tenants.ConnectTest do
end
describe "handle cold start" do
+ test "multiple processes connecting calling Connect.connect", %{tenant: tenant} do
+ parent = self()
+
+ # Let's slow down Connect.connect so that multiple RPC calls are executed
+ stub(Connect, :connect, fn x, y, z ->
+ :timer.sleep(1000)
+ call_original(Connect, :connect, [x, y, z])
+ end)
+
+ connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end
+ # Let's call enough times to potentially trigger the Connect RateCounter
+
+ for _ <- 1..50, do: spawn(connect)
+
+ assert_receive({:ok, pid}, 1100)
+
+ for _ <- 1..49, do: assert_receive({:ok, ^pid})
+
+ # Does not trigger rate limit as connections eventually succeeded
+
+ {:ok, rate_counter} =
+ tenant.external_id
+ |> Tenants.connect_errors_per_second_rate()
+ |> Realtime.RateCounter.get()
+
+ assert rate_counter.sum == 0
+ assert rate_counter.avg == 0.0
+ assert rate_counter.limit.triggered == false
+ end
+
test "multiple proccesses succeed together", %{tenant: tenant} do
parent = self()
@@ -298,9 +328,9 @@ defmodule Realtime.Tenants.ConnectTest do
region = Tenants.region(tenant)
assert {_pid, %{conn: ^db_conn, region: ^region}} = :syn.lookup(Connect, external_id)
Process.sleep(1000)
- :syn.leave(:users, external_id, self())
+ external_id |> UsersCounter.scope() |> :syn.leave(external_id, self())
Process.sleep(1000)
- assert :undefined = :syn.lookup(Connect, external_id)
+ assert :undefined = external_id |> UsersCounter.scope() |> :syn.lookup(external_id)
refute Process.alive?(db_conn)
Connect.shutdown(external_id)
end
diff --git a/test/realtime_web/channels/payloads/join_test.exs b/test/realtime_web/channels/payloads/join_test.exs
index c1ea54a67..f02c2a73d 100644
--- a/test/realtime_web/channels/payloads/join_test.exs
+++ b/test/realtime_web/channels/payloads/join_test.exs
@@ -58,6 +58,14 @@ defmodule RealtimeWeb.Channels.Payloads.JoinTest do
assert is_binary(key)
end
+ test "presence key can be number" do
+ config = %{"config" => %{"presence" => %{"enabled" => true, "key" => 123}}}
+
+ assert {:ok, %Join{config: %Config{presence: %Presence{key: key}}}} = Join.validate(config)
+
+ assert key == 123
+ end
+
test "invalid replay" do
config = %{"config" => %{"broadcast" => %{"replay" => 123}}}
@@ -105,5 +113,11 @@ defmodule RealtimeWeb.Channels.Payloads.JoinTest do
user_token: ["unable to parse, expected string"]
}
end
+
+ test "handles postgres changes with nil value in array as empty array" do
+ config = %{"config" => %{"postgres_changes" => [nil]}}
+
+ assert {:ok, %Join{config: %Config{postgres_changes: []}}} = Join.validate(config)
+ end
end
end
diff --git a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs
index 44ce83b99..53be2e51f 100644
--- a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs
+++ b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs
@@ -16,12 +16,24 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
describe "fastlane_metadata/5" do
test "info level" do
assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :info, "tenant_id") ==
- {:rc_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}, MapSet.new()}
+ {:rc_fastlane, self(), Serializer, "realtime:topic", :info, "tenant_id", MapSet.new()}
end
test "non-info level" do
assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :warning, "tenant_id") ==
- {:rc_fastlane, self(), Serializer, "realtime:topic", MapSet.new()}
+ {:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new()}
+ end
+
+ test "replayed message ids" do
+ assert MessageDispatcher.fastlane_metadata(
+ self(),
+ Serializer,
+ "realtime:topic",
+ :warning,
+ "tenant_id",
+ MapSet.new([1])
+ ) ==
+ {:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new([1])}
end
end
@@ -50,8 +62,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
from_pid = :erlang.list_to_pid(~c'<0.2.1>')
subscribers = [
- {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}},
- {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}}
+ {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}},
+ {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}}
]
msg = %Broadcast{topic: "some:other:topic", event: "event", payload: %{data: "test"}}
@@ -74,6 +86,48 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
refute_receive _any
end
+ test "dispatches 'presence_diff' messages to fastlane subscribers" do
+ parent = self()
+
+ subscriber_pid =
+ spawn(fn ->
+ loop = fn loop ->
+ receive do
+ msg ->
+ send(parent, {:subscriber, msg})
+ loop.(loop)
+ end
+ end
+
+ loop.(loop)
+ end)
+
+ from_pid = :erlang.list_to_pid(~c'<0.2.1>')
+
+ subscribers = [
+ {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant456", MapSet.new()}},
+ {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant456", MapSet.new()}}
+ ]
+
+ msg = %Broadcast{topic: "some:other:topic", event: "presence_diff", payload: %{data: "test"}}
+
+ log =
+ capture_log(fn ->
+ assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok
+ end)
+
+ assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}"
+
+ assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}}
+ assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}}
+
+ assert Agent.get(TestSerializer, & &1) == 1
+
+ assert Realtime.GenCounter.get(Realtime.Tenants.presence_events_per_second_key("tenant456")) == 2
+
+ refute_receive _any
+ end
+
test "does not dispatch messages to fastlane subscribers if they already replayed it" do
parent = self()
@@ -95,8 +149,9 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
subscribers = [
{subscriber_pid,
- {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, replaeyd_message_ids}},
- {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", replaeyd_message_ids}}
+ {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", replaeyd_message_ids}},
+ {subscriber_pid,
+ {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", replaeyd_message_ids}}
]
msg = %Broadcast{
@@ -131,8 +186,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
from_pid = :erlang.list_to_pid(~c'<0.2.1>')
subscribers = [
- {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}},
- {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}}
+ {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}},
+ {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}}
]
msg = %Broadcast{topic: "some:other:topic", event: "event", payload: "not a map"}
diff --git a/test/realtime_web/channels/realtime_channel/presence_handler_test.exs b/test/realtime_web/channels/realtime_channel/presence_handler_test.exs
index 0cdf422e2..219f13e55 100644
--- a/test/realtime_web/channels/realtime_channel/presence_handler_test.exs
+++ b/test/realtime_web/channels/realtime_channel/presence_handler_test.exs
@@ -100,25 +100,41 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
end
describe "handle/3" do
+ setup do
+ on_exit(fn -> :telemetry.detach(__MODULE__) end)
+
+ :telemetry.attach(
+ __MODULE__,
+ [:realtime, :tenants, :payload, :size],
+ &__MODULE__.handle_telemetry/4,
+ pid: self()
+ )
+ end
+
test "with true policy and is private, user can track their presence and changes", %{
tenant: tenant,
topic: topic,
db_conn: db_conn
} do
+ external_id = tenant.external_id
key = random_string()
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
socket =
socket_fixture(tenant, topic, key, policies: policies)
- PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
+ PresenceHandler.handle(%{"event" => "track", "payload" => %{"A" => "b", "c" => "b"}}, db_conn, socket)
topic = socket.assigns.tenant_topic
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)
+
+ assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 30},
+ %{tenant: ^external_id, message_type: :presence}}
end
test "when tracking already existing user, metadata updated", %{tenant: tenant, topic: topic, db_conn: db_conn} do
+ external_id = tenant.external_id
key = random_string()
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
socket = socket_fixture(tenant, topic, key, policies: policies)
@@ -134,10 +150,18 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)
+
+ assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 6},
+ %{tenant: ^external_id, message_type: :presence}}
+
+ assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 55},
+ %{tenant: ^external_id, message_type: :presence}}
+
refute_receive :_
end
test "with false policy and is public, user can track their presence and changes", %{tenant: tenant, topic: topic} do
+ external_id = tenant.external_id
key = random_string()
policies = %Policies{presence: %PresencePolicies{read: false, write: false}}
socket = socket_fixture(tenant, topic, key, policies: policies, private?: false)
@@ -147,6 +171,9 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
topic = socket.assigns.tenant_topic
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)
+
+ assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 6},
+ %{tenant: ^external_id, message_type: :presence}}
end
test "user can untrack when they want", %{tenant: tenant, topic: topic, db_conn: db_conn} do
@@ -434,6 +461,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
assert log =~ "PresenceRateLimitReached"
end
+ @tag :skip
@tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence]
test "respects rate limits on private channels", %{tenant: tenant, topic: topic, db_conn: db_conn} do
key = random_string()
@@ -517,4 +545,6 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
}
}
end
+
+ def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata})
end
diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs
index ae6c1734a..8022d6ebd 100644
--- a/test/realtime_web/channels/realtime_channel_test.exs
+++ b/test/realtime_web/channels/realtime_channel_test.exs
@@ -239,23 +239,14 @@ defmodule RealtimeWeb.RealtimeChannelTest do
end
describe "presence" do
- test "events are counted", %{tenant: tenant} do
+ test "presence state event is counted", %{tenant: tenant} do
jwt = Generators.generate_jwt_token(tenant)
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))
assert {:ok, _, %Socket{} = socket} = subscribe_and_join(socket, "realtime:test", %{})
- presence_diff = %Socket.Broadcast{event: "presence_diff", payload: %{joins: %{}, leaves: %{}}}
- send(socket.channel_pid, presence_diff)
-
assert_receive %Socket.Message{topic: "realtime:test", event: "presence_state", payload: %{}}
- assert_receive %Socket.Message{
- topic: "realtime:test",
- event: "presence_diff",
- payload: %{joins: %{}, leaves: %{}}
- }
-
tenant_id = tenant.external_id
# Wait for RateCounter to tick
@@ -264,8 +255,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
assert {:ok, %RateCounter{id: {:channel, :presence_events, ^tenant_id}, bucket: bucket}} =
RateCounter.get(socket.assigns.presence_rate_counter)
- # presence_state + presence_diff
- assert 2 in bucket
+ # presence_state
+ assert Enum.sum(bucket) == 1
end
end
diff --git a/test/realtime_web/channels/tenant_rate_limiters_test.exs b/test/realtime_web/channels/tenant_rate_limiters_test.exs
new file mode 100644
index 000000000..05d56ec82
--- /dev/null
+++ b/test/realtime_web/channels/tenant_rate_limiters_test.exs
@@ -0,0 +1,31 @@
+defmodule RealtimeWeb.TenantRateLimitersTest do
+ use Realtime.DataCase, async: true
+
+ use Mimic
+ alias RealtimeWeb.TenantRateLimiters
+ alias Realtime.Api.Tenant
+
+ setup do
+ tenant = %Tenant{external_id: random_string(), max_concurrent_users: 1, max_joins_per_second: 1}
+
+ %{tenant: tenant}
+ end
+
+ describe "check_tenant/1" do
+ test "rate is not exceeded", %{tenant: tenant} do
+ assert TenantRateLimiters.check_tenant(tenant) == :ok
+ end
+
+ test "max concurrent users is exceeded", %{tenant: tenant} do
+ Realtime.UsersCounter.add(self(), tenant.external_id)
+
+ assert TenantRateLimiters.check_tenant(tenant) == {:error, :too_many_connections}
+ end
+
+ test "max joins is exceeded", %{tenant: tenant} do
+ expect(Realtime.RateCounter, :get, fn _ -> {:ok, %{limit: %{triggered: true}}} end)
+
+ assert TenantRateLimiters.check_tenant(tenant) == {:error, :too_many_joins}
+ end
+ end
+end
diff --git a/test/realtime_web/controllers/broadcast_controller_test.exs b/test/realtime_web/controllers/broadcast_controller_test.exs
index 7bd426353..900eb7aa9 100644
--- a/test/realtime_web/controllers/broadcast_controller_test.exs
+++ b/test/realtime_web/controllers/broadcast_controller_test.exs
@@ -272,8 +272,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
- connect_events_key = Tenants.connect_per_second_rate(tenant).id
- expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
+ expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _, _ -> :ok end)
messages_to_send =
Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end)
@@ -298,7 +297,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
- broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/4)
+ broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/5)
Enum.each(messages_to_send, fn %{topic: topic} ->
broadcast_topic = Tenants.tenant_topic(tenant, topic, false)
@@ -314,7 +313,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
}
assert Enum.any?(broadcast_calls, fn
- [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher] -> true
+ [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher, :broadcast] -> true
_ -> false
end)
end)
@@ -330,8 +329,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
- connect_events_key = Tenants.connect_per_second_rate(tenant).id
- expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _ -> :ok end)
+ expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _, _ -> :ok end)
channels =
Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end)
@@ -366,7 +364,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
- broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/4)
+ broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/5)
Enum.each(channels, fn %{topic: topic} ->
broadcast_topic = Tenants.tenant_topic(tenant, topic, false)
@@ -382,7 +380,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
}
assert Enum.count(broadcast_calls, fn
- [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher] -> true
+ [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher, :broadcast] -> true
_ -> false
end) == 1
end)
@@ -401,7 +399,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
open_channel_topic = Tenants.tenant_topic(tenant, "open_channel", true)
assert Enum.count(broadcast_calls, fn
- [_, ^open_channel_topic, ^message, RealtimeChannel.MessageDispatcher] -> true
+ [_, ^open_channel_topic, ^message, RealtimeChannel.MessageDispatcher, :broadcast] -> true
_ -> false
end) == 1
@@ -416,8 +414,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
} do
request_events_key = Tenants.requests_per_second_key(tenant)
broadcast_events_key = Tenants.events_per_second_key(tenant)
- connect_events_key = Tenants.connect_per_second_rate(tenant).id
- expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
+ expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _, _ -> :ok end)
messages_to_send =
Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end)
@@ -438,12 +435,11 @@ defmodule RealtimeWeb.BroadcastControllerTest do
GenCounter
|> expect(:add, fn ^request_events_key -> :ok end)
# remove the one message that won't be broadcasted for this user
- |> expect(:add, 1, fn ^connect_events_key -> :ok end)
|> expect(:add, length(messages) - 1, fn ^broadcast_events_key -> :ok end)
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
- broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/4)
+ broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/5)
Enum.each(messages_to_send, fn %{topic: topic} ->
broadcast_topic = Tenants.tenant_topic(tenant, topic, false)
@@ -459,7 +455,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
}
assert Enum.count(broadcast_calls, fn
- [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher] -> true
+ [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher, :broadcast] -> true
_ -> false
end) == 1
end)
@@ -472,7 +468,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
@tag role: "anon"
test "user without permission won't broadcast", %{conn: conn, db_conn: db_conn, tenant: tenant} do
request_events_key = Tenants.requests_per_second_key(tenant)
- reject(&TenantBroadcaster.pubsub_broadcast/4)
+ reject(&TenantBroadcaster.pubsub_broadcast/5)
messages =
Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end)
diff --git a/test/realtime_web/live/status_live/index_test.exs b/test/realtime_web/live/status_live/index_test.exs
new file mode 100644
index 000000000..ae3af0ad0
--- /dev/null
+++ b/test/realtime_web/live/status_live/index_test.exs
@@ -0,0 +1,33 @@
+defmodule RealtimeWeb.StatusLive.IndexTest do
+ use RealtimeWeb.ConnCase
+ import Phoenix.LiveViewTest
+
+ alias Realtime.Latency.Payload
+ alias Realtime.Nodes
+ alias RealtimeWeb.Endpoint
+
+ describe "Status LiveView" do
+ test "renders status page", %{conn: conn} do
+ {:ok, _view, html} = live(conn, ~p"/status")
+
+ assert html =~ "Realtime Status"
+ end
+
+ test "receives broadcast from PubSub", %{conn: conn} do
+ {:ok, view, _html} = live(conn, ~p"/status")
+
+ payload = %Payload{
+ from_node: Nodes.short_node_id_from_name(:"pink@127.0.0.1"),
+ node: Nodes.short_node_id_from_name(:"orange@127.0.0.1"),
+ latency: "42ms",
+ timestamp: DateTime.utc_now()
+ }
+
+ Endpoint.broadcast("admin:cluster", "ping", payload)
+
+ html = render(view)
+ assert html =~ "42ms"
+ assert html =~ "pink@127.0.0.1_orange@127.0.0.1"
+ end
+ end
+end
diff --git a/test/realtime_web/tenant_broadcaster_test.exs b/test/realtime_web/tenant_broadcaster_test.exs
index ddda381a1..bc3b4f90a 100644
--- a/test/realtime_web/tenant_broadcaster_test.exs
+++ b/test/realtime_web/tenant_broadcaster_test.exs
@@ -60,7 +60,7 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
test "pubsub_broadcast", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
- TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
+ TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub, :broadcast)
assert_receive ^message
@@ -71,13 +71,13 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 114},
- %{tenant: "realtime-dev"}
+ %{tenant: "realtime-dev", message_type: :broadcast}
}
end
test "pubsub_broadcast list payload", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
- TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
+ TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub, :broadcast)
assert_receive ^message
@@ -88,13 +88,13 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 130},
- %{tenant: "realtime-dev"}
+ %{tenant: "realtime-dev", message_type: :broadcast}
}
end
test "pubsub_broadcast string payload", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: "some text payload"}
- TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
+ TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub, :broadcast)
assert_receive ^message
@@ -105,7 +105,7 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 119},
- %{tenant: "realtime-dev"}
+ %{tenant: "realtime-dev", message_type: :broadcast}
}
end
end
@@ -131,7 +131,7 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
- TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub)
+ TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub, :broadcast)
assert_receive {:other_process, ^message}
@@ -142,7 +142,7 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 114},
- %{tenant: "realtime-dev"}
+ %{tenant: "realtime-dev", message_type: :broadcast}
}
# This process does not receive the message
@@ -151,5 +151,38 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
end
end
+ describe "collect_payload_size/3" do
+ @describetag pubsub_adapter: :gen_rpc
+
+ test "emit telemetry for struct" do
+ TenantBroadcaster.collect_payload_size(
+ "realtime-dev",
+ %Phoenix.Socket.Broadcast{event: "broadcast", payload: %{"a" => "b"}},
+ :broadcast
+ )
+
+ assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 65},
+ %{tenant: "realtime-dev", message_type: :broadcast}}
+ end
+
+ test "emit telemetry for map" do
+ TenantBroadcaster.collect_payload_size(
+ "realtime-dev",
+ %{event: "broadcast", payload: %{"a" => "b"}},
+ :postgres_changes
+ )
+
+ assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 53},
+ %{tenant: "realtime-dev", message_type: :postgres_changes}}
+ end
+
+ test "emit telemetry for non-map" do
+ TenantBroadcaster.collect_payload_size("realtime-dev", "some blob", :presence)
+
+ assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 15},
+ %{tenant: "realtime-dev", message_type: :presence}}
+ end
+ end
+
def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata})
end