Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ This is the list of operational codes that can help you understand your deployme
| RealtimeNodeDisconnected | Realtime is a distributed application and this means that one the system is unable to communicate with one of the distributed nodes |
| MigrationsFailedToRun | Error when running the migrations against the Tenant database that are required by Realtime |
| StartReplicationFailed | Error when starting the replication and listening of errors for database broadcasting |
| ReplicationConnectionTimeout | Replication connection timed out during initialization |
| ReplicationMaxWalSendersReached | Maximum number of WAL senders reached in tenant database, check how to increase this value in this [link](https://supabase.com/docs/guides/database/custom-postgres-config#cli-configurable-settings) |
| MigrationCheckFailed | Check to see if we require to run migrations fails |
| PartitionCreationFailed | Error when creating partitions for realtime.messages |
Expand Down
60 changes: 49 additions & 11 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,31 @@ defmodule Realtime.Tenants.Connect do
alias Realtime.Tenants.Rebalancer
alias Realtime.Tenants.ReplicationConnection
alias Realtime.UsersCounter
alias DBConnection.Backoff

@rpc_timeout_default 30_000
@check_connected_user_interval_default 50_000
@connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0]
@type t :: %__MODULE__{
tenant_id: binary(),
db_conn_reference: reference(),
db_conn_pid: pid(),
replication_connection_pid: pid(),
replication_connection_reference: reference(),
backoff: Backoff.t(),
replication_connection_attempts: non_neg_integer(),
check_connected_user_interval: non_neg_integer(),
connected_users_bucket: list(non_neg_integer()),
check_connect_region_interval: non_neg_integer()
}

defstruct tenant_id: nil,
db_conn_reference: nil,
db_conn_pid: nil,
replication_connection_pid: nil,
replication_connection_reference: nil,
backoff: nil,
replication_connection_attempts: 0,
check_connected_user_interval: nil,
connected_users_bucket: [1],
check_connect_region_interval: nil
Expand Down Expand Up @@ -215,7 +230,8 @@ defmodule Realtime.Tenants.Connect do
state = %__MODULE__{
tenant_id: tenant_id,
check_connected_user_interval: check_connected_user_interval,
check_connect_region_interval: check_connect_region_interval
check_connect_region_interval: check_connect_region_interval,
backoff: Backoff.new(min: :timer.seconds(1), max: :timer.seconds(15), type: :rand_exp)
}

opts = Keyword.put(opts, :name, {:via, :syn, name})
Expand Down Expand Up @@ -350,29 +366,47 @@ defmodule Realtime.Tenants.Connect do
{:stop, :shutdown, state}
end

@replication_recovery_backoff 1000

# Handle replication connection termination
def handle_info(
{:DOWN, replication_connection_reference, _, _, _},
%{replication_connection_reference: replication_connection_reference} = state
) do
%{backoff: backoff} = state
log_warning("ReplicationConnectionDown", "Replication connection has been terminated")
Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff)
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil}
{timeout, backoff} = Backoff.backoff(backoff)
Process.send_after(self(), :recover_replication_connection, timeout)
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil, backoff: backoff}
{:noreply, state}
end

@replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'"
@max_replication_connection_attempts 60
def handle_info(
:recover_replication_connection,
%{replication_connection_attempts: @max_replication_connection_attempts} = state
) do
Logger.warning("Max replication connection attempts reached, terminating connection")
{:stop, :shutdown, state}
end

def handle_info(:recover_replication_connection, state) do
with %{num_rows: 0} <- Postgrex.query!(state.db_conn_pid, @replication_connection_query, []),
%{backoff: backoff, db_conn_pid: db_conn_pid, replication_connection_attempts: replication_connection_attempts} =
state

with %{num_rows: 0} <- Postgrex.query!(db_conn_pid, @replication_connection_query, []),
{:ok, state} <- start_replication_connection(state) do
{:noreply, state}
{:noreply, %{state | backoff: Backoff.reset(backoff), replication_connection_attempts: 0}}
else
_ ->
log_error("ReplicationConnectionRecoveryFailed", "Replication connection recovery failed")
Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff)
{:noreply, state}
{:error, error} ->
{timeout, backoff} = Backoff.backoff(backoff)

log_error(
"ReplicationConnectionRecoveryFailed",
"Replication connection recovery failed, next retry in #{timeout}ms : #{inspect(error)}"
)

Process.send_after(self(), :recover_replication_connection, timeout)
{:noreply, %{state | backoff: backoff, replication_connection_attempts: replication_connection_attempts + 1}}
end
end

Expand Down Expand Up @@ -451,6 +485,10 @@ defmodule Realtime.Tenants.Connect do
log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders")
{:error, state}

{:error, :replication_connection_timeout} ->
log_error("ReplicationConnectionTimeout", "Replication connection timed out during initialization")
{:error, state}

{:error, error} ->
log_error("StartReplicationFailed", error)
{:error, state}
Expand Down
3 changes: 3 additions & 0 deletions lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ defmodule Realtime.Tenants.ReplicationConnection do
{:error, %Postgrex.Error{postgres: %{pg_code: pg_code}}} when pg_code in ~w(53300 53400) ->
{:error, :max_wal_senders_reached}

{:error, :timeout} ->
{:error, :replication_connection_timeout}

error ->
error
end
Expand Down
38 changes: 34 additions & 4 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,7 @@ defmodule Realtime.Tenants.ConnectTest do

assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}

Process.sleep(1500)
new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end)

assert replication_connection_pid != new_replication_connection_pid
assert Process.alive?(new_replication_connection_pid)
Expand All @@ -503,14 +502,28 @@ defmodule Realtime.Tenants.ConnectTest do
Process.exit(replication_connection_pid, :kill)
assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}

Process.sleep(1500)
new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end)

assert replication_connection_pid != new_replication_connection_pid
assert Process.alive?(new_replication_connection_pid)
assert Process.alive?(pid)
end

test "handles replication connection timeout by logging and shutting down", %{tenant: tenant} do
expect(ReplicationConnection, :start, fn _tenant, _pid ->
{:error, :replication_connection_timeout}
end)

log =
capture_log(fn ->
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert_process_down(db_conn)
end)

assert log =~ "ReplicationConnectionTimeout"
assert log =~ "Replication connection timed out during initialization"
end

test "handles max_wal_senders by logging the correct operational code", %{tenant: tenant} do
opts = tenant |> Database.from_tenant("realtime_test", :stop) |> Database.opts()

Expand Down Expand Up @@ -672,4 +685,21 @@ defmodule Realtime.Tenants.ConnectTest do

Realtime.Api.update_tenant_by_external_id(tenant.external_id, %{extensions: extensions})
end

defp assert_pid(call, attempts \\ 10)

defp assert_pid(_call, 0) do
raise "Timeout waiting for pid"
end

defp assert_pid(call, attempts) do
case call.() do
pid when is_pid(pid) ->
pid

_ ->
Process.sleep(500)
assert_pid(call, attempts - 1)
end
end
end
2 changes: 1 addition & 1 deletion test/realtime/tenants/replication_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
call_original(ReplicationConnection, :init, [arg])
end)

{:error, :timeout} = ReplicationConnection.start(tenant, self(), 100)
assert {:error, :replication_connection_timeout} = ReplicationConnection.start(tenant, self(), 100)
end

test "handle standby connections exceeds max_wal_senders", %{tenant: tenant} do
Expand Down
Loading