Skip to content

Commit a428ae0

Browse files
committed
move to use DBConnection.Backoff
1 parent c1cc03f commit a428ae0

File tree

3 files changed

+46
-26
lines changed

3 files changed

+46
-26
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ This is the list of operational codes that can help you understand your deployme
259259
| RealtimeNodeDisconnected | Realtime is a distributed application; this error means that the system is unable to communicate with one of the distributed nodes |
260260
| MigrationsFailedToRun | Error when running the migrations against the Tenant database that are required by Realtime |
261261
| StartReplicationFailed | Error when starting the replication and listening of errors for database broadcasting |
262+
| ReplicationConnectionTimeout | Replication connection timed out during initialization |
262263
| ReplicationMaxWalSendersReached | Maximum number of WAL senders reached in tenant database, check how to increase this value in this [link](https://supabase.com/docs/guides/database/custom-postgres-config#cli-configurable-settings) |
263264
| MigrationCheckFailed | The check to determine whether migrations need to be run has failed |
264265
| PartitionCreationFailed | Error when creating partitions for realtime.messages |

lib/realtime/tenants/connect.ex

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,29 @@ defmodule Realtime.Tenants.Connect do
2424
alias Realtime.Tenants.Rebalancer
2525
alias Realtime.Tenants.ReplicationConnection
2626
alias Realtime.UsersCounter
27+
alias DBConnection.Backoff
2728

2829
@rpc_timeout_default 30_000
2930
@check_connected_user_interval_default 50_000
3031
@connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0]
32+
# Type of this module's struct (the fields mirror the `defstruct` declared below).
# NOTE(review): `defstruct` defaults every field except `connected_users_bucket` to nil,
# and the replication `:DOWN` handler resets `replication_connection_pid` /
# `replication_connection_reference` to nil, so the non-nil types declared here look
# too strict — confirm and consider `| nil` on those fields.
# NOTE(review): confirm that `DBConnection.Backoff` actually exports a `t()` type.
@type t :: %__MODULE__{
33+
tenant_id: binary(),
34+
db_conn_reference: reference(),
35+
db_conn_pid: pid(),
36+
replication_connection_pid: pid(),
37+
replication_connection_reference: reference(),
38+
backoff: Backoff.t(),
39+
check_connected_user_interval: non_neg_integer(),
40+
connected_users_bucket: list(non_neg_integer()),
41+
check_connect_region_interval: non_neg_integer()
42+
}
3143

3244
defstruct tenant_id: nil,
3345
db_conn_reference: nil,
3446
db_conn_pid: nil,
3547
replication_connection_pid: nil,
3648
replication_connection_reference: nil,
37-
replication_recovery_attempts: 0,
49+
backoff: nil,
3850
check_connected_user_interval: nil,
3951
connected_users_bucket: [1],
4052
check_connect_region_interval: nil
@@ -216,7 +228,8 @@ defmodule Realtime.Tenants.Connect do
216228
state = %__MODULE__{
217229
tenant_id: tenant_id,
218230
check_connected_user_interval: check_connected_user_interval,
219-
check_connect_region_interval: check_connect_region_interval
231+
check_connect_region_interval: check_connect_region_interval,
232+
# DBConnection.Backoff.new/1 reads the :backoff_type / :backoff_min / :backoff_max keys;
# bare :type / :min / :max are ignored, which silently falls back to the library defaults.
backoff: Backoff.new(backoff_min: :timer.seconds(1), backoff_max: :timer.seconds(15), backoff_type: :rand_exp)
220233
}
221234

222235
opts = Keyword.put(opts, :name, {:via, :syn, name})
@@ -351,40 +364,40 @@ defmodule Realtime.Tenants.Connect do
351364
{:stop, :shutdown, state}
352365
end
353366

354-
@replication_recovery_backoff_min :timer.seconds(1)
355-
356367
# Handle replication connection termination
357368
def handle_info(
358369
{:DOWN, replication_connection_reference, _, _, _},
359370
%{replication_connection_reference: replication_connection_reference} = state
360371
) do
372+
%{backoff: backoff} = state
361373
log_warning("ReplicationConnectionDown", "Replication connection has been terminated")
362-
backoff = calculate_recovery_backoff(state.replication_recovery_attempts)
363-
Process.send_after(self(), :recover_replication_connection, backoff)
364-
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil}
374+
{timeout, backoff} = Backoff.backoff(backoff)
375+
Process.send_after(self(), :recover_replication_connection, timeout)
376+
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil, backoff: backoff}
365377
{:noreply, state}
366378
end
367379

368380
@replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'"
369381
def handle_info(:recover_replication_connection, state) do
370-
with %{num_rows: 0} <- Postgrex.query!(state.db_conn_pid, @replication_connection_query, []),
382+
%{backoff: backoff, db_conn_pid: db_conn_pid} = state
383+
384+
with %{num_rows: 0} <- Postgrex.query!(db_conn_pid, @replication_connection_query, []),
371385
{:ok, state} <- start_replication_connection(state) do
372-
{:noreply, %{state | replication_recovery_attempts: 0}}
386+
{:noreply, %{state | backoff: Backoff.reset(backoff)}}
373387
else
374-
%{num_rows: 0} ->
375-
{:noreply, %{state | replication_recovery_attempts: 0}}
388+
%{num_rows: _} ->
389+
{:noreply, %{state | backoff: Backoff.reset(backoff)}}
376390

377391
{:error, error} ->
378-
attempts = state.replication_recovery_attempts + 1
379-
backoff = calculate_recovery_backoff(attempts)
392+
{timeout, backoff} = Backoff.backoff(backoff)
380393

381394
log_error(
382395
"ReplicationConnectionRecoveryFailed",
383-
"Replication connection recovery failed, attempt #{attempts}, next retry in #{backoff}ms : #{inspect(error)}"
396+
"Replication connection recovery failed, next retry in #{timeout}ms : #{inspect(error)}"
384397
)
385398

386-
Process.send_after(self(), :recover_replication_connection, backoff)
387-
{:noreply, %{state | replication_recovery_attempts: attempts}}
399+
Process.send_after(self(), :recover_replication_connection, timeout)
400+
{:noreply, %{state | backoff: backoff}}
388401
end
389402
end
390403

@@ -476,10 +489,4 @@ defmodule Realtime.Tenants.Connect do
476489
log_error("StartReplicationFailed", error)
477490
{:error, state}
478491
end
479-
480-
defp calculate_recovery_backoff(attempts) do
481-
max_backoff = @replication_recovery_backoff_min * Integer.pow(2, attempts)
482-
half = div(max_backoff, 2)
483-
half + Enum.random(0..half)
484-
end
485492
end

test/realtime/tenants/connect_test.exs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,7 @@ defmodule Realtime.Tenants.ConnectTest do
484484

485485
assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}
486486

487-
Process.sleep(1500)
488-
new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
487+
new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end)
489488

490489
assert replication_connection_pid != new_replication_connection_pid
491490
assert Process.alive?(new_replication_connection_pid)
@@ -503,8 +502,7 @@ defmodule Realtime.Tenants.ConnectTest do
503502
Process.exit(replication_connection_pid, :kill)
504503
assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}
505504

506-
Process.sleep(1500)
507-
new_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
505+
new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end)
508506

509507
assert replication_connection_pid != new_replication_connection_pid
510508
assert Process.alive?(new_replication_connection_pid)
@@ -687,4 +685,18 @@ defmodule Realtime.Tenants.ConnectTest do
687685

688686
Realtime.Api.update_tenant_by_external_id(tenant.external_id, %{extensions: extensions})
689687
end
688+
# Test helper: invokes the zero-arity function `call` repeatedly until it returns a pid,
# then returns that pid. `attempts` (default 10) bounds how many times `call` is invoked;
# with the 500 ms pause between tries the default waits roughly 5 seconds before failing.
defp assert_pid(call, attempts \\ 10)
689+
690+
# No attempts left: raise so the calling test fails.
defp assert_pid(_call, 0) do
691+
raise "Timeout waiting for pid"
692+
end
693+
694+
defp assert_pid(call, attempts) do
695+
case call.() do
696+
pid when is_pid(pid) -> pid
697+
# Anything that is not a pid counts as "not ready yet".
_ ->
698+
# Pause before retrying; this sleep also happens after the final failed try,
# just before the zero-attempts clause raises.
Process.sleep(500)
699+
assert_pid(call, attempts - 1)
700+
end
701+
end
690702
end

0 commit comments

Comments
 (0)