Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ This is the list of operational codes that can help you understand your deployme
| RealtimeNodeDisconnected | Realtime is a distributed application and this means that one the system is unable to communicate with one of the distributed nodes |
| MigrationsFailedToRun | Error when running the migrations against the Tenant database that are required by Realtime |
| StartReplicationFailed | Error when starting the replication and listening of errors for database broadcasting |
| ReplicationConnectionTimeout | Replication connection timed out during initialization |
| ReplicationMaxWalSendersReached | Maximum number of WAL senders reached in tenant database, check how to increase this value in this [link](https://supabase.com/docs/guides/database/custom-postgres-config#cli-configurable-settings) |
| MigrationCheckFailed | Check to see if we require to run migrations fails |
| PartitionCreationFailed | Error when creating partitions for realtime.messages |
Expand Down
51 changes: 40 additions & 11 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,29 @@ defmodule Realtime.Tenants.Connect do
alias Realtime.Tenants.Rebalancer
alias Realtime.Tenants.ReplicationConnection
alias Realtime.UsersCounter
alias DBConnection.Backoff

@rpc_timeout_default 30_000
@check_connected_user_interval_default 50_000
@connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0]
@type t :: %__MODULE__{
tenant_id: binary(),
db_conn_reference: reference(),
db_conn_pid: pid(),
replication_connection_pid: pid(),
replication_connection_reference: reference(),
backoff: Backoff.t(),
check_connected_user_interval: non_neg_integer(),
connected_users_bucket: list(non_neg_integer()),
check_connect_region_interval: non_neg_integer()
}

defstruct tenant_id: nil,
db_conn_reference: nil,
db_conn_pid: nil,
replication_connection_pid: nil,
replication_connection_reference: nil,
backoff: nil,
check_connected_user_interval: nil,
connected_users_bucket: [1],
check_connect_region_interval: nil
Expand Down Expand Up @@ -215,7 +228,8 @@ defmodule Realtime.Tenants.Connect do
state = %__MODULE__{
tenant_id: tenant_id,
check_connected_user_interval: check_connected_user_interval,
check_connect_region_interval: check_connect_region_interval
check_connect_region_interval: check_connect_region_interval,
backoff: Backoff.new(min: :timer.seconds(1), max: :timer.seconds(15), type: :rand_exp)
}

opts = Keyword.put(opts, :name, {:via, :syn, name})
Expand Down Expand Up @@ -350,29 +364,40 @@ defmodule Realtime.Tenants.Connect do
{:stop, :shutdown, state}
end

@replication_recovery_backoff 1000

# Handle replication connection termination
def handle_info(
{:DOWN, replication_connection_reference, _, _, _},
%{replication_connection_reference: replication_connection_reference} = state
) do
%{backoff: backoff} = state
log_warning("ReplicationConnectionDown", "Replication connection has been terminated")
Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff)
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil}
{timeout, backoff} = Backoff.backoff(backoff)
Process.send_after(self(), :recover_replication_connection, timeout)
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil, backoff: backoff}
{:noreply, state}
end

@replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'"
def handle_info(:recover_replication_connection, state) do
with %{num_rows: 0} <- Postgrex.query!(state.db_conn_pid, @replication_connection_query, []),
%{backoff: backoff, db_conn_pid: db_conn_pid} = state

with %{num_rows: 0} <- Postgrex.query!(db_conn_pid, @replication_connection_query, []),
{:ok, state} <- start_replication_connection(state) do
{:noreply, state}
{:noreply, %{state | backoff: Backoff.reset(backoff)}}
else
_ ->
log_error("ReplicationConnectionRecoveryFailed", "Replication connection recovery failed")
Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff)
{:noreply, state}
%{num_rows: _} ->
{:noreply, %{state | backoff: Backoff.reset(backoff)}}

{:error, error} ->
{timeout, backoff} = Backoff.backoff(backoff)

log_error(
"ReplicationConnectionRecoveryFailed",
"Replication connection recovery failed, next retry in #{timeout}ms : #{inspect(error)}"
)

Process.send_after(self(), :recover_replication_connection, timeout)
{:noreply, %{state | backoff: backoff}}
end
end

Expand Down Expand Up @@ -451,6 +476,10 @@ defmodule Realtime.Tenants.Connect do
log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders")
{:error, state}

{:error, :replication_connection_timeout} ->
log_error("ReplicationConnectionTimeout", "Replication connection timed out during initialization")
{:error, state}

{:error, error} ->
log_error("StartReplicationFailed", error)
{:error, state}
Expand Down
3 changes: 3 additions & 0 deletions lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ defmodule Realtime.Tenants.ReplicationConnection do
{:error, %Postgrex.Error{postgres: %{pg_code: pg_code}}} when pg_code in ~w(53300 53400) ->
{:error, :max_wal_senders_reached}

{:error, :timeout} ->
{:error, :replication_connection_timeout}

error ->
error
end
Expand Down
35 changes: 31 additions & 4 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,14 @@

assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}

Process.sleep(1500)
new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end)

assert replication_connection_pid != new_replication_connection_pid
assert Process.alive?(new_replication_connection_pid)
assert Process.alive?(pid)
end

test "on replication connection exit, Connect module recovers it", %{tenant: tenant} do

Check failure on line 494 in test/realtime/tenants/connect_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test lookup_or_start_connection/1 on replication connection exit, Connect module recovers it (Realtime.Tenants.ConnectTest)
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert Connect.ready?(tenant.external_id)

Expand All @@ -503,14 +502,28 @@
Process.exit(replication_connection_pid, :kill)
assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}

Process.sleep(1500)
new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end)

assert replication_connection_pid != new_replication_connection_pid
assert Process.alive?(new_replication_connection_pid)
assert Process.alive?(pid)
end

test "handles replication connection timeout by logging and shutting down", %{tenant: tenant} do
expect(ReplicationConnection, :start, fn _tenant, _pid ->
{:error, :replication_connection_timeout}
end)

log =
capture_log(fn ->
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert_process_down(db_conn)
end)

assert log =~ "ReplicationConnectionTimeout"
assert log =~ "Replication connection timed out during initialization"
end

test "handles max_wal_senders by logging the correct operational code", %{tenant: tenant} do
opts = tenant |> Database.from_tenant("realtime_test", :stop) |> Database.opts()

Expand Down Expand Up @@ -672,4 +685,18 @@

Realtime.Api.update_tenant_by_external_id(tenant.external_id, %{extensions: extensions})
end
defp assert_pid(call, attempts \\ 10)

defp assert_pid(_call, 0) do
raise "Timeout waiting for pid"
end

defp assert_pid(call, attempts) do
case call.() do
pid when is_pid(pid) -> pid
_ ->
Process.sleep(500)
assert_pid(call, attempts - 1)
end
end
end
2 changes: 1 addition & 1 deletion test/realtime/tenants/replication_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
call_original(ReplicationConnection, :init, [arg])
end)

{:error, :timeout} = ReplicationConnection.start(tenant, self(), 100)
assert {:error, :replication_connection_timeout} = ReplicationConnection.start(tenant, self(), 100)
end

test "handle standby connections exceeds max_wal_senders", %{tenant: tenant} do
Expand Down
Loading