Skip to content

Commit 562d290

Browse files
authored
feat(sync-service): repopulate log collectors state from the status ets (#3242)
Don't rely on consumer processes existing in order to initialise the shape log collector -- all the information we need is in the shape status ets. We still need the consumer processes to register in order to synchronise the filters etc with the existing shapes but we only need to do that if the consumer was not in the original state. This is handled by having an extra `action` param to the subscribe call. Shape consumers being restored don't add their shape to the shape log collector's internal filters while those being newly created do.
1 parent 8eb1071 commit 562d290

File tree

8 files changed

+131
-28
lines changed

8 files changed

+131
-28
lines changed

.changeset/seven-ladybugs-shave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
Restore ShapeLogCollector's state from the ShapeStatus ETS table at startup

packages/sync-service/lib/electric/replication/shape_log_collector.ex

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,17 @@ defmodule Electric.Replication.ShapeLogCollector do
7575
:ok = GenServer.call(server, {:relation_msg, rel, trace_context}, :infinity)
7676
end
7777

78-
def subscribe(server_ref, shape_handle, shape) do
79-
GenServer.call(server(server_ref), {:subscribe, shape_handle, shape})
78+
def subscribe(server_ref, shape_handle, shape, action) when action in [:restore, :create] do
79+
GenServer.call(server(server_ref), {:subscribe, shape_handle, shape, action})
8080
end
8181

8282
def notify_flushed(server_ref, shape_handle, offset) do
8383
GenServer.cast(server(server_ref), {:writer_flushed, shape_handle, offset})
8484
end
8585

8686
def init(opts) do
87+
activate_mocked_functions_from_test_process()
88+
8789
stack_id = opts.stack_id
8890

8991
Process.set_label({:shape_log_collector, stack_id})
@@ -124,7 +126,32 @@ defmodule Electric.Replication.ShapeLogCollector do
124126
)
125127
})
126128

127-
{:ok, state}
129+
{:ok, state, {:continue, :restore_shapes}}
130+
end
131+
132+
def handle_continue(:restore_shapes, state) do
133+
OpenTelemetry.with_span(
134+
"shape_log_collector.restore_shapes",
135+
[],
136+
state.stack_id,
137+
fn ->
138+
{partitions, filter, layers} =
139+
state.stack_id
140+
|> Electric.ShapeCache.ShapeStatus.list_shapes()
141+
|> Enum.reduce(
142+
{state.partitions, state.filter, state.dependency_layers},
143+
fn {shape_handle, shape}, {partitions, filter, layers} ->
144+
{
145+
Partitions.add_shape(partitions, shape_handle, shape),
146+
Filter.add_shape(filter, shape_handle, shape),
147+
DependencyLayers.add_dependency(layers, shape, shape_handle)
148+
}
149+
end
150+
)
151+
152+
{:noreply, %{state | partitions: partitions, filter: filter, dependency_layers: layers}}
153+
end
154+
)
128155
end
129156

130157
def handle_info({{:unsubscribe, shape_handle}, ref, :process, pid, _reason}, state) do
@@ -138,7 +165,7 @@ defmodule Electric.Replication.ShapeLogCollector do
138165
)
139166
end
140167

141-
def handle_call({:subscribe, shape_handle, shape}, {pid, _ref}, state) do
168+
def handle_call({:subscribe, shape_handle, shape, action}, {pid, _ref}, state) do
142169
OpenTelemetry.with_span(
143170
"shape_log_collector.subscribe",
144171
[shape_handle: shape_handle],
@@ -148,14 +175,23 @@ defmodule Electric.Replication.ShapeLogCollector do
148175
from = {pid, ref}
149176

150177
state =
151-
%{
152-
state
153-
| partitions: Partitions.add_shape(state.partitions, shape_handle, shape),
154-
filter: Filter.add_shape(state.filter, shape_handle, shape),
155-
pids_by_shape_handle: Map.put(state.pids_by_shape_handle, shape_handle, pid),
156-
dependency_layers:
157-
DependencyLayers.add_dependency(state.dependency_layers, shape, shape_handle)
158-
}
178+
case action do
179+
:restore ->
180+
# Once we move consumer monitoring out of this process,
181+
# subscribing with action :restore will be a no-op that we can
182+
# filter in the `subscribe/4` function
183+
state
184+
185+
:create ->
186+
%{
187+
state
188+
| partitions: Partitions.add_shape(state.partitions, shape_handle, shape),
189+
filter: Filter.add_shape(state.filter, shape_handle, shape),
190+
dependency_layers:
191+
DependencyLayers.add_dependency(state.dependency_layers, shape, shape_handle)
192+
}
193+
end
194+
|> Map.update!(:pids_by_shape_handle, &Map.put(&1, shape_handle, pid))
159195
|> Map.update!(:subscriptions, fn {count, set} ->
160196
{count + 1, MapSet.put(set, from)}
161197
end)
@@ -401,4 +437,12 @@ defmodule Electric.Replication.ShapeLogCollector do
401437
defp server(stack_id) when is_binary(stack_id), do: name(stack_id)
402438
defp server({:via, _, _} = name), do: name
403439
defp server(pid) when is_pid(pid), do: pid
440+
441+
if Mix.env() == :test do
442+
def activate_mocked_functions_from_test_process do
443+
Support.TestUtils.activate_mocked_functions_for_module(__MODULE__)
444+
end
445+
else
446+
def activate_mocked_functions_from_test_process, do: :noop
447+
end
404448
end

packages/sync-service/lib/electric/shape_cache.ex

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ defmodule Electric.ShapeCache do
302302
end
303303

304304
defp start_and_recover_shape(shape_handle, shape, state) do
305-
case start_shape(shape_handle, shape, state) do
305+
case start_shape(shape_handle, shape, state, nil, :restore) do
306306
:ok ->
307307
consumer = Shapes.Consumer.name(state.stack_id, shape_handle)
308308
# This `initial_state` is a GenServer call, so we're blocked until consumer is ready
@@ -354,15 +354,15 @@ defmodule Electric.ShapeCache do
354354

355355
Logger.info("Creating new shape for #{inspect(shape)} with handle #{shape_handle}")
356356

357-
:ok = start_shape(shape_handle, shape, state, otel_ctx)
357+
:ok = start_shape(shape_handle, shape, state, otel_ctx, :create)
358358

359359
# In this branch of `if`, we're guaranteed to have a newly started shape, so we can be sure about it's
360360
# "latest offset" because it'll be in the snapshotting stage
361361
{shape_handle, LogOffset.last_before_real_offsets()}
362362
end
363363
end
364364

365-
defp start_shape(shape_handle, shape, state, otel_ctx \\ nil) do
365+
defp start_shape(shape_handle, shape, state, otel_ctx, action) do
366366
case Electric.Shapes.DynamicConsumerSupervisor.start_shape_consumer(
367367
state.consumer_supervisor,
368368
stack_id: state.stack_id,
@@ -376,7 +376,8 @@ defmodule Electric.ShapeCache do
376376
db_pool: state.db_pool,
377377
hibernate_after: state.shape_hibernate_after,
378378
otel_ctx: otel_ctx,
379-
snapshot_timeout_to_first_data: state.snapshot_timeout_to_first_data
379+
snapshot_timeout_to_first_data: state.snapshot_timeout_to_first_data,
380+
action: action
380381
) do
381382
{:ok, _supervisor_pid} ->
382383
:ok

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ defmodule Electric.Shapes.Consumer do
7676
Logger.metadata(metadata)
7777
Electric.Telemetry.Sentry.set_tags_context(metadata)
7878

79+
{action, config} = Map.pop!(config, :action)
80+
7981
state =
8082
Map.merge(config, %{
8183
snapshot_started: false,
@@ -91,11 +93,11 @@ defmodule Electric.Shapes.Consumer do
9193
shape_status_mod: Map.get(config, :shape_status_mod) || Electric.ShapeCache.ShapeStatus
9294
})
9395

94-
{:ok, state, {:continue, :init_storage}}
96+
{:ok, state, {:continue, {:init_consumer, action}}}
9597
end
9698

9799
@impl GenServer
98-
def handle_continue(:init_storage, state) do
100+
def handle_continue({:init_consumer, action}, state) do
99101
%{
100102
storage: storage,
101103
shape_status_mod: shape_status_mod
@@ -135,7 +137,7 @@ defmodule Electric.Shapes.Consumer do
135137
Materializer.subscribe(state.stack_id, shape_handle)
136138
end
137139

138-
ShapeLogCollector.subscribe(state.stack_id, state.shape_handle, state.shape)
140+
ShapeLogCollector.subscribe(state.stack_id, state.shape_handle, state.shape, action)
139141

140142
Logger.debug("Writer for #{state.shape_handle} initialized")
141143

packages/sync-service/lib/electric/shapes/consumer_supervisor.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ defmodule Electric.Shapes.ConsumerSupervisor do
2121
default: :timer.seconds(30)
2222
],
2323
hibernate_after: [type: :integer, required: true],
24+
action: [type: {:in, [:restore, :create]}, default: :create],
2425
otel_ctx: [type: :any, required: false]
2526
)
2627

@@ -109,7 +110,8 @@ defmodule Electric.Shapes.ConsumerSupervisor do
109110
shape_handle: shape_handle,
110111
shape_status_mod: Map.get(config, :shape_status_mod),
111112
stack_id: config.stack_id,
112-
storage: shape_storage
113+
storage: shape_storage,
114+
action: config.action
113115
}}
114116
]
115117

packages/sync-service/test/electric/replication/shape_log_collector_test.exs

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,25 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
4444
@shape_handle "the-shape-handle"
4545

4646
def setup_log_collector(ctx) do
47+
%{stack_id: stack_id} = ctx
4748
# Start a test Registry
4849
registry_name = Module.concat(__MODULE__, Registry)
4950
start_link_supervised!({Registry, keys: :duplicate, name: registry_name})
5051

52+
existing_shapes = Map.get(ctx, :restore_shapes, [])
53+
54+
Repatch.patch(Electric.ShapeCache.ShapeStatus, :list_shapes, [mode: :shared], fn ^stack_id ->
55+
existing_shapes
56+
end)
57+
58+
Support.TestUtils.activate_mocks_for_descendant_procs(ShapeLogCollector)
59+
60+
inspector = Map.get(ctx, :inspector, {Mock.Inspector, elem(@inspector, 1)})
61+
5162
# Start the ShapeLogCollector process
5263
opts = [
53-
stack_id: ctx.stack_id,
54-
inspector: {Mock.Inspector, elem(@inspector, 1)},
64+
stack_id: stack_id,
65+
inspector: inspector,
5566
persistent_kv: ctx.persistent_kv
5667
]
5768

@@ -61,16 +72,14 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
6172
:ok
6273
end)
6374

64-
Repatch.allow(self(), pid)
65-
6675
shape_cache_opts =
6776
[
6877
storage: {Mock.Storage, []},
6978
chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(),
7079
inspector: {Mock.Inspector, elem(@inspector, 1)},
7180
publication_manager: ctx.publication_manager,
72-
stack_id: ctx.stack_id,
73-
consumer_supervisor: Electric.Shapes.DynamicConsumerSupervisor.name(ctx.stack_id),
81+
stack_id: stack_id,
82+
consumer_supervisor: Electric.Shapes.DynamicConsumerSupervisor.name(stack_id),
7483
registry: registry_name
7584
]
7685

@@ -79,6 +88,44 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
7988
%{server: pid, registry: registry_name, shape_cache: shape_cache_pid}
8089
end
8190

91+
describe "shape restoration" do
92+
setup :setup_log_collector
93+
94+
@tag restore_shapes: [{@shape_handle, @shape}], inspector: @inspector
95+
test "populates the filter, partitions and layers from the shape_status table", ctx do
96+
parent = self()
97+
98+
xmin = 100
99+
lsn = Lsn.from_string("0/10")
100+
last_log_offset = LogOffset.new(lsn, 0)
101+
102+
consumer =
103+
start_link_supervised!(
104+
{Support.TransactionConsumer,
105+
[
106+
id: 1,
107+
parent: parent,
108+
producer: ctx.server,
109+
shape: @shape,
110+
shape_handle: @shape_handle,
111+
action: :restore
112+
]}
113+
)
114+
115+
txn =
116+
%Transaction{xid: xmin, lsn: lsn, last_log_offset: last_log_offset}
117+
|> Transaction.prepend_change(%Changes.NewRecord{
118+
relation: {"public", "test_table"},
119+
record: %{"id" => "2", "name" => "foo"}
120+
})
121+
122+
assert :ok = ShapeLogCollector.store_transaction(txn, ctx.server)
123+
124+
xids = Support.TransactionConsumer.assert_consume([{1, consumer}], [txn])
125+
assert xids == [xmin]
126+
end
127+
end
128+
82129
describe "store_transaction/2" do
83130
setup :setup_log_collector
84131

packages/sync-service/test/electric/shape_cache_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -975,8 +975,8 @@ defmodule Electric.ShapeCacheTest do
975975

976976
context =
977977
context
978-
|> Map.merge(with_shape_log_collector(context))
979978
|> Map.merge(with_shape_status(context))
979+
|> Map.merge(with_shape_log_collector(context))
980980

981981
with_shape_cache(context, opts)
982982
end

packages/sync-service/test/support/transaction_consumer.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ defmodule Support.TransactionConsumer do
4141
{:ok, producer} = Keyword.fetch(opts, :producer)
4242
{:ok, parent} = Keyword.fetch(opts, :parent)
4343
{:ok, id} = Keyword.fetch(opts, :id)
44+
action = Keyword.get(opts, :action, :create)
4445
shape = Keyword.fetch!(opts, :shape)
4546
shape_handle = Keyword.fetch!(opts, :shape_handle)
46-
Electric.Replication.ShapeLogCollector.subscribe(producer, shape_handle, shape)
47+
48+
Electric.Replication.ShapeLogCollector.subscribe(producer, shape_handle, shape, action)
4749

4850
{:ok, {id, parent}}
4951
end

0 commit comments

Comments
 (0)