Skip to content

Commit c3e2582

Browse files
authored
fix: subqueries restoration on startup (#3268)
- Fixed JSON deserialization not working at all for shapes with subqueries
- Fixed startup process not restarting materializers for shapes
- Made startup tiered with regard to shape dependency topology

Closes #3261
1 parent 9cf77e5 commit c3e2582

File tree

10 files changed

+318
-73
lines changed

10 files changed

+318
-73
lines changed

.changeset/unlucky-doors-begin.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
fix: ensure shapes with subqueries are deserialized correctly when loading from disk, and materializers are properly started

packages/sync-service/lib/electric/replication/eval/expr.ex

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,19 @@ defmodule Electric.Replication.Eval.Expr do
5757

5858
@doc false
5959
@spec from_json_safe(map()) :: {:ok, t()} | {:error, String.t()}
60-
def from_json_safe(%{"version" => 1, "query" => query, "used_refs" => refs}) do
60+
def from_json_safe(map, sublink_queries \\ %{})
61+
62+
def from_json_safe(
63+
%{"version" => 1, "query" => query, "used_refs" => refs},
64+
sublink_queries
65+
) do
6166
refs =
6267
Map.new(refs, fn [k, v] -> {k, type_from_json_safe(v)} end)
6368

64-
Parser.parse_and_validate_expression(query, refs: refs)
69+
Parser.parse_and_validate_expression(query, refs: refs, sublink_queries: sublink_queries)
6570
end
6671

67-
def from_json_safe(_),
72+
def from_json_safe(_, _),
6873
do: {:error, "Incorrect serialized format: keys must be `version`, `query`, `used_refs`"}
6974

7075
defp type_from_json_safe(["array", type]), do: {:array, type_from_json_safe(type)}

packages/sync-service/lib/electric/shape_cache.ex

Lines changed: 59 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -260,42 +260,43 @@ defmodule Electric.ShapeCache do
260260
{:reply, !is_nil(ShapeStatus.get_existing_shape(state.stack_id, shape_handle)), state}
261261
end
262262

263-
defp shape_handles(state) do
264-
ShapeStatus.list_shapes(state.stack_id)
265-
end
266-
267263
# Timeout is per-shape, not for the entire function
268264
defp recover_shapes(state, timeout) do
269-
all_handles = shape_handles(state)
265+
# Shapes are started one dependency layer at a time: shapes within a layer are started concurrently, but layers are processed sequentially
266+
all_handles_and_shapes = ShapeStatus.list_shapes(state.stack_id)
270267

271268
recovered =
272-
Task.Supervisor.async_stream_nolink(
273-
Electric.ProcessRegistry.name(state.stack_id, Electric.StackTaskSupervisor),
274-
all_handles,
275-
fn {shape_handle, shape} -> start_and_recover_shape(shape_handle, shape, state) end,
276-
ordered: false,
277-
timeout: timeout,
278-
on_timeout: :kill_task,
279-
zip_input_on_exit: true
280-
)
281-
|> Stream.flat_map(fn
282-
{:ok, result} ->
283-
result
284-
285-
# All other exit reasons are caught in the `start_and_recover_shape/3` function
286-
{:exit, {{shape_handle, shape}, :timeout}} ->
287-
Logger.error(
288-
"shape #{inspect(shape)} (#{inspect(shape_handle)}) failed to start within #{timeout}ms"
289-
)
269+
all_handles_and_shapes
270+
|> group_into_layers()
271+
|> Enum.flat_map(fn shape_group ->
272+
Task.Supervisor.async_stream_nolink(
273+
Electric.ProcessRegistry.name(state.stack_id, Electric.StackTaskSupervisor),
274+
shape_group,
275+
fn {shape_handle, shape} -> start_and_recover_shape(shape_handle, shape, state) end,
276+
ordered: true,
277+
timeout: timeout,
278+
on_timeout: :kill_task,
279+
zip_input_on_exit: true
280+
)
281+
|> Stream.flat_map(fn
282+
{:ok, result} ->
283+
result
284+
285+
# All other exit reasons are caught in the `start_and_recover_shape/3` function
286+
{:exit, {{shape_handle, shape}, :timeout}} ->
287+
Logger.error(
288+
"shape #{inspect(shape)} (#{inspect(shape_handle)}) failed to start within #{timeout}ms"
289+
)
290290

291-
ShapeCleaner.remove_shape(shape_handle, stack_id: state.stack_id)
291+
ShapeCleaner.remove_shape(shape_handle, stack_id: state.stack_id)
292292

293-
[]
293+
[]
294+
end)
295+
|> Enum.to_list()
294296
end)
295-
|> Enum.to_list()
296297

297298
total_recovered = length(recovered)
298-
total_failed_to_recover = length(all_handles) - total_recovered
299+
total_failed_to_recover = length(all_handles_and_shapes) - total_recovered
299300

300301
{Lsn.max(recovered), total_recovered, total_failed_to_recover}
301302
end
@@ -331,21 +332,8 @@ defmodule Electric.ShapeCache do
331332
else
332333
shape_handles =
333334
shape.shape_dependencies
334-
|> Enum.map(&{&1, maybe_create_shape(&1, otel_ctx, state)})
335-
|> Enum.with_index(fn {inner_shape, {shape_handle, _}}, index ->
336-
materialized_type =
337-
shape.where.used_refs |> Map.fetch!(["$sublink", Integer.to_string(index)])
338-
339-
ConsumerSupervisor.start_materializer(%{
340-
stack_id: state.stack_id,
341-
shape_handle: shape_handle,
342-
storage: state.storage,
343-
columns: inner_shape.explicitly_selected_columns,
344-
materialized_type: materialized_type
345-
})
346-
347-
shape_handle
348-
end)
335+
|> Enum.map(&maybe_create_shape(&1, otel_ctx, state))
336+
|> Enum.map(&elem(&1, 0))
349337

350338
shape = %{shape | shape_dependencies_handles: shape_handles}
351339

@@ -362,6 +350,20 @@ defmodule Electric.ShapeCache do
362350
end
363351

364352
defp start_shape(shape_handle, shape, state, otel_ctx, action) do
353+
Enum.zip(shape.shape_dependencies_handles, shape.shape_dependencies)
354+
|> Enum.with_index(fn {shape_handle, inner_shape}, index ->
355+
materialized_type =
356+
shape.where.used_refs |> Map.fetch!(["$sublink", Integer.to_string(index)])
357+
358+
ConsumerSupervisor.start_materializer(%{
359+
stack_id: state.stack_id,
360+
shape_handle: shape_handle,
361+
storage: state.storage,
362+
columns: inner_shape.explicitly_selected_columns,
363+
materialized_type: materialized_type
364+
})
365+
end)
366+
365367
case Electric.Shapes.DynamicConsumerSupervisor.start_shape_consumer(
366368
state.consumer_supervisor,
367369
stack_id: state.stack_id,
@@ -388,4 +390,19 @@ defmodule Electric.ShapeCache do
388390
:error
389391
end
390392
end
393+
394+
@spec group_into_layers([{shape_handle(), Shape.t()}]) :: [[{shape_handle(), Shape.t()}], ...]
395+
defp group_into_layers(handles_and_shapes, acc \\ [], visited \\ MapSet.new())
396+
defp group_into_layers([], acc, _visited), do: Enum.reverse(acc)
397+
398+
defp group_into_layers(handles_and_shapes, acc, visited) do
399+
{appendable, missing_deps} =
400+
Enum.split_with(handles_and_shapes, fn {_, shape} ->
401+
Enum.all?(shape.shape_dependencies_handles, &MapSet.member?(visited, &1))
402+
end)
403+
404+
visited = MapSet.new(appendable, &elem(&1, 0)) |> MapSet.union(visited)
405+
406+
group_into_layers(missing_deps, [appendable | acc], visited)
407+
end
391408
end

packages/sync-service/lib/electric/shape_cache/shape_status.ex

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,30 @@ defmodule Electric.ShapeCache.ShapeStatus do
141141

142142
@impl true
143143
def list_shapes(stack_ref) do
144-
:ets.select(shape_meta_table(stack_ref), [
144+
shape_meta_table(stack_ref)
145+
|> :ets.select([
145146
{
146147
{{@shape_meta_data, :"$1"}, :"$2", :_, :_},
147148
[],
148149
[{{:"$1", :"$2"}}]
149150
}
150151
])
152+
|> topological_sort()
153+
end
154+
155+
@spec topological_sort([{shape_handle(), Shape.t()}]) :: [{shape_handle(), Shape.t()}]
156+
defp topological_sort(handles_and_shapes, acc \\ [], visited \\ MapSet.new())
157+
defp topological_sort([], acc, _visited), do: Enum.reverse(acc) |> List.flatten()
158+
159+
defp topological_sort(handles_and_shapes, acc, visited) do
160+
{appendable, missing_deps} =
161+
Enum.split_with(handles_and_shapes, fn {_, shape} ->
162+
Enum.all?(shape.shape_dependencies_handles, &MapSet.member?(visited, &1))
163+
end)
164+
165+
visited = MapSet.new(appendable, &elem(&1, 0)) |> MapSet.union(visited)
166+
167+
topological_sort(missing_deps, [appendable | acc], visited)
151168
end
152169

153170
@impl true
@@ -412,10 +429,33 @@ defmodule Electric.ShapeCache.ShapeStatus do
412429
:ets.insert(meta_table, meta_tuples)
413430
:ets.insert(last_used_table, last_used_tuples)
414431

432+
restore_dependency_handles(shapes, meta_table, storage)
433+
415434
:ok
416435
end
417436
end
418437

438+
defp restore_dependency_handles(shapes, meta_table, storage) do
439+
shapes
440+
|> Enum.filter(fn {_, shape} ->
441+
Shape.has_dependencies?(shape) and not Shape.dependency_handles_known?(shape)
442+
end)
443+
|> Enum.each(fn {handle, %Shape{shape_dependencies: deps} = shape} ->
444+
handles = Enum.map(deps, &get_existing_shape(meta_table, &1))
445+
446+
if not Enum.any?(handles, &is_nil/1) do
447+
handles = Enum.map(handles, &elem(&1, 0))
448+
shape = %Shape{shape | shape_dependencies_handles: handles}
449+
450+
:ets.update_element(meta_table, {@shape_meta_data, handle}, {2, shape})
451+
else
452+
Logger.warning("Shape #{inspect(handle)} has dependencies but some are unknown")
453+
remove_shape(meta_table, handle)
454+
Storage.cleanup!(storage, handle)
455+
end
456+
end)
457+
end
458+
419459
defp store_table_backup(meta_table, backup_dir) do
420460
case backup_dir do
421461
nil ->

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ defmodule Electric.Shapes.Consumer do
8585
buffer: [],
8686
monitors: [],
8787
cleaned?: false,
88+
terminating?: false,
8889
txn_offset_mapping: [],
8990
materializer_subscribed?: false,
9091
# The existing body of consumer tests made it impossible to replace this dynamic
@@ -128,27 +129,44 @@ defmodule Electric.Shapes.Consumer do
128129
normalized_latest_offset
129130
)
130131

131-
for shape_handle <- state.shape.shape_dependencies_handles do
132-
# TODO: handle a case when materializer is down
133-
Process.monitor(Materializer.whereis(state.stack_id, shape_handle),
134-
tag: {:dependency_materializer_down, shape_handle}
135-
)
132+
all_materializers_alive? =
133+
Enum.all?(state.shape.shape_dependencies_handles, fn shape_handle ->
134+
name = Materializer.name(state.stack_id, shape_handle)
136135

137-
Materializer.subscribe(state.stack_id, shape_handle)
138-
end
136+
with pid when is_pid(pid) <- GenServer.whereis(name),
137+
true <- Process.alive?(pid) do
138+
Process.monitor(pid,
139+
tag: {:dependency_materializer_down, shape_handle}
140+
)
141+
142+
Materializer.subscribe(state.stack_id, shape_handle)
143+
144+
true
145+
else
146+
_ -> false
147+
end
148+
end)
139149

140-
ShapeLogCollector.subscribe(state.stack_id, state.shape_handle, state.shape, action)
150+
if all_materializers_alive? do
151+
ShapeLogCollector.subscribe(state.stack_id, state.shape_handle, state.shape, action)
141152

142-
Logger.debug("Writer for #{state.shape_handle} initialized")
153+
Logger.debug("Writer for #{state.shape_handle} initialized")
143154

144-
Snapshotter.start_snapshot(state.stack_id, state.shape_handle)
155+
Snapshotter.start_snapshot(state.stack_id, state.shape_handle)
145156

146-
{:noreply,
147-
Map.merge(state, %{
148-
latest_offset: normalized_latest_offset,
149-
writer: writer,
150-
pg_snapshot: pg_snapshot
151-
}), state.hibernate_after}
157+
{:noreply,
158+
Map.merge(state, %{
159+
latest_offset: normalized_latest_offset,
160+
writer: writer,
161+
pg_snapshot: pg_snapshot
162+
}), state.hibernate_after}
163+
else
164+
Logger.warning(
165+
"Materializers for dependencies of #{state.shape_handle} are not alive, invalidating shape"
166+
)
167+
168+
{:noreply, terminate_safely(state)}
169+
end
152170
end
153171

154172
@impl GenServer
@@ -268,12 +286,22 @@ defmodule Electric.Shapes.Consumer do
268286
{:noreply, terminate_safely(state)}
269287
end
270288

289+
def handle_info({:materializer_shape_invalidated, shape_handle}, state) do
290+
Logger.warning("Materializer shape invalidated for #{shape_handle}")
291+
{:noreply, terminate_safely(state)}
292+
end
293+
271294
def handle_info({{:dependency_materializer_down, handle}, _ref, :process, pid, reason}, state) do
272295
Logger.warning(
273296
"Materializer down for a dependency: #{handle} (#{inspect(pid)}) (#{inspect(reason)})"
274297
)
275298

276-
{:noreply, terminate_safely(state)}
299+
case {reason, state.terminating?} do
300+
{_, true} -> {:noreply, state}
301+
{{:shutdown, _}, false} -> {:stop, reason, state}
302+
{:shutdown, false} -> {:stop, reason, state}
303+
_ -> {:noreply, terminate_safely(state)}
304+
end
277305
end
278306

279307
# We're trapping exits so that `terminate` is called to clean up the writer,
@@ -568,7 +596,13 @@ defmodule Electric.Shapes.Consumer do
568596
# 1. register that we want the shape data to be cleaned up.
569597
# 2. request a notification when all active shape data reads are complete
570598
# 3. exit the process when we receive that notification
571-
defp terminate_safely(state, reason \\ {:shutdown, :cleanup}) do
599+
defp terminate_safely(state, reason \\ {:shutdown, :cleanup})
600+
601+
defp terminate_safely(%{terminating?: true} = state, _reason) do
602+
state
603+
end
604+
605+
defp terminate_safely(state, reason) do
572606
%{
573607
stack_id: stack_id,
574608
shape_handle: shape_handle,
@@ -579,7 +613,7 @@ defmodule Electric.Shapes.Consumer do
579613

580614
:ok = Electric.Shapes.Monitor.notify_reader_termination(stack_id, shape_handle, reason)
581615

582-
notify_shape_rotation(state)
616+
notify_shape_rotation(%{state | terminating?: true})
583617
end
584618

585619
defp reply_to_snapshot_waiters(%{awaiting_snapshot_start: []} = state, _reply) do

packages/sync-service/lib/electric/shapes/consumer/materializer.ex

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ defmodule Electric.Shapes.Consumer.Materializer do
7070
%{stack_id: stack_id, shape_handle: shape_handle} = opts
7171

7272
Process.set_label({:materializer, shape_handle})
73+
Process.flag(:trap_exit, true)
7374
metadata = [stack_id: stack_id, shape_handle: shape_handle]
7475
Logger.metadata(metadata)
7576
Electric.Telemetry.Sentry.set_tags_context(metadata)
@@ -93,6 +94,10 @@ defmodule Electric.Shapes.Consumer.Materializer do
9394
_ = Consumer.await_snapshot_start(state)
9495
Consumer.subscribe_materializer(state)
9596

97+
Process.monitor(GenServer.whereis(Consumer.name(state)),
98+
tag: {:consumer_down, state.shape_handle}
99+
)
100+
96101
{:noreply, state, {:continue, {:read_stream, storage}}}
97102
end
98103

@@ -171,6 +176,22 @@ defmodule Electric.Shapes.Consumer.Materializer do
171176
{:reply, :ok, %{state | subscribers: MapSet.put(state.subscribers, pid)}}
172177
end
173178

179+
def handle_info({:EXIT, _, reason}, state) do
180+
{:stop, reason, state}
181+
end
182+
183+
def handle_info({{:consumer_down, _}, _ref, :process, _pid, {:shutdown, :cleanup}}, state) do
184+
for pid <- state.subscribers do
185+
send(pid, {:materializer_shape_invalidated, state.shape_handle})
186+
end
187+
188+
{:noreply, state}
189+
end
190+
191+
def handle_info({{:consumer_down, _}, _ref, :process, _pid, _reason}, state) do
192+
{:noreply, state}
193+
end
194+
174195
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
175196
{:noreply, %{state | subscribers: MapSet.delete(state.subscribers, pid)}}
176197
end

0 commit comments

Comments
 (0)