Skip to content

Commit 9cf77e5

Browse files
authored
fix: Properly handle reconnections in connection manager (#3289)
Fixes #3288 We see tons of timer errors for clauses not existing in deploys on cloud, see issue above. I've noticed that for connection pools, we only allow starting them if the pool pids are set to none, which is only ever true on the first try, so the reconnection logic never really worked for them. I've made it so that we initialise `pool_pids` with admin and snapshot and start the appropriate ones. I've also made it so that we only attempt reconnections if we fail during one of the setup steps of the process that actually failed, cause the logic seemed to basically retry the current step on _any_ connection error, so the pools might fail and it might retry the replication client. The assumption there was that things would only ever exit and reconnect during their connection step, but the reality is that they can exit with a connection error _at any point_ during the setup. I continue to believe that our connection manager is a liability right now because it's far from fully mapped out. I haven't added tests for this issue which is really annoying, but opening this PR regardless to move work on this issue. Feel free to contribute to it. edit: I've manually reproduced the error by sporadically killing the pools after they are ready and observing startup - this fix indeed works and if the pools crash after having become ready all of the connection manager is brought down but in a graceful way.
1 parent b4b5c3f commit 9cf77e5

File tree

2 files changed

+67
-31
lines changed

2 files changed

+67
-31
lines changed

.changeset/smart-icons-jam.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Properly handle reconnections during setup of connection manager

packages/sync-service/lib/electric/connection/manager.ex

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ defmodule Electric.Connection.Manager do
9595
:lock_connection_pg_backend_pid,
9696
# Timer reference for the periodic lock status check
9797
:lock_connection_timer,
98-
# PIDs of the database connection pools
99-
:pool_pids,
10098
# Backoff term used for reconnection with exponential back-off
10199
:connection_backoff,
102100
# PostgreSQL server version
@@ -120,6 +118,8 @@ defmodule Electric.Connection.Manager do
120118
:expiry_batch_size,
121119
:persistent_kv,
122120
:purge_all_shapes?,
121+
# PIDs of the database connection pools
122+
pool_pids: %{admin: nil, snapshot: nil},
123123
validated_connection_opts: %{replication: nil, pool: nil},
124124
drop_slot_requested: false
125125
]
@@ -442,39 +442,38 @@ defmodule Electric.Connection.Manager do
442442
%State{
443443
current_phase: :connection_setup,
444444
current_step: {:start_connection_pool, _},
445-
pool_pids: nil
445+
pool_pids: pool_pids
446446
} = state
447-
) do
447+
)
448+
when is_nil(pool_pids.admin) or is_nil(pool_pids.snapshot) do
448449
Logger.debug("Starting connection pool for stack #{state.stack_id}")
449450

450451
case validate_connection(pooled_connection_opts(state), :pool, state) do
451452
{:ok, conn_opts, state} ->
452453
pool_sizes = pool_sizes(Keyword.get(state.pool_opts, :pool_size, 2))
453454

454-
{:ok, snapshot_pool_pid} =
455-
Electric.Connection.Manager.Pool.start_link(
456-
stack_id: state.stack_id,
457-
role: :snapshot,
458-
connection_manager: self(),
459-
pool_opts: Keyword.put(state.pool_opts, :pool_size, pool_sizes.snapshot),
460-
conn_opts: conn_opts
461-
)
462-
463-
{:ok, admin_pool_pid} =
464-
Electric.Connection.Manager.Pool.start_link(
465-
stack_id: state.stack_id,
466-
role: :admin,
467-
connection_manager: self(),
468-
pool_opts: Keyword.put(state.pool_opts, :pool_size, pool_sizes.admin),
469-
conn_opts: conn_opts
470-
)
471-
472-
state = %{
473-
state
474-
| pool_pids: %{admin: {admin_pool_pid, false}, snapshot: {snapshot_pool_pid, false}},
475-
current_step: {:start_connection_pool, :connecting}
476-
}
477-
455+
state =
456+
[:snapshot, :admin]
457+
|> Enum.filter(fn role -> is_nil(state.pool_pids[role]) end)
458+
|> Enum.reduce(state, fn pool_role, state ->
459+
pool_size = Map.fetch!(pool_sizes, pool_role)
460+
461+
{:ok, pool_pid} =
462+
Electric.Connection.Manager.Pool.start_link(
463+
stack_id: state.stack_id,
464+
role: pool_role,
465+
connection_manager: self(),
466+
pool_opts: Keyword.put(state.pool_opts, :pool_size, pool_size),
467+
conn_opts: conn_opts
468+
)
469+
470+
%{
471+
state
472+
| pool_pids: Map.put(state.pool_pids, pool_role, {pool_pid, false})
473+
}
474+
end)
475+
476+
state = %{state | current_step: {:start_connection_pool, :connecting}}
478477
{:noreply, state}
479478

480479
# the ConnectionResolver process was killed, as part of the application
@@ -1019,7 +1018,7 @@ defmodule Electric.Connection.Manager do
10191018

10201019
with false <- drop_slot_and_restart(error, state),
10211020
false <- stop_if_fatal_error(error, state) do
1022-
if state.current_phase == :connection_setup do
1021+
if should_retry_connection?(state, pid_type) do
10231022
state = schedule_reconnection_after_error(error, pid_type, state)
10241023
{:noreply, state}
10251024
else
@@ -1180,23 +1179,25 @@ defmodule Electric.Connection.Manager do
11801179
defp schedule_reconnection(
11811180
step,
11821181
%State{
1183-
connection_backoff: {conn_backoff, _}
1182+
connection_backoff: {conn_backoff, tref}
11841183
} = state
11851184
) do
11861185
{time, conn_backoff} = ConnectionBackoff.fail(conn_backoff)
1186+
if is_reference(tref), do: :erlang.cancel_timer(tref)
11871187
tref = :erlang.start_timer(time, self(), {:retry_connection, step})
11881188
Logger.warning("Reconnecting in #{inspect(time)}ms")
11891189
%{state | connection_backoff: {conn_backoff, tref}}
11901190
end
11911191

11921192
defp mark_connection_succeeded(%State{connection_backoff: {conn_backoff, tref}} = state) do
1193+
if is_reference(tref), do: :erlang.cancel_timer(tref)
11931194
{total_retry_time, conn_backoff} = ConnectionBackoff.succeed(conn_backoff)
11941195

11951196
if total_retry_time > 0 do
11961197
Logger.info("Reconnection succeeded after #{inspect(total_retry_time)}ms")
11971198
end
11981199

1199-
%{state | connection_backoff: {conn_backoff, tref}}
1200+
%{state | connection_backoff: {conn_backoff, nil}}
12001201
end
12011202

12021203
defp replication_connection_opts(state),
@@ -1274,6 +1275,36 @@ defmodule Electric.Connection.Manager do
12741275
)
12751276
end
12761277

1278+
defp should_retry_connection?(
1279+
%State{
1280+
current_phase: :connection_setup,
1281+
current_step: {:start_lock_connection, _}
1282+
},
1283+
:lock_connection
1284+
),
1285+
do: true
1286+
1287+
defp should_retry_connection?(
1288+
%State{
1289+
current_phase: :connection_setup,
1290+
current_step: {:start_replication_client, _}
1291+
},
1292+
:replication_client
1293+
),
1294+
do: true
1295+
1296+
defp should_retry_connection?(
1297+
%State{
1298+
current_phase: :connection_setup,
1299+
current_step: {:start_connection_pool, _}
1300+
},
1301+
pid_type
1302+
)
1303+
when pid_type in [:pools, :admin_pool, :snapshot_pool],
1304+
do: true
1305+
1306+
defp should_retry_connection?(_state, _pid_type), do: false
1307+
12771308
defp nillify_pid(%State{lock_connection_pid: pid} = state, pid),
12781309
do: {:lock_connection, %{state | lock_connection_pid: nil}}
12791310

0 commit comments

Comments
 (0)