Skip to content

Commit 3a00202

Browse files
authored
chore(sync-service): Recover shapes without holding supervisor up (#3293)
This is a small part of the shape subsystem work. This PR recovers shapes without holding its supervisor up. Eventually this will mean we can recover shapes asynchronously while the database connection is being established. The effect of this currently is that the ExpiryManager will start before the shapes have been restored, so this PR also makes the ExpiryManager check if the stack is ready before expiring. I've also used this opportunity to rewrite the ExpiryManager tests, which were a mess (I wrote them). The other process this affects is the SchemaReconciler: it may now reconcile before the shapes have been restored, but given that it loads its own data from PersistentKV I don't think this is an issue (but please let me know if you know otherwise!).
1 parent 7b3e50c commit 3a00202

File tree

5 files changed

+97
-138
lines changed

5 files changed

+97
-138
lines changed

packages/sync-service/lib/electric/shape_cache.ex

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,28 +220,24 @@ defmodule Electric.ShapeCache do
220220
snapshot_timeout_to_first_data: opts.snapshot_timeout_to_first_data
221221
}
222222

223+
{:ok, state, {:continue, {:recover_shapes, opts.recover_shape_timeout}}}
224+
end
225+
226+
@impl GenServer
227+
def handle_continue({:recover_shapes, recover_shape_timeout}, state) do
223228
{last_processed_lsn, total_recovered, total_failed_to_recover} =
224-
recover_shapes(state, opts.recover_shape_timeout)
229+
recover_shapes(state, recover_shape_timeout)
225230

226231
# Empirical evidence shows that after recovering 50K shapes ShapeStatusOwner and ShapeCache
227232
# each take up 200+MB of memory. Explicitly running garbage collection for both immediately
228233
# takes that down to 4-5MB.
229234
:erlang.garbage_collect()
230235

231-
# Let ShapeLogCollector that it can start processing after finishing this function so that
232-
# we're subscribed to the producer before it starts forwarding its demand.
233-
{:ok, state,
234-
{:continue, {:consumers_ready, last_processed_lsn, total_recovered, total_failed_to_recover}}}
235-
end
236-
237-
@impl GenServer
238-
def handle_continue(
239-
{:consumers_ready, last_processed_lsn, total_recovered, total_failed_to_recover},
240-
state
241-
) do
242236
{pub_man, pub_man_opts} = state.publication_manager
243237
pub_man.wait_for_restore(pub_man_opts)
244238

239+
# Let ShapeLogCollector know that it can start processing after finishing this function so that
240+
# we're subscribed to the producer before it starts forwarding its demand.
245241
ShapeLogCollector.set_last_processed_lsn(state.stack_id, last_processed_lsn)
246242

247243
Electric.Connection.Manager.consumers_ready(

packages/sync-service/lib/electric/shape_cache/expiry_manager.ex

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ defmodule Electric.ShapeCache.ExpiryManager do
22
use GenServer
33

44
alias Electric.ShapeCache.ShapeStatus
5+
alias Electric.StatusMonitor
56
alias Electric.Telemetry.OpenTelemetry
67

78
require Logger
@@ -60,10 +61,19 @@ defmodule Electric.ShapeCache.ExpiryManager do
6061
defp maybe_expire_shapes(%{max_shapes: nil}), do: :ok
6162

6263
defp maybe_expire_shapes(%{max_shapes: max_shapes} = state) do
63-
shape_count = shape_count(state)
64-
65-
if shape_count > max_shapes do
66-
expire_shapes(shape_count, state)
64+
case StatusMonitor.status(state.stack_id) do
65+
%{shape: :up} ->
66+
shape_count = shape_count(state)
67+
68+
if shape_count > max_shapes do
69+
expire_shapes(shape_count, state)
70+
end
71+
72+
status ->
73+
# We do not expire shapes if the stack is not active since this may mean that
74+
# shapes have not been fully restored yet and we don't want to expire while restoring
75+
# as this may cause race conditions.
76+
Logger.debug("Expiry check skipped due to inactive stack: #{inspect(status)}")
6777
end
6878
end
6979

packages/sync-service/lib/electric/shape_cache/shape_status.ex

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,21 +84,17 @@ defmodule Electric.ShapeCache.ShapeStatus do
8484
@snapshot_started :snapshot_started
8585

8686
@impl true
87-
def initialise(stack_ref, storage) do
87+
def initialise(stack_ref, storage \\ nil) do
8888
last_used_table = shape_last_used_table(stack_ref)
8989
:ets.new(last_used_table, [:named_table, :public, :ordered_set])
9090

9191
meta_table = shape_meta_table(stack_ref)
9292

93-
case load_table_backup(meta_table, storage) do
94-
{:ok, ^meta_table, path} ->
95-
Logger.info("Loaded shape status from backup at #{path}")
96-
:ok
97-
98-
_ ->
99-
Logger.debug("No shape status backup loaded, creating new table #{meta_table}")
100-
:ets.new(meta_table, [:named_table, :public, :ordered_set])
101-
load(meta_table, last_used_table, storage)
93+
if storage do
94+
load_from_storage(stack_ref, storage)
95+
else
96+
:ets.new(meta_table, [:named_table, :public, :ordered_set])
97+
:ok
10298
end
10399
end
104100

@@ -416,6 +412,22 @@ defmodule Electric.ShapeCache.ShapeStatus do
416412
end
417413
end
418414

415+
defp load_from_storage(stack_ref, storage) do
416+
last_used_table = shape_last_used_table(stack_ref)
417+
meta_table = shape_meta_table(stack_ref)
418+
419+
case load_table_backup(meta_table, storage) do
420+
{:ok, ^meta_table, path} ->
421+
Logger.info("Loaded shape status from backup at #{path}")
422+
:ok
423+
424+
_ ->
425+
Logger.debug("No shape status backup loaded, creating new table #{meta_table}")
426+
:ets.new(meta_table, [:named_table, :public, :ordered_set])
427+
load(meta_table, last_used_table, storage)
428+
end
429+
end
430+
419431
defp load_table_backup(meta_table, storage) do
420432
case backup_dir(storage) do
421433
nil ->
Lines changed: 48 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,138 +1,79 @@
11
defmodule Electric.ExpiryManagerTest do
22
use ExUnit.Case, async: true
3-
use Support.Mock
43
use Repatch.ExUnit
54

6-
alias Electric.Replication.Changes
7-
alias Electric.Replication.LogOffset
8-
alias Electric.ShapeCache
95
alias Electric.ShapeCache.ExpiryManager
10-
alias Electric.ShapeCache.Storage
6+
alias Electric.ShapeCache.ShapeCleaner
7+
alias Electric.ShapeCache.ShapeStatus
118
alias Electric.Shapes.Shape
9+
alias Support.RepatchExt
1210

13-
import Mox
1411
import Support.ComponentSetup
1512
import Support.TestUtils
1613

17-
@stub_inspector Support.StubInspector.new(
18-
tables: [{1, {"public", "items"}}],
19-
columns: [
20-
%{
21-
name: "id",
22-
type: "int8",
23-
type_id: {20, 1},
24-
pk_position: 0,
25-
is_generated: false
26-
},
27-
%{name: "value", type: "text", type_id: {25, 1}, is_generated: false}
28-
]
29-
)
30-
@shape Shape.new!("items", inspector: @stub_inspector)
31-
32-
# {xmin, xmax, xip_list}
33-
@pg_snapshot_xmin_10 {10, 11, [10]}
34-
35-
@moduletag :tmp_dir
36-
37-
defmodule TempPubManager do
38-
def add_shape(_handle, _, opts) do
39-
send(opts[:test_pid], {:called, :prepare_tables_fn})
40-
end
41-
end
42-
43-
setup :verify_on_exit!
44-
45-
setup do
46-
%{inspector: @stub_inspector, pool: nil}
47-
end
48-
4914
setup [
50-
:with_persistent_kv,
5115
:with_stack_id_from_test,
52-
:with_async_deleter,
53-
:with_pure_file_storage,
54-
:with_shape_status,
55-
:with_shape_cleaner,
56-
:with_status_monitor,
57-
:with_shape_monitor,
58-
:with_log_chunking,
59-
:with_registry,
60-
:with_shape_log_collector,
61-
:with_noop_publication_manager
16+
:with_status_monitor
6217
]
6318

64-
test "expires shapes if shape count has gone over max_shapes", ctx do
65-
Support.TestUtils.patch_snapshotter(fn parent, shape_handle, _shape, %{storage: storage} ->
66-
GenServer.cast(parent, {:pg_snapshot_known, shape_handle, @pg_snapshot_xmin_10})
67-
Storage.make_new_snapshot!([["test"]], storage)
68-
GenServer.cast(parent, {:snapshot_started, shape_handle})
69-
end)
70-
71-
%{shape_cache_opts: opts} = with_shape_cache(ctx)
72-
73-
start_supervised!(
74-
{ExpiryManager, max_shapes: 1, expiry_batch_size: 1, period: 10, stack_id: ctx.stack_id}
75-
)
76-
77-
{shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape, opts)
78-
assert :started = ShapeCache.await_snapshot_start(shape_handle, opts)
19+
@max_shapes 10
7920

80-
consumer_ref =
81-
Electric.Shapes.Consumer.whereis(ctx.stack_id, shape_handle)
82-
|> Process.monitor()
21+
setup %{stack_id: stack_id} do
22+
ShapeStatus.initialise(stack_id)
8323

84-
storage = Storage.for_shape(shape_handle, ctx.storage)
85-
writer = Storage.init_writer!(storage, @shape)
86-
87-
Storage.append_to_log!(
88-
changes_to_log_items([
89-
%Changes.NewRecord{
90-
relation: {"public", "items"},
91-
record: %{"id" => "1", "value" => "Alice"},
92-
log_offset: LogOffset.new(Electric.Postgres.Lsn.from_integer(1000), 0)
93-
}
94-
]),
95-
writer
96-
)
24+
expiry_manager =
25+
start_supervised!(
26+
{ExpiryManager,
27+
max_shapes: @max_shapes, expiry_batch_size: 1, period: 1, stack_id: stack_id}
28+
)
9729

98-
assert Storage.snapshot_started?(storage)
30+
Repatch.patch(ShapeCleaner, :remove_shape, [mode: :shared], fn shape_handle,
31+
stack_id: stack_id ->
32+
ShapeStatus.remove_shape(stack_id, shape_handle)
33+
end)
9934

100-
assert Enum.count(Storage.get_log_stream(LogOffset.last_before_real_offsets(), storage)) ==
101-
1
35+
Repatch.allow(self(), expiry_manager)
36+
%{expiry_manager: expiry_manager}
37+
end
10238

103-
{new_shape_handle, _} =
104-
ShapeCache.get_or_create_shape_handle(%{@shape | where: "1 == 1"}, opts)
39+
describe "when stack is active" do
40+
setup :set_status_to_active
10541

106-
assert :started = ShapeCache.await_snapshot_start(new_shape_handle, opts)
42+
test "expires shapes if shape count has gone over max_shapes", ctx do
43+
for i <- 1..(@max_shapes + 1) do
44+
ShapeStatus.add_shape(ctx.stack_id, create_shape(i))
45+
end
10746

108-
assert_receive {:DOWN, ^consumer_ref, :process, _pid, {:shutdown, :cleanup}}
47+
assert RepatchExt.called_within_ms?(ShapeCleaner, :remove_shape, 2, 50, ctx.expiry_manager)
48+
end
10949

110-
assert :ok = await_for_storage_to_raise(storage)
50+
test "does not expires shapes if shape count has not gone over max_shapes", ctx do
51+
for i <- 1..@max_shapes do
52+
ShapeStatus.add_shape(ctx.stack_id, create_shape(i))
53+
end
11154

112-
{shape_handle2, _} = ShapeCache.get_or_create_shape_handle(@shape, opts)
113-
assert shape_handle != shape_handle2
114-
assert :started = ShapeCache.await_snapshot_start(shape_handle2, opts)
55+
refute RepatchExt.called_within_ms?(ShapeCleaner, :remove_shape, 2, 50, ctx.expiry_manager)
56+
end
11557
end
11658

117-
defp await_for_storage_to_raise(storage, timeout \\ 5_000)
59+
describe "when stack is not active" do
60+
test "does not expires shapes even if shape count has gone over max_shapes", ctx do
61+
for i <- 1..(@max_shapes + 1) do
62+
ShapeStatus.add_shape(ctx.stack_id, create_shape(i))
63+
end
11864

119-
defp await_for_storage_to_raise(_storage, timeout) when timeout <= 0 do
120-
raise "Storage did not raise Storage.Error in time"
65+
refute RepatchExt.called_within_ms?(ShapeCleaner, :remove_shape, 2, 50, ctx.expiry_manager)
66+
end
12167
end
12268

123-
defp await_for_storage_to_raise(storage, timeout) do
124-
try do
125-
start_time = System.monotonic_time()
126-
Stream.run(Storage.get_log_stream(LogOffset.before_all(), storage))
127-
Process.sleep(50)
128-
elapsed = System.monotonic_time() - start_time
69+
@inspector Support.StubInspector.new(
70+
tables: ["t1"],
71+
columns: [
72+
%{name: "id", type: "int8", pk_position: 0}
73+
]
74+
)
12975

130-
await_for_storage_to_raise(
131-
storage,
132-
timeout - System.convert_time_unit(elapsed, :native, :millisecond)
133-
)
134-
rescue
135-
Storage.Error -> :ok
136-
end
76+
defp create_shape(id) do
77+
Shape.new!("t1", where: "id = #{id}", inspector: @inspector)
13778
end
13879
end
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
defmodule Support.RepatchExt do
2-
def called_within_ms?(module, function, args, ms) do
3-
called_within_ms?(module, function, args, ms, System.monotonic_time(:millisecond))
2+
def called_within_ms?(module, function, args, ms, by \\ :any) do
3+
called_within_ms?(module, function, args, ms, by, System.monotonic_time(:millisecond))
44
end
55

6-
defp called_within_ms?(module, function, args, ms, start_time) do
6+
defp called_within_ms?(module, function, args, ms, by, start_time) do
77
cond do
88
System.monotonic_time(:millisecond) - start_time > ms ->
99
false
1010

11-
Repatch.called?(module, function, args, by: :any) ->
11+
Repatch.called?(module, function, args, by: by) ->
1212
true
1313

1414
true ->
1515
Process.sleep(1)
16-
called_within_ms?(module, function, args, ms, start_time)
16+
called_within_ms?(module, function, args, ms, by, start_time)
1717
end
1818
end
1919
end

0 commit comments

Comments
 (0)