Skip to content

Commit d86d7ac

Browse files
authored
chore(sync-service): Add MonitoredCoreSupervisor (#3307)
Part of a series of PRs to separate the shape and connection subsystems. This PR moves the StatusMonitor further up the supervision tree so that it can supervise both subsystems. Currently the StatusMonitor is supervised by the Connection.Supervisor which means once the shape subsystem has been moved out of the Connection.Supervisor's descendants it will no longer be at the right level. This PR introduces the idea of the electric core - the shape and connection subsystems, supervised by the CoreSupervisor. There is also a MonitoredCoreSupervisor which supervises the CoreSupervisor along with the StatusMonitor with the the rest_for_one strategy which allows the StatusMonitor to stay consistent (everything it monitors is restarted if it dies, so there is no chance it will die and lose references to what it is monitoring). The supervision tree now looks like this: StackSupervisor - utility processes such as the EtsInspector that can be restarted indecently - MonitoredCoreSupervisor - StatusMonitor - CoreSupervisor - (to be added in a later PR) Replication.Supervisor (to later be renamed to ShapeSubsystemSupervisor) - Connection.Manager.Supervisor (to later be renamed to ConnectionSubsystemSupervisor)
1 parent 80cf009 commit d86d7ac

File tree

5 files changed

+70
-9
lines changed

5 files changed

+70
-9
lines changed

packages/sync-service/lib/electric/connection/supervisor.ex

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ defmodule Electric.Connection.Supervisor do
2222
whole OTP application shutting down.
2323
"""
2424

25-
# This supervisor is meant to be a child of Electric.StackSupervisor.
26-
#
2725
# The `restart: :transient, significant: true` combo allows for shutting the supervisor down
2826
# and signalling the parent supervisor to shut itself down as well if that one has
2927
# `:auto_shutdown` set to `:any_significant` or `:all_significant`.
@@ -63,15 +61,10 @@ defmodule Electric.Connection.Supervisor do
6361
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
6462

6563
children = [
66-
{Electric.StatusMonitor, stack_id: stack_id},
6764
{Electric.Connection.Restarter, stack_id: stack_id},
6865
{Electric.Connection.Manager.Supervisor, opts}
6966
]
7067

71-
# The :rest_for_one strategy is used here to ensure that if the StatusMonitor unexpectedly dies,
72-
# all subsequent child processes are also restarted. Since the StatusMonitor keeps track of the
73-
# statuses of the other children, losing it means losing that state. Restarting the other children
74-
# ensures they re-notify the StatusMonitor, allowing it to rebuild its internal state correctly.
7568
Supervisor.init(children, strategy: :rest_for_one)
7669
end
7770
end
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
defmodule Electric.CoreSupervisor do
2+
@moduledoc """
3+
A supervisor that starts the core components of the Electric system.
4+
This is divided into two subsystems:
5+
1. The connection subsystem (processes that may exit on a connection failure), started with Connection.Manager.Supervisor
6+
2. The shape subsystem (processes that are resilient to connection failures), started with Electric.Replication.Supervisor
7+
8+
NOTE: Currently the ShapeSubsystem is not directly supervised here, but this change with happen in an upcoming PR.
9+
"""
10+
11+
use Supervisor, restart: :transient, significant: true
12+
13+
def start_link(opts) do
14+
Supervisor.start_link(__MODULE__, opts)
15+
end
16+
17+
@impl true
18+
def init(opts) do
19+
stack_id = Keyword.fetch!(opts, :stack_id)
20+
21+
Process.set_label({:core_supervisor, stack_id})
22+
Logger.metadata(stack_id: stack_id)
23+
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
24+
25+
connection_manager_opts = Keyword.fetch!(opts, :connection_manager_opts)
26+
27+
children = [
28+
{Electric.Connection.Supervisor, connection_manager_opts}
29+
]
30+
31+
Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant)
32+
end
33+
end
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
defmodule Electric.MonitoredCoreSupervisor do
2+
@moduledoc """
3+
A supervisor that starts and monitors the core components of the Electric system.
4+
It needs to be a separate supervisor from the CoreSupervisor because of the way
5+
the StatusMonitor works (see the rest_for_one comments below).
6+
"""
7+
8+
use Supervisor, restart: :transient, significant: true
9+
10+
def start_link(opts) do
11+
Supervisor.start_link(__MODULE__, opts)
12+
end
13+
14+
@impl true
15+
def init(opts) do
16+
stack_id = Keyword.fetch!(opts, :stack_id)
17+
18+
Process.set_label({:monitored_core_supervisor, stack_id})
19+
Logger.metadata(stack_id: stack_id)
20+
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
21+
22+
children = [
23+
{Electric.StatusMonitor, stack_id: stack_id},
24+
{Electric.CoreSupervisor, opts}
25+
]
26+
27+
# The :rest_for_one strategy is used here to ensure that if the StatusMonitor unexpectedly dies,
28+
# all the processes it is monitoring are also restarted. Since the StatusMonitor keeps track of the
29+
# statuses of the other processes, losing it means losing that state. Restarting the other children
30+
# ensures they re-notify the StatusMonitor, allowing it to rebuild its internal state correctly.
31+
Supervisor.init(children, strategy: :rest_for_one, auto_shutdown: :any_significant)
32+
end
33+
end

packages/sync-service/lib/electric/stack_supervisor.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ defmodule Electric.StackSupervisor do
310310
shape_log_collector =
311311
Electric.Replication.ShapeLogCollector.name(stack_id)
312312

313-
new_connection_manager_opts = [
313+
connection_manager_opts = [
314314
stack_id: stack_id,
315315
# Coming from the outside, need validation
316316
connection_opts: config.connection_opts,
@@ -374,7 +374,8 @@ defmodule Electric.StackSupervisor do
374374
Keyword.take(shape_cache_opts, [:publication_manager])
375375
])},
376376
{Electric.ShapeCache.ShapeStatusOwner, [stack_id: stack_id, storage: storage]},
377-
{Electric.Connection.Supervisor, new_connection_manager_opts}
377+
{Electric.MonitoredCoreSupervisor,
378+
stack_id: stack_id, connection_manager_opts: connection_manager_opts}
378379
]
379380

380381
# Store the telemetry span attributes in the persistent term for this stack

packages/sync-service/test/electric/connection/manager_test.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ defmodule Electric.Connection.ConnectionManagerTest do
1111
setup [
1212
:with_unique_db,
1313
:with_stack_id_from_test,
14+
:with_status_monitor,
1415
:with_persistent_kv,
1516
:with_inspector,
1617
:with_slot_name_and_stream_id,

0 commit comments

Comments
 (0)