Skip to content

Commit 50891cd

Browse files
authored
fix: handle wal bloat (#1528)
Verify that replication connection is able to reconnect when faced with WAL bloat issues
1 parent 70339c7 commit 50891cd

File tree

7 files changed

+245
-35
lines changed

7 files changed

+245
-35
lines changed

lib/realtime/tenants/connect.ex

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -252,31 +252,10 @@ defmodule Realtime.Tenants.Connect do
252252
end
253253

254254
# Diff view of the :start_replication continue callback. Lines that follow an "N-"
# marker are the removed inline implementation; lines that follow an "N+" marker are
# its replacement. After the change the callback only maps the result of
# start_replication_connection/1 onto a GenServer return value.
def handle_continue(:start_replication, state) do
255-
%{tenant: tenant} = state
256-
257-
with {:ok, replication_connection_pid} <- ReplicationConnection.start(tenant, self()) do
258-
replication_connection_reference = Process.monitor(replication_connection_pid)
259-
260-
state = %{
261-
state
262-
| replication_connection_pid: replication_connection_pid,
263-
replication_connection_reference: replication_connection_reference
264-
}
265-
266-
{:noreply, state, {:continue, :setup_connected_user_events}}
267-
else
268-
{:error, :max_wal_senders_reached} ->
269-
log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders")
270-
{:stop, :shutdown, state}
271-
272-
{:error, error} ->
273-
log_error("StartReplicationFailed", error)
274-
{:stop, :shutdown, state}
255+
# Replacement body: {:ok, state} continues to :setup_connected_user_events,
# {:error, state} stops the process with reason :shutdown (same outcomes as the
# removed inline code above).
case start_replication_connection(state) do
256+
{:ok, state} -> {:noreply, state, {:continue, :setup_connected_user_events}}
257+
{:error, state} -> {:stop, :shutdown, state}
275258
end
276-
# The removed rescue clause below was moved into start_replication_connection/1.
rescue
277-
error ->
278-
log_error("StartReplicationFailed", error)
279-
{:stop, :shutdown, state}
280259
end
281260

282261
def handle_continue(:setup_connected_user_events, state) do
@@ -348,13 +327,30 @@ defmodule Realtime.Tenants.Connect do
348327
{:stop, :shutdown, state}
349328
end
350329

330+
@replication_recovery_backoff 1000
331+
351332
# Handle replication connection termination
352333
# Matches only the :DOWN message whose monitor reference equals the reference stored
# in state: the same variable name is bound in both arguments, so both must be equal.
def handle_info(
353334
{:DOWN, replication_connection_reference, _, _, _},
354335
%{replication_connection_reference: replication_connection_reference} = state
355336
) do
356-
# Removed behaviour: log a warning and stop the whole process.
Logger.warning("Replication connection has died")
357-
{:stop, :shutdown, state}
337+
# Added behaviour: keep the process alive, schedule a :recover_replication_connection
# message after @replication_recovery_backoff milliseconds, and clear the stale
# pid/reference from state.
log_warning("ReplicationConnectionDown", "Replication connection has been terminated")
338+
Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff)
339+
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil}
340+
{:noreply, state}
341+
end
342+
343+
# Query that returns one row per backend listed in pg_stat_activity under the
# application name 'realtime_replication_connection'.
# NOTE(review): the query is not filtered by database; presumably one tenant maps to
# one Postgres server so this is sufficient — confirm.
@replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'"
344+
# Recovery tick. A new replication connection is started only when the query above
# returns zero rows. Any other outcome (rows still present, or
# start_replication_connection/1 returning {:error, state}) logs
# ReplicationConnectionRecoveryFailed and re-schedules this message after
# @replication_recovery_backoff milliseconds.
def handle_info(:recover_replication_connection, state) do
345+
# NOTE(review): Postgrex.query! is the raising variant, so a failed query raises
# instead of falling through to the `else` retry branch — confirm that crashing
# this process in that case is intended.
with %{num_rows: 0} <- Postgrex.query!(state.db_conn_pid, @replication_connection_query, []),
346+
{:ok, state} <- start_replication_connection(state) do
347+
{:noreply, state}
348+
else
349+
_ ->
350+
log_error("ReplicationConnectionRecoveryFailed", "Replication connection recovery failed")
351+
Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff)
352+
{:noreply, state}
353+
end
358354
end
359355

360356
def handle_info(_, state), do: {:noreply, state}
@@ -414,4 +410,32 @@ defmodule Realtime.Tenants.Connect do
414410
defp tenant_suspended?(_), do: :ok
415411

416412
defp rebalance_check_interval_in_ms(), do: Application.fetch_env!(:realtime, :rebalance_check_interval_in_ms)
413+
414+
# Starts a ReplicationConnection for the tenant held in state and monitors it.
#
# Returns {:ok, state} with :replication_connection_pid and
# :replication_connection_reference updated, or {:error, state} with the state
# unchanged after logging the failure. Any exception raised in the body is rescued,
# logged as StartReplicationFailed and also reported as {:error, state}.
defp start_replication_connection(state) do
415+
%{tenant: tenant} = state
416+
417+
with {:ok, replication_connection_pid} <- ReplicationConnection.start(tenant, self()) do
418+
# Monitor the new connection so its termination arrives as a :DOWN message.
replication_connection_reference = Process.monitor(replication_connection_pid)
419+
420+
state = %{
421+
state
422+
| replication_connection_pid: replication_connection_pid,
423+
replication_connection_reference: replication_connection_reference
424+
}
425+
426+
{:ok, state}
427+
else
428+
# Logged under its own code, separately from the generic failure below.
{:error, :max_wal_senders_reached} ->
429+
log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders")
430+
{:error, state}
431+
432+
{:error, error} ->
433+
log_error("StartReplicationFailed", error)
434+
{:error, state}
435+
end
436+
# A value that matches neither `else` clause raises WithClauseError, which is also
# handled here.
rescue
437+
error ->
438+
log_error("StartReplicationFailed", error)
439+
{:error, state}
440+
end
417441
end

lib/realtime/tenants/replication_connection.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ defmodule Realtime.Tenants.ReplicationConnection do
144144
port: connection_opts.port,
145145
socket_options: connection_opts.socket_options,
146146
ssl: connection_opts.ssl,
147-
backoff_type: :stop,
148147
sync_connect: true,
148+
auto_reconnect: false,
149149
parameters: [application_name: "realtime_replication_connection"]
150150
]
151151

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.48.1",
7+
version: "2.48.2",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ defmodule Realtime.Integration.RtChannelTest do
2525
alias Realtime.Tenants
2626
alias Realtime.Tenants.Authorization
2727
alias Realtime.Tenants.Connect
28+
alias Realtime.Tenants.ReplicationConnection
2829

2930
alias RealtimeWeb.RealtimeChannel.Tracker
3031
alias RealtimeWeb.SocketDisconnect
@@ -2354,6 +2355,135 @@ defmodule Realtime.Integration.RtChannelTest do
23542355
assert count == 2
23552356
end
23562357

2358+
describe "WAL bloat handling" do
2359+
# Prepares the tenant database for the WAL bloat test: checks the WAL limits, creates
# the wal_test table with a trigger that calls realtime.send on every row change, and
# returns the random topic the trigger broadcasts to.
setup %{tenant: tenant} do
2360+
topic = random_string()
2361+
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
2362+
2363+
# Read the current WAL settings from the server.
%{rows: [[max_wal_size]]} = Postgrex.query!(db_conn, "SHOW max_wal_size", [])
2364+
%{rows: [[wal_keep_size]]} = Postgrex.query!(db_conn, "SHOW wal_keep_size", [])
2365+
%{rows: [[max_slot_wal_keep_size]]} = Postgrex.query!(db_conn, "SHOW max_slot_wal_keep_size", [])
2366+
2367+
# The test relies on these 32MB limits; the same values are passed as `-c` flags to
# the Postgres container in test/support/containers.ex.
assert max_wal_size == "32MB"
2368+
assert wal_keep_size == "32MB"
2369+
assert max_slot_wal_keep_size == "32MB"
2370+
2371+
Postgrex.query!(db_conn, "CREATE TABLE IF NOT EXISTS wal_test (id INT, data TEXT)", [])
2372+
2373+
# Trigger function: sends a {"value": "test"} payload with event 'test' to the
# random topic through realtime.send; the topic is interpolated into the SQL text.
Postgrex.query!(
2374+
db_conn,
2375+
"""
2376+
CREATE OR REPLACE FUNCTION wal_test_trigger_func() RETURNS TRIGGER AS $$
2377+
BEGIN
2378+
PERFORM realtime.send(json_build_object ('value', 'test' :: text)::jsonb, 'test', '#{topic}', false);
2379+
RETURN NULL;
2380+
END;
2381+
$$ LANGUAGE plpgsql;
2382+
""",
2383+
[]
2384+
)
2385+
2386+
# Drop and recreate the trigger so repeated runs start from the same definition.
Postgrex.query!(db_conn, "DROP TRIGGER IF EXISTS wal_test_trigger ON wal_test", [])
2387+
2388+
Postgrex.query!(
2389+
db_conn,
2390+
"""
2391+
CREATE TRIGGER wal_test_trigger
2392+
AFTER INSERT OR UPDATE OR DELETE ON wal_test
2393+
FOR EACH ROW
2394+
EXECUTE FUNCTION wal_test_trigger_func()
2395+
""",
2396+
[]
2397+
)
2398+
2399+
GenServer.stop(db_conn)
2400+
2401+
# Cleanup opens a fresh connection because db_conn was stopped above.
# NOTE(review): this cleanup connection is never explicitly closed — confirm it is
# torn down together with the on_exit process.
on_exit(fn ->
2402+
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
2403+
2404+
Postgrex.query!(db_conn, "DROP TABLE IF EXISTS wal_test CASCADE", [])
2405+
end)
2406+
2407+
%{topic: topic}
2408+
end
2409+
2410+
# Generates heavy write load on wal_test, then checks that the Connect process keeps
# its pid while the replication connection pid and the replication slot's active
# backend pid both change, and that broadcasts still flow afterwards.
test "track PID changes during WAL bloat creation", %{tenant: tenant, topic: topic} do
2411+
{socket, _} = get_connection(tenant, "authenticated")
2412+
config = %{broadcast: %{self: true}, private: false}
2413+
full_topic = "realtime:#{topic}"
2414+
2415+
# Backend pid currently attached to the messages replication slot.
active_slot_query =
2416+
"SELECT active_pid FROM pg_replication_slots where active_pid is not null and slot_name = 'supabase_realtime_messages_replication_slot_'"
2417+
2418+
WebsocketClient.join(socket, full_topic, %{config: config})
2419+
2420+
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
2421+
assert_receive %Message{event: "presence_state"}, 500
2422+
2423+
assert Connect.ready?(tenant.external_id)
2424+
2425+
{:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
2426+
2427+
# Snapshot of the three pids compared after the load.
original_connect_pid = Connect.whereis(tenant.external_id)
2428+
original_replication_pid = ReplicationConnection.whereis(tenant.external_id)
2429+
%{rows: [[original_db_pid]]} = Postgrex.query!(db_conn, active_slot_query, [])
2430+
2431+
# Five concurrent tasks, each inserting 100,000 rows with a 2000-character text
# column over its own "realtime_bloat" connection.
tasks =
2432+
for _ <- 1..5 do
2433+
Task.async(fn ->
2434+
{:ok, bloat_conn} = Database.connect(tenant, "realtime_bloat", :stop)
2435+
2436+
# NOTE(review): returning {:error, "test"} from the transaction function does not
# roll back by itself (Postgrex.rollback/2 does) — confirm whether a commit or a
# rollback is intended here.
Postgrex.transaction(bloat_conn, fn conn ->
2437+
Postgrex.query(conn, "INSERT INTO wal_test SELECT generate_series(1, 100000), repeat('x', 2000)", [])
2438+
{:error, "test"}
2439+
end)
2440+
2441+
# NOTE(review): an exit signal with reason :normal is ignored by a process that is
# not trapping exits — confirm this actually stops bloat_conn.
Process.exit(bloat_conn, :normal)
2442+
end)
2443+
end
2444+
2445+
Task.await_many(tasks, 20000)
2446+
2447+
# Kill all pending transactions still running
Postgrex.query!(
2449+
db_conn,
2450+
"SELECT pg_terminate_backend(pid) from pg_stat_activity where application_name='realtime_bloat'",
2451+
[]
2452+
)
2453+
2454+
# Does it recover?
assert Connect.ready?(tenant.external_id)
2456+
{:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
2457+
# Fixed wait before querying the slot again.
# NOTE(review): presumably sized to the 1000 ms recovery backoff in Connect, which
# leaves little margin — confirm.
Process.sleep(1000)
2458+
%{rows: [[new_db_pid]]} = Postgrex.query!(db_conn, active_slot_query, [])
2459+
2460+
# The slot backend and replication connection must be new; Connect must be the same.
assert new_db_pid != original_db_pid
2461+
assert ^original_connect_pid = Connect.whereis(tenant.external_id)
2462+
assert original_replication_pid != ReplicationConnection.whereis(tenant.external_id)
2463+
2464+
# Check if socket is still connected
payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
2466+
WebsocketClient.send_event(socket, full_topic, "broadcast", payload)
2467+
assert_receive %Message{event: "broadcast", payload: ^payload, topic: ^full_topic}, 500
2468+
2469+
# Check if we are receiving the message from replication connection
# The insert fires wal_test_trigger, which calls realtime.send for the test topic.
Postgrex.query!(db_conn, "INSERT INTO wal_test VALUES (1, 'test')", [])
2471+
2472+
assert_receive %Phoenix.Socket.Message{
2473+
event: "broadcast",
2474+
payload: %{
2475+
"event" => "test",
2476+
"payload" => %{"value" => "test"},
2477+
"type" => "broadcast"
2478+
},
2479+
join_ref: nil,
2480+
ref: nil,
2481+
topic: ^full_topic
2482+
},
2483+
5000
2484+
end
2485+
end
2486+
23572487
defp mode(%{mode: :distributed}) do
23582488
tenant = Api.get_tenant_by_external_id("dev_tenant")
23592489

test/realtime/tenants/connect_test.exs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -348,11 +348,13 @@ defmodule Realtime.Tenants.ConnectTest do
348348
assert replication_connection_before == replication_connection_after
349349
end
350350

351-
test "on replication connection postgres pid being stopped, also kills the Connect module", %{tenant: tenant} do
351+
test "on replication connection postgres pid being stopped, Connect module recovers it", %{tenant: tenant} do
352352
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
353353
assert Connect.ready?(tenant.external_id)
354354

355355
replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
356+
Process.monitor(replication_connection_pid)
357+
356358
assert Process.alive?(replication_connection_pid)
357359
pid = Connect.whereis(tenant.external_id)
358360

@@ -362,21 +364,33 @@ defmodule Realtime.Tenants.ConnectTest do
362364
[]
363365
)
364366

365-
assert_process_down(replication_connection_pid)
366-
assert_process_down(pid)
367+
assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}
368+
369+
Process.sleep(1500)
370+
new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
371+
372+
assert replication_connection_pid != new_replication_connection_pid
373+
assert Process.alive?(new_replication_connection_pid)
374+
assert Process.alive?(pid)
367375
end
368376

369-
# Diff view: the first `test` header (after "369-") is the removed one, the second
# (after "369+") is its replacement. The updated test expects Connect to stay alive
# and to end up with a different, live replication connection after the old one is
# killed.
test "on replication connection exit, also kills the Connect module", %{tenant: tenant} do
369+
test "on replication connection exit, Connect module recovers it", %{tenant: tenant} do
370378
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
371379
assert Connect.ready?(tenant.external_id)
372380

373381
replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
382+
# Monitor from the test process so the :DOWN message can be asserted below.
Process.monitor(replication_connection_pid)
374383
assert Process.alive?(replication_connection_pid)
375384
pid = Connect.whereis(tenant.external_id)
376385
Process.exit(replication_connection_pid, :kill)
386+
assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}
377387

378-
# Removed assertions: both the replication connection and Connect used to go down.
assert_process_down(replication_connection_pid)
379-
assert_process_down(pid)
388+
# Waits longer than the 1000 ms @replication_recovery_backoff used by Connect before
# looking up the new replication connection.
Process.sleep(1500)
389+
new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
390+
391+
assert replication_connection_pid != new_replication_connection_pid
392+
assert Process.alive?(new_replication_connection_pid)
393+
assert Process.alive?(pid)
380394
end
381395

382396
test "handles max_wal_senders by logging the correct operational code", %{tenant: tenant} do

test/realtime/tenants/replication_connection_test.exs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,26 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
331331

332332
assert {:error, :max_wal_senders_reached} = ReplicationConnection.start(tenant, self())
333333
end
334+
335+
# Writes five messages, each with a 10 MiB payload, and asserts the replication
# connection process is still alive afterwards.
test "handles WAL pressure gracefully", %{tenant: tenant} do
336+
{:ok, replication_pid} = ReplicationConnection.start(tenant, self())
337+
338+
{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
339+
# NOTE(review): an exit signal with reason :normal is ignored by a process that is
# not trapping exits — confirm this actually closes conn.
on_exit(fn -> Process.exit(conn, :normal) end)
340+
341+
# 10 * 1024 * 1024 repetitions of "x".
large_payload = String.duplicate("x", 10 * 1024 * 1024)
342+
343+
for i <- 1..5 do
344+
message_fixture_with_conn(tenant, conn, %{
345+
"topic" => "stress_#{i}",
346+
"private" => true,
347+
"event" => "INSERT",
348+
"payload" => %{"data" => large_payload}
349+
})
350+
end
351+
352+
# NOTE(review): liveness is checked immediately after the inserts, with no wait for
# the replication connection to consume them — confirm this is sufficient.
assert Process.alive?(replication_pid)
353+
end
334354
end
335355

336356
describe "whereis/1" do
@@ -409,4 +429,20 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
409429
ref = Process.monitor(pid)
410430
assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout
411431
end
432+
433+
# Creates a message through TenantConnection.create_message/2 on the given connection
# and returns it. Defaults are a random topic and the "broadcast" extension; entries
# in `override` take precedence. The first argument is unused. Raises MatchError when
# create_message does not return {:ok, message}.
defp message_fixture_with_conn(_tenant, conn, override) do
434+
create_attrs = %{
435+
"topic" => random_string(),
436+
"extension" => "broadcast"
437+
}
438+
439+
# Normalise override keys to strings so they replace the string-keyed defaults.
override = override |> Enum.map(fn {k, v} -> {"#{k}", v} end) |> Map.new()
440+
441+
{:ok, message} =
442+
create_attrs
443+
|> Map.merge(override)
444+
|> TenantConnection.create_message(conn)
445+
446+
message
447+
end
412448
end

test/support/containers.ex

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,13 @@ defmodule Containers do
267267
@image,
268268
"postgres",
269269
"-c",
270-
"config_file=/etc/postgresql/postgresql.conf"
270+
"config_file=/etc/postgresql/postgresql.conf",
271+
"-c",
272+
"wal_keep_size=32MB",
273+
"-c",
274+
"max_wal_size=32MB",
275+
"-c",
276+
"max_slot_wal_keep_size=32MB"
271277
])
272278
end
273279
end

0 commit comments

Comments
 (0)