Skip to content

Commit cea4c13

Browse files
authored
fix(sync-service): Pass shape directly to consumer process (#3964)
There may have been a time where we didn't have a loaded shape in the ShapeCache when starting a consumer and to avoid blocking the ShapeCache process we moved the shape load into the consumer. But that time has passed and the shape cache always has the Shape instance available when launching a consumer, even for resuming a previously suspended consumer. So instead of making the Consumer process load the shape, just include it in the launch opts and save a query to the ShapeDb. Most fetches from the ShapeDb are quick, especially for a new shape where the info is probably still in the in-memory buffer and so the fetch is just a copy from ets, but there is a very long tail, especially under load. If the consumer process is not scheduled immediately the ShapeDb could have time to move the shape data out of the cache and flush it to disk, which means the shape load has to go via the disk which can be hit and miss (pun intended). This should be an easy win as we're removing this unstable path for new shapes and reducing the shape fetch from twice to once for resumed consumers. I haven't gone through the tests to remove instances of starting a consumer directly, thus not including the shape, hence the `Map.get_lazy/3` call which falls back to the old shapedb fetch route.
1 parent 8daa822 commit cea4c13

File tree

4 files changed

+19
-1
lines changed

4 files changed

+19
-1
lines changed

.changeset/lovely-icons-sit.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Remove redundant ShapeDb fetch from Consumer initialization

packages/sync-service/lib/electric/shape_cache.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ defmodule Electric.ShapeCache do
358358
start_opts =
359359
opts
360360
|> Map.put(:shape_handle, shape_handle)
361+
|> Map.put(:shape, shape)
361362
|> Map.put(:subqueries_enabled_for_stack?, "allow_subqueries" in feature_flags)
362363

363364
case Shapes.DynamicConsumerSupervisor.start_shape_consumer(stack_id, start_opts) do

packages/sync-service/lib/electric/shape_cache/shape_status.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,15 @@ defmodule Electric.ShapeCache.ShapeStatus do
186186
end)
187187
end
188188

189+
@spec fetch_shape_by_handle!(stack_id(), shape_handle()) :: Shape.t() | no_return()
190+
def fetch_shape_by_handle!(stack_id, shape_handle)
191+
when is_stack_id(stack_id) and is_shape_handle(shape_handle) do
192+
case fetch_shape_by_handle(stack_id, shape_handle) do
193+
{:ok, shape} -> shape
194+
:error -> raise ArgumentError, message: "No shape found for handle #{inspect(shape_handle)}"
195+
end
196+
end
197+
189198
def has_shape_handle?(stack_id, shape_handle) do
190199
:ets.member(shape_meta_table(stack_id), shape_handle)
191200
end

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ defmodule Electric.Shapes.Consumer do
121121
shape_handle: shape_handle
122122
} = state
123123

124-
{:ok, shape} = ShapeCache.ShapeStatus.fetch_shape_by_handle(stack_id, shape_handle)
124+
shape =
125+
Map.get_lazy(config, :shape, fn ->
126+
ShapeCache.ShapeStatus.fetch_shape_by_handle!(stack_id, shape_handle)
127+
end)
125128

126129
state = State.initialize_shape(state, shape, config)
127130

0 commit comments

Comments
 (0)