diff --git a/README.md b/README.md index fb9b6c8a4..fb8602929 100644 --- a/README.md +++ b/README.md @@ -259,6 +259,7 @@ This is the list of operational codes that can help you understand your deployme | RealtimeNodeDisconnected | Realtime is a distributed application and this means that one the system is unable to communicate with one of the distributed nodes | | MigrationsFailedToRun | Error when running the migrations against the Tenant database that are required by Realtime | | StartReplicationFailed | Error when starting the replication and listening of errors for database broadcasting | +| ReplicationConnectionTimeout | Replication connection timed out during initialization | | ReplicationMaxWalSendersReached | Maximum number of WAL senders reached in tenant database, check how to increase this value in this [link](https://supabase.com/docs/guides/database/custom-postgres-config#cli-configurable-settings) | | MigrationCheckFailed | Check to see if we require to run migrations fails | | PartitionCreationFailed | Error when creating partitions for realtime.messages | diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index ff0a00643..84bb9784a 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -24,16 +24,31 @@ defmodule Realtime.Tenants.Connect do alias Realtime.Tenants.Rebalancer alias Realtime.Tenants.ReplicationConnection alias Realtime.UsersCounter + alias DBConnection.Backoff @rpc_timeout_default 30_000 @check_connected_user_interval_default 50_000 @connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0] + @type t :: %__MODULE__{ + tenant_id: binary(), + db_conn_reference: reference(), + db_conn_pid: pid(), + replication_connection_pid: pid(), + replication_connection_reference: reference(), + backoff: Backoff.t(), + replication_connection_attempts: non_neg_integer(), + check_connected_user_interval: non_neg_integer(), + connected_users_bucket: list(non_neg_integer()), + 
check_connect_region_interval: non_neg_integer() + } defstruct tenant_id: nil, db_conn_reference: nil, db_conn_pid: nil, replication_connection_pid: nil, replication_connection_reference: nil, + backoff: nil, + replication_connection_attempts: 0, check_connected_user_interval: nil, connected_users_bucket: [1], check_connect_region_interval: nil @@ -215,7 +230,8 @@ defmodule Realtime.Tenants.Connect do state = %__MODULE__{ tenant_id: tenant_id, check_connected_user_interval: check_connected_user_interval, - check_connect_region_interval: check_connect_region_interval + check_connect_region_interval: check_connect_region_interval, + backoff: Backoff.new(min: :timer.seconds(1), max: :timer.seconds(15), type: :rand_exp) } opts = Keyword.put(opts, :name, {:via, :syn, name}) @@ -350,29 +366,47 @@ defmodule Realtime.Tenants.Connect do {:stop, :shutdown, state} end - @replication_recovery_backoff 1000 - # Handle replication connection termination def handle_info( {:DOWN, replication_connection_reference, _, _, _}, %{replication_connection_reference: replication_connection_reference} = state ) do + %{backoff: backoff} = state log_warning("ReplicationConnectionDown", "Replication connection has been terminated") - Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff) - state = %{state | replication_connection_pid: nil, replication_connection_reference: nil} + {timeout, backoff} = Backoff.backoff(backoff) + Process.send_after(self(), :recover_replication_connection, timeout) + state = %{state | replication_connection_pid: nil, replication_connection_reference: nil, backoff: backoff} {:noreply, state} end @replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'" + @max_replication_connection_attempts 60 + def handle_info( + :recover_replication_connection, + %{replication_connection_attempts: @max_replication_connection_attempts} = state + ) do + Logger.warning("Max replication 
connection attempts reached, terminating connection") + {:stop, :shutdown, state} + end + def handle_info(:recover_replication_connection, state) do - with %{num_rows: 0} <- Postgrex.query!(state.db_conn_pid, @replication_connection_query, []), + %{backoff: backoff, db_conn_pid: db_conn_pid, replication_connection_attempts: replication_connection_attempts} = + state + + with %{num_rows: 0} <- Postgrex.query!(db_conn_pid, @replication_connection_query, []), {:ok, state} <- start_replication_connection(state) do - {:noreply, state} + {:noreply, %{state | backoff: Backoff.reset(backoff), replication_connection_attempts: 0}} else - _ -> - log_error("ReplicationConnectionRecoveryFailed", "Replication connection recovery failed") - Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff) - {:noreply, state} + error -> + {timeout, backoff} = Backoff.backoff(backoff) + + log_error( + "ReplicationConnectionRecoveryFailed", + "Replication connection recovery failed, next retry in #{timeout}ms : #{inspect(error)}" + ) + + Process.send_after(self(), :recover_replication_connection, timeout) + {:noreply, %{state | backoff: backoff, replication_connection_attempts: replication_connection_attempts + 1}} end end @@ -451,6 +485,10 @@ defmodule Realtime.Tenants.Connect do log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders") {:error, state} + {:error, :replication_connection_timeout} -> + log_error("ReplicationConnectionTimeout", "Replication connection timed out during initialization") + {:error, state} + {:error, error} -> log_error("StartReplicationFailed", error) {:error, state} diff --git a/lib/realtime/tenants/replication_connection.ex b/lib/realtime/tenants/replication_connection.ex index 26650ad5e..ed19f70a0 100644 --- a/lib/realtime/tenants/replication_connection.ex +++ b/lib/realtime/tenants/replication_connection.ex @@ -115,6 +115,9 @@ defmodule 
Realtime.Tenants.ReplicationConnection do {:error, %Postgrex.Error{postgres: %{pg_code: pg_code}}} when pg_code in ~w(53300 53400) -> {:error, :max_wal_senders_reached} + {:error, :timeout} -> + {:error, :replication_connection_timeout} + error -> error end diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index 43f641dca..0d7811014 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -484,8 +484,7 @@ defmodule Realtime.Tenants.ConnectTest do assert_receive {:DOWN, _, :process, ^replication_connection_pid, _} - Process.sleep(1500) - new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id) + new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end) assert replication_connection_pid != new_replication_connection_pid assert Process.alive?(new_replication_connection_pid) @@ -503,14 +502,28 @@ defmodule Realtime.Tenants.ConnectTest do Process.exit(replication_connection_pid, :kill) assert_receive {:DOWN, _, :process, ^replication_connection_pid, _} - Process.sleep(1500) - new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id) + new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end) assert replication_connection_pid != new_replication_connection_pid assert Process.alive?(new_replication_connection_pid) assert Process.alive?(pid) end + test "handles replication connection timeout by logging and shutting down", %{tenant: tenant} do + expect(ReplicationConnection, :start, fn _tenant, _pid -> + {:error, :replication_connection_timeout} + end) + + log = + capture_log(fn -> + assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) + assert_process_down(db_conn) + end) + + assert log =~ "ReplicationConnectionTimeout" + assert log =~ "Replication connection timed out during initialization" + end + test "handles 
max_wal_senders by logging the correct operational code", %{tenant: tenant} do opts = tenant |> Database.from_tenant("realtime_test", :stop) |> Database.opts() @@ -672,4 +685,21 @@ defmodule Realtime.Tenants.ConnectTest do Realtime.Api.update_tenant_by_external_id(tenant.external_id, %{extensions: extensions}) end + + defp assert_pid(call, attempts \\ 10) + + defp assert_pid(_call, 0) do + raise "Timeout waiting for pid" + end + + defp assert_pid(call, attempts) do + case call.() do + pid when is_pid(pid) -> + pid + + _ -> + Process.sleep(500) + assert_pid(call, attempts - 1) + end + end end diff --git a/test/realtime/tenants/replication_connection_test.exs b/test/realtime/tenants/replication_connection_test.exs index 031f3cae6..5338c430e 100644 --- a/test/realtime/tenants/replication_connection_test.exs +++ b/test/realtime/tenants/replication_connection_test.exs @@ -516,7 +516,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do call_original(ReplicationConnection, :init, [arg]) end) - {:error, :timeout} = ReplicationConnection.start(tenant, self(), 100) + assert {:error, :replication_connection_timeout} = ReplicationConnection.start(tenant, self(), 100) end test "handle standby connections exceeds max_wal_senders", %{tenant: tenant} do