Skip to content

Commit 4a28afa

Browse files
authored
fix: ensure absence of a materializer doesn't crash SLC (#3640)
1 parent 6f3f812 commit 4a28afa

File tree

6 files changed

+140
-5
lines changed

6 files changed

+140
-5
lines changed

.changeset/late-items-appear.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
fix: ensure abscense of a materializer doesn't crash a part of electric

packages/sync-service/lib/electric/shapes/consumer/materializer.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ defmodule Electric.Shapes.Consumer.Materializer do
5555

5656
def get_link_values(opts) do
5757
GenServer.call(name(opts), :get_link_values)
58+
catch
59+
:exit, _reason ->
60+
raise ~s|Materializer for stack "#{opts.stack_id}" and handle "#{opts.shape_handle}" is not available|
5861
end
5962

6063
def get_all_as_refs(shape, stack_id) when are_deps_filled(shape) do

packages/sync-service/lib/electric/shapes/filter.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ defmodule Electric.Shapes.Filter do
121121
OpenTelemetry.timed_fun("filter.affected_shapes.duration_µs", fn ->
122122
try do
123123
shapes_affected_by_change(filter, change)
124-
rescue
125-
error ->
124+
catch
125+
kind, error ->
126126
Logger.error("""
127127
Unexpected error in Filter.affected_shapes:
128-
#{Exception.format(:error, error, __STACKTRACE__)}
128+
#{Exception.format(kind, error, __STACKTRACE__)}
129129
""")
130130

131-
OpenTelemetry.record_exception(:error, error, __STACKTRACE__)
131+
OpenTelemetry.record_exception(kind, error, __STACKTRACE__)
132132

133133
# We can't tell which shapes are affected, the safest thing to do is return all shapes
134134
all_shape_ids(filter)

packages/sync-service/lib/electric/shapes/partitions.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ defmodule Electric.Shapes.Partitions do
3535
table) then this will expand the mapping function to add a change to the
3636
partition root for every change to a partition of that root.
3737
"""
38-
@spec add_shape(t(), shape_id(), Electric.Shapes.Shape.t()) :: t()
38+
@spec add_shape(t(), shape_id(), Electric.Shapes.Shape.t()) ::
39+
{:ok, t()} | {:error, :connection_not_available}
3940
def add_shape(%__MODULE__{} = state, shape_id, shape) do
4041
case Inspector.load_relation_info(shape.root_table_id, state.inspector) do
4142
{:ok, relation} ->

packages/sync-service/test/electric/replication/shape_log_collector_test.exs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,6 +929,108 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
929929
end
930930
end
931931

932+
describe "handle_event/2 with shapes with dependencies" do
933+
@shape Shape.new!("test_table", inspector: @inspector)
934+
@shape2 Shape.new!("test_table",
935+
where: "id IN (SELECT id FROM test_table WHERE id > 10)",
936+
inspector: @inspector
937+
)
938+
setup :setup_log_collector
939+
940+
setup ctx do
941+
parent = self()
942+
943+
stub_inspector(
944+
load_relation_oid: fn {"public", "test_table"}, _ ->
945+
{:ok, {1234, {"public", "test_table"}}}
946+
end,
947+
load_relation_info: fn 1234, _ ->
948+
{:ok, %{id: 1234, schema: "public", name: "test_table", parent: nil, children: nil}}
949+
end,
950+
load_column_info: fn 1234, _ ->
951+
{:ok, [%{pk_position: 0, name: "id", is_generated: false}]}
952+
end
953+
)
954+
955+
consumers = [
956+
{:normal,
957+
start_link_supervised!(
958+
{Support.TransactionConsumer,
959+
id: :normal,
960+
stack_id: ctx.stack_id,
961+
parent: parent,
962+
shape: @shape,
963+
shape_handle: "normal-shape-handle"},
964+
id: {:consumer, :normal}
965+
)},
966+
{:inner,
967+
start_link_supervised!(
968+
{Support.TransactionConsumer,
969+
id: :inner,
970+
stack_id: ctx.stack_id,
971+
parent: parent,
972+
shape: @shape2.shape_dependencies |> List.first(),
973+
shape_handle: "inner-shape-handle"},
974+
id: {:consumer, :inner}
975+
)},
976+
{:outer,
977+
start_link_supervised!(
978+
{Support.TransactionConsumer,
979+
id: :outer,
980+
stack_id: ctx.stack_id,
981+
parent: parent,
982+
shape: %{@shape2 | shape_dependencies_handles: ["inner-shape-handle"]},
983+
shape_handle: "outer-shape-handle"},
984+
id: {:consumer, :outer}
985+
)}
986+
]
987+
988+
%{consumers: consumers}
989+
end
990+
991+
test "should handle a transaction that requires a materializer", ctx do
992+
start_link_supervised!(
993+
{Support.StubMaterializer,
994+
stack_id: ctx.stack_id,
995+
shape_handle: "inner-shape-handle",
996+
initial_values: MapSet.new([])}
997+
)
998+
999+
txn =
1000+
transaction(100, Lsn.from_string("0/10"), [
1001+
%Changes.NewRecord{
1002+
relation: {"public", "test_table"},
1003+
record: %{"id" => "11"},
1004+
log_offset: LogOffset.new(Lsn.from_string("0/10"), 0)
1005+
}
1006+
])
1007+
1008+
assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id)
1009+
1010+
# Outer consumer should not receive this, because empty initial values won't satisfy the where clause
1011+
Support.TransactionConsumer.assert_consume(ctx.consumers |> Keyword.drop([:outer]), [txn])
1012+
end
1013+
1014+
test "should not crash if the materializer is not there, instead skipping the depenencies",
1015+
ctx do
1016+
txn =
1017+
transaction(100, Lsn.from_string("0/10"), [
1018+
%Changes.NewRecord{
1019+
relation: {"public", "test_table"},
1020+
record: %{"id" => "11"},
1021+
log_offset: LogOffset.new(Lsn.from_string("0/10"), 0)
1022+
}
1023+
])
1024+
1025+
assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id)
1026+
1027+
# This assertion holds because our where clause processing in SLC is best-effort:
1028+
# any crash, like abscence of a materializer, causes the transaction to just be sent everywhere
1029+
# and hope the consumers will filter it out.
1030+
Support.TransactionConsumer.assert_consume(ctx.consumers, [txn])
1031+
end
1032+
end
1033+
9321034
defp transaction(xid, lsn, changes) do
9331035
last_log_offset =
9341036
case Enum.reverse(changes) do
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
defmodule Support.StubMaterializer do
2+
use GenServer, restart: :temporary
3+
4+
def start_link(opts) do
5+
opts = Map.new(opts)
6+
GenServer.start_link(__MODULE__, opts, name: Electric.Shapes.Consumer.Materializer.name(opts))
7+
end
8+
9+
def init(opts) do
10+
{:ok, %{current_values: Map.get(opts, :initial_values, MapSet.new())}}
11+
end
12+
13+
def set_link_values(name, values) do
14+
GenServer.call(Electric.Shapes.Consumer.Materializer.name(name), {:set_link_values, values})
15+
end
16+
17+
def handle_call(:get_link_values, _from, state) do
18+
{:reply, state.current_values, state}
19+
end
20+
21+
def handle_call({:set_link_values, values}, _from, state) do
22+
{:reply, :ok, %{state | current_values: MapSet.new(values)}}
23+
end
24+
end

0 commit comments

Comments
 (0)