Commit eb8b91d

Authored by h0lybyte, filipecabaco, edgurgel, kevcodez and chasers
🔄 Sync with upstream changes (#15)
* fix: runtime setup error (supabase#1520)
* fix: use primary instead of replica on rename_settings_field (supabase#1521)
* feat: upgrade cowboy & ranch (supabase#1523)
* fix: Fix GenRpc to not try to connect to nodes that are not alive (supabase#1525)
* fix: enable presence on track message (supabase#1527)
  Previously, presence had to be enabled from the beginning of the channel. This lets users enable presence later in the flow by sending a track message, which enables presence messages for them.
* fix: set cowboy active_n=100, as in cowboy 2.12.0 (supabase#1530)
  cowboy 2.13.0 changed the default to active_n=1.
* fix: provide error_code metadata on RealtimeChannel.Logging (supabase#1531)
* feat: disable UTF8 validation on websocket frames (supabase#1532)
  All text frames are currently handled as JSON, which already requires UTF-8.
* fix: move DB setup to happen after Connect.init (supabase#1533)
  Reduces the impact of a slow DB setup on other tenants that try to connect at the same time and land on the same partition.
* fix: handle WAL bloat (supabase#1528)
  Verifies that the replication connection is able to reconnect when faced with WAL bloat issues.
* feat: replay realtime.messages (supabase#1526)
  A new index was created on (inserted_at DESC, topic) WHERE private IS TRUE AND extension = 'broadcast'. The hardcoded limit is 25 for now.
* feat: gen_rpc pub sub adapter (supabase#1529)
  Adds a PubSub adapter that uses gen_rpc to send messages to other nodes, calling :gen_rpc.abcast/3 instead of :erlang.send/2. The adapter works very similarly to the PG2 adapter: it consists of multiple workers that forward messages to the local node using PubSub.local_broadcast, and the worker is chosen based on the sending process, just as the PG2 adapter does. The number of workers is controlled by `:pool_size` or `:broadcast_pool_size`. This distinction exists because Phoenix.PubSub uses `:pool_size` to define how many partitions the PubSub registry will use; `:broadcast_pool_size` lets the two be controlled separately.
* fix: ensure message id doesn't raise on non-map payloads (supabase#1534)
* fix: match error on Connect (supabase#1536)
* feat: websocket max heap size configuration (supabase#1538)
  Sets the max process heap size to 500MB instead of 8GB; WEBSOCKET_MAX_HEAP_SIZE can be used to configure the websocket transport max heap size.
* fix: update gen_rpc to fix gen_rpc_dispatcher issues (supabase#1537)
  Issues: the single gen_rpc_dispatcher can be a bottleneck if connecting takes some time, and many calls can land on the dispatcher after the node is already gone. Without validating that the node is an actively connected node, gen_rpc keeps trying to connect until it times out instead of quickly giving up.
* fix: improve ErlSysMon logging for processes (supabase#1540)
  Includes initial_call, ancestors, registered_name, message_queue_len and total_heap_size; also bumps long_schedule and long_gc.
* fix: make pubsub adapter configurable (supabase#1539)
* fix: specify that only private channels are allowed when replaying messages (supabase#1543)
* fix: rate limit Connect module (supabase#1541)
  On bad connections, the Connect module is rate limited to prevent abuse and excessive error logging.
* build: automatically cancel old tests/build on new push (supabase#1545)
  Previously, pushing a commit started a new build while the old builds kept running. Once a new commit is added, the old test results no longer matter, so they only waste CI resources and cause confusion with multiple builds running in parallel for the same branch, possibly blocking merges. With this change, a new commit immediately cancels the previous build, so only the latest commit's build runs.
* fix: move message queue data to off-heap for gen_rpc pub sub workers (supabase#1548)
* fix: rate limit Connect.lookup_or_start_connection on error only (supabase#1549)
* fix: increase connect error rate window to 30 seconds (supabase#1550)
* fix: set a lower fullsweep_after flag for GenRpcPubSub workers (supabase#1551)
* fix: hardcode presence limit (supabase#1552)
* fix: further decrease limit on presence events (supabase#1553)
* fix: bump up realtime (supabase#1554)
* fix: lower rate limit to 100 events per second (supabase#1556)
* fix: move connect rate limit to socket (supabase#1555)
* fix: reduce max_frame_size to 5MB
* fix: fullsweep_after=100 on gen rpc pub sub workers
* fix: collect global metrics without tenant tagging (supabase#1557)
* feat: presence payload size (supabase#1559)
  Also tweaks buckets to account all the way to 3000KB, and starts tagging the payload size metrics with message_type, which can be presence, broadcast or postgres_changes.
* fix: use GenRpc for Realtime.Latency pings (supabase#1560)
* feat: fastlane for Phoenix presence_diff (supabase#1558)
  Uses a fork of Phoenix for the time being.
* fix: count presence_diff events on MessageDispatcher
* fix: remove traces from console during development
* fix: limit db events (supabase#1562)
* chore: split tests and lint workflows (supabase#1564)
  Also caches mix _build and deps.
* fix: use LiveView stream for status page (supabase#1565)
  Includes using the full node name on localhost for tests, cleanup, added tests, a version bump and formatting fixes.
* fix: refine join payload checking (supabase#1567)
* fix: shard user scopes in syn (supabase#1566)

Co-authored-by: Eduardo Gurgel Pinho <[email protected]>
Co-authored-by: Filipe Cabaço <[email protected]>
Co-authored-by: Eduardo Gurgel <[email protected]>
Co-authored-by: Kevin Grüneberg <[email protected]>
Co-authored-by: Chase Granberry <[email protected]>
Co-authored-by: Bradley Haljendi <[email protected]>
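
The worker-selection rule described for the gen_rpc pub sub adapter (supabase#1529) is the load-bearing detail: because the worker is derived from the sending process, per-sender message ordering is preserved while load spreads across the pool. A minimal sketch of that rule, assuming index-named workers; the module and naming scheme below are illustrative, not the upstream implementation:

defmodule GenRpcPubSubSketch do
  # Derive the worker from the sending process, as the PG2 adapter does.
  # :erlang.phash2/2 is deterministic, so all broadcasts from one process
  # funnel through the same worker and keep their relative order.
  def worker_name(from_pid, pool_size) do
    index = :erlang.phash2(from_pid, pool_size)
    :"gen_rpc_pubsub_worker_#{index}"
  end
end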
1 parent 7793cda commit eb8b91d

File tree

18 files changed (+350, -79 lines)

.github/workflows/lint.yml

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
+name: Lint
+on:
+  pull_request:
+    paths:
+      - "lib/**"
+      - "test/**"
+      - "config/**"
+      - "priv/**"
+      - "assets/**"
+      - "rel/**"
+      - "mix.exs"
+      - "Dockerfile"
+      - "run.sh"
+
+  push:
+    branches:
+      - main
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  tests:
+    name: Lint
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Setup elixir
+        id: beam
+        uses: erlef/setup-beam@v1
+        with:
+          otp-version: 27.x # Define the OTP version [required]
+          elixir-version: 1.17.x # Define the elixir version [required]
+      - name: Cache Mix
+        uses: actions/cache@v4
+        with:
+          path: |
+            deps
+            _build
+          key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-${{ hashFiles('**/mix.lock') }}
+          restore-keys: |
+            ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-
+
+      - name: Install dependencies
+        run: mix deps.get
+      - name: Set up Postgres
+        run: docker compose -f docker-compose.dbs.yml up -d
+      - name: Run main database migrations
+        run: mix ecto.migrate --log-migrator-sql
+      - name: Run database tenant migrations
+        run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
+      - name: Run format check
+        run: mix format --check-formatted
+      - name: Credo checks
+        run: mix credo
+      - name: Run hex audit
+        run: mix hex.audit
+      - name: Run mix_audit
+        run: mix deps.audit
+      - name: Run sobelow
+        run: mix sobelow --config .sobelow-conf
+      - name: Retrieve PLT Cache
+        uses: actions/cache@v4
+        id: plt-cache
+        with:
+          path: priv/plts
+          key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
+      - name: Create PLTs
+        if: steps.plt-cache.outputs.cache-hit != 'true'
+        run: |
+          mkdir -p priv/plts
+          mix dialyzer.build
+      - name: Run dialyzer
+        run: mix dialyzer
+      - name: Run dev seeds
+        run: DB_ENC_KEY="1234567890123456" mix ecto.setup

.github/workflows/tests.yml

Lines changed: 10 additions & 32 deletions
@@ -20,6 +20,9 @@ concurrency:
   group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
   cancel-in-progress: true
 
+env:
+  MIX_ENV: test
+
 jobs:
   tests:
     name: Tests
@@ -36,44 +39,19 @@ jobs:
       - name: Cache Mix
        uses: actions/cache@v4
        with:
-          path: deps
-          key: ${{ runner.os }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
+          path: |
+            deps
+            _build
+          key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-${{ hashFiles('**/mix.lock') }}
           restore-keys: |
-            ${{ runner.os }}-mix-
+            ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-
 
+      - name: Pull postgres image quietly in background (used by test/support/containers.ex)
+        run: docker pull supabase/postgres:15.8.1.040 > /dev/null 2>&1 &
       - name: Install dependencies
         run: mix deps.get
       - name: Set up Postgres
         run: docker compose -f docker-compose.dbs.yml up -d
-      - name: Run main database migrations
-        run: mix ecto.migrate --log-migrator-sql
-      - name: Run database tenant migrations
-        run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
-      - name: Run format check
-        run: mix format --check-formatted
-      - name: Credo checks
-        run: mix credo
-      - name: Run hex audit
-        run: mix hex.audit
-      - name: Run mix_audit
-        run: mix deps.audit
-      - name: Run sobelow
-        run: mix sobelow --config .sobelow-conf
-      - name: Retrieve PLT Cache
-        uses: actions/cache@v4
-        id: plt-cache
-        with:
-          path: priv/plts
-          key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
-      - name: Create PLTs
-        if: steps.plt-cache.outputs.cache-hit != 'true'
-        run: |
-          mkdir -p priv/plts
-          mix dialyzer.build
-      - name: Run dialyzer
-        run: mix dialyzer
-      - name: Run dev seeds
-        run: DB_ENC_KEY="1234567890123456" mix ecto.setup
       - name: Start epmd
         run: epmd -daemon
       - name: Run tests

config/runtime.exs

Lines changed: 3 additions & 1 deletion
@@ -70,6 +70,7 @@ platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws
 broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
 pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
 websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
+users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
 
 no_channel_timeout_in_ms =
   if config_env() == :test,
@@ -126,7 +127,8 @@ config :realtime,
   no_channel_timeout_in_ms: no_channel_timeout_in_ms,
   platform: platform,
   pubsub_adapter: pubsub_adapter,
-  broadcast_pool_size: broadcast_pool_size
+  broadcast_pool_size: broadcast_pool_size,
+  users_scope_shards: users_scope_shards
 
 if config_env() != :test && run_janitor? do
   config :realtime,

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 29 additions & 10 deletions
@@ -18,6 +18,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
   alias Realtime.Adapters.Changes.NewRecord
   alias Realtime.Adapters.Changes.UpdatedRecord
   alias Realtime.Database
+  alias Realtime.RateCounter
+  alias Realtime.Tenants
 
   def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
 
@@ -26,6 +28,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
     tenant_id = args["id"]
     Logger.metadata(external_id: tenant_id, project: tenant_id)
 
+    %Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id)
+
+    rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000)
+
+    RateCounter.new(rate_counter_args)
+
     state = %{
       backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
       db_host: args["db_host"],
@@ -41,7 +49,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
       retry_ref: nil,
       retry_count: 0,
       slot_name: args["slot_name"] <> slot_name_suffix(),
-      tenant_id: tenant_id
+      tenant_id: tenant_id,
+      rate_counter_args: rate_counter_args
     }
 
     {:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
@@ -74,7 +83,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
           max_record_bytes: max_record_bytes,
           max_changes: max_changes,
          conn: conn,
-          tenant_id: tenant_id
+          tenant_id: tenant_id,
+          rate_counter_args: rate_counter_args
         } = state
       ) do
     cancel_timer(poll_ref)
@@ -84,7 +94,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
     {time, list_changes} = :timer.tc(Replications, :list_changes, args)
     record_list_changes_telemetry(time, tenant_id)
 
-    case handle_list_changes_result(list_changes, tenant_id) do
+    case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do
       {:ok, row_count} ->
         Backoff.reset(backoff)
 
@@ -177,20 +187,29 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
           rows: [_ | _] = rows,
           num_rows: rows_count
         }},
-        tenant_id
+        tenant_id,
+        rate_counter_args
       ) do
-    for row <- rows,
-        change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
-      topic = "realtime:postgres:" <> tenant_id
+    case RateCounter.get(rate_counter_args) do
+      {:ok, %{limit: %{triggered: true}}} ->
+        :ok
 
-      RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
+      _ ->
+        Realtime.GenCounter.add(rate_counter_args.id, rows_count)
+
+        for row <- rows,
+            change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
+          topic = "realtime:postgres:" <> tenant_id
+
+          RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
+        end
     end
 
     {:ok, rows_count}
   end
 
-  defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0}
-  defp handle_list_changes_result({:error, reason}, _), do: {:error, reason}
+  defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0}
+  defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason}
 
   def generate_record([
         {"wal",

lib/realtime/application.ex

Lines changed: 1 addition & 2 deletions
@@ -46,8 +46,7 @@ defmodule Realtime.Application do
     Realtime.PromEx.set_metrics_tags()
     :ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
     :syn.set_event_handler(Realtime.SynHandler)
-
-    :ok = :syn.add_node_to_scopes([:users, RegionNodes, Realtime.Tenants.Connect])
+    :ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()])
 
     region = Application.get_env(:realtime, :region)
     :syn.join(RegionNodes, region, self(), node: node())
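
With the default of USERS_SCOPE_SHARDS=5 from config/runtime.exs, the scope list registered above expands as follows (a sketch; see Realtime.UsersCounter.scopes/0 further down):

# [RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()]
# => [RegionNodes, Realtime.Tenants.Connect,
#     :users_0, :users_1, :users_2, :users_3, :users_4]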

lib/realtime/nodes.ex

Lines changed: 4 additions & 1 deletion
@@ -105,7 +105,7 @@ defmodule Realtime.Nodes do
 
       iex> node = :"pink@127.0.0.1"
       iex> Realtime.Helpers.short_node_id_from_name(node)
-      "127.0.0.1"
+      "pink@127.0.0.1"
 
       iex> node = :"[email protected]"
       iex> Realtime.Helpers.short_node_id_from_name(node)
@@ -124,6 +124,9 @@ defmodule Realtime.Nodes do
       [_, _, _, _, _, one, two, _] ->
         one <> two
 
+      ["127.0.0.1"] ->
+        Atom.to_string(name)
+
       _other ->
         host
     end

lib/realtime/tenants.ex

Lines changed: 27 additions & 1 deletion
@@ -21,7 +21,8 @@ defmodule Realtime.Tenants do
   """
   @spec list_connected_tenants(atom()) :: [String.t()]
   def list_connected_tenants(node) do
-    :syn.group_names(:users, node)
+    UsersCounter.scopes()
+    |> Enum.flat_map(fn scope -> :syn.group_names(scope, node) end)
   end
 
   @doc """
@@ -247,6 +248,31 @@ defmodule Realtime.Tenants do
     %RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
   end
 
+  @doc "RateCounter arguments for counting database events per second with a limit."
+  @spec db_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
+  def db_events_per_second_rate(tenant_id, max_events_per_second) when is_binary(tenant_id) do
+    opts = [
+      telemetry: %{
+        event_name: [:channel, :db_events],
+        measurements: %{},
+        metadata: %{tenant: tenant_id}
+      },
+      limit: [
+        value: max_events_per_second,
+        measurement: :avg,
+        log: true,
+        log_fn: fn ->
+          Logger.error("MessagePerSecondRateLimitReached: Too many postgres changes messages per second",
+            external_id: tenant_id,
+            project: tenant_id
+          )
+        end
+      ]
+    ]
+
+    %RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
+  end
+
   @doc """
   The GenCounter key to use when counting events for RealtimeChannel events.
       iex> Realtime.Tenants.db_events_per_second_key("tenant_id")

lib/realtime/user_counter.ex

Lines changed: 18 additions & 3 deletions
@@ -8,17 +8,32 @@ defmodule Realtime.UsersCounter do
   Adds a RealtimeChannel pid to the `:users` scope for a tenant so we can keep track of all connected clients for a tenant.
   """
   @spec add(pid(), String.t()) :: :ok
-  def add(pid, tenant), do: :syn.join(:users, tenant, pid)
+  def add(pid, tenant_id), do: tenant_id |> scope() |> :syn.join(tenant_id, pid)
 
   @doc """
   Returns the count of all connected clients for a tenant for the cluster.
   """
   @spec tenant_users(String.t()) :: non_neg_integer()
-  def tenant_users(tenant), do: :syn.member_count(:users, tenant)
+  def tenant_users(tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id)
 
   @doc """
   Returns the count of all connected clients for a tenant for a single node.
   """
   @spec tenant_users(atom, String.t()) :: non_neg_integer()
-  def tenant_users(node_name, tenant), do: :syn.member_count(:users, tenant, node_name)
+  def tenant_users(node_name, tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id, node_name)
+
+  @doc """
+  Returns the scope for a given tenant id.
+  """
+  @spec scope(String.t()) :: atom()
+  def scope(tenant_id) do
+    shards = Application.get_env(:realtime, :users_scope_shards)
+    shard = :erlang.phash2(tenant_id, shards)
+    :"users_#{shard}"
+  end
+
+  def scopes() do
+    shards = Application.get_env(:realtime, :users_scope_shards)
+    Enum.map(0..(shards - 1), fn shard -> :"users_#{shard}" end)
+  end
 end
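
Since :erlang.phash2/2 is a pure function of its arguments, every node in the cluster maps a given tenant to the same shard with no coordination, which is what makes the split safe. Splitting the single hot :users scope into N smaller syn scopes spreads registration and lookup load across shards; list_connected_tenants/1 compensates by flat-mapping over all of them. A quick illustration, assuming the default of 5 shards:

# The same tenant id always lands on the same scope, on every node:
scope = Realtime.UsersCounter.scope("tenant-abc")
true = scope == Realtime.UsersCounter.scope("tenant-abc")
true = scope in Realtime.UsersCounter.scopes()
# Different tenants spread roughly uniformly across :users_0..:users_4.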

lib/realtime_web/channels/payloads/config.ex

Lines changed: 8 additions & 0 deletions
@@ -17,6 +17,14 @@ defmodule RealtimeWeb.Channels.Payloads.Config do
   end
 
   def changeset(config, attrs) do
+    attrs =
+      attrs
+      |> Enum.map(fn
+        {k, v} when is_list(v) -> {k, Enum.filter(v, fn v -> v != nil end)}
+        {k, v} -> {k, v}
+      end)
+      |> Map.new()
+
     config
     |> cast(attrs, [:private], message: &Join.error_message/2)
     |> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map")
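
The effect of the new pre-cast pass, shown on a hypothetical join payload (the postgres_changes key here stands in for any list-valued config entry):

attrs = %{"private" => true, "postgres_changes" => [nil, %{"event" => "INSERT"}]}

attrs
|> Enum.map(fn
  {k, v} when is_list(v) -> {k, Enum.filter(v, fn v -> v != nil end)}
  {k, v} -> {k, v}
end)
|> Map.new()
# => %{"private" => true, "postgres_changes" => [%{"event" => "INSERT"}]}

Dropping nil entries before casting keeps a stray null in a list from failing the whole join changeset.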

lib/realtime_web/channels/payloads/presence.ex

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ defmodule RealtimeWeb.Channels.Payloads.Presence do
 
   embedded_schema do
     field :enabled, :boolean, default: true
-    field :key, :string, default: UUID.uuid1()
+    field :key, :any, default: UUID.uuid1(), virtual: true
   end
 
   def changeset(presence, attrs) do
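
The practical effect of the :key change, as a hedged example (this assumes :key is included in the changeset's cast, which is not shown in this diff): with `field :key, :string`, a client tracking presence with a non-string key such as 123 failed the cast and invalidated the join; `:any` accepts any term, and `virtual: true` keeps the key out of the persistable schema, which Ecto requires for `:any`-typed fields.

# before: Ecto.Type.cast(:string, 123) => :error, so the payload was rejected
# after:  Ecto.Type.cast(:any, 123)    => {:ok, 123}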
