Skip to content

Commit 3d410a6

Browse files
committed
make stats reading async
so stats retrieval is not blocked by concurrent reading of stats
1 parent 5272cb7 commit 3d410a6

File tree

4 files changed

+85
-44
lines changed

4 files changed

+85
-44
lines changed

packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
197197
end
198198

199199
@impl NimblePool
200-
def handle_ping(%__MODULE__{} = _conn, _pool_state) do
201-
# the idle timeout is only enabled in non-exclusive mode, so we're free
202-
# to close all the connections, including write.
203-
{:remove, :idle}
200+
def handle_ping(%__MODULE__{} = conn, pool_state) do
201+
if Keyword.get(pool_state, :exclusive_mode, false) do
202+
# keep the write connection alive in exclusive mode — closing it
203+
# would destroy an in-memory database
204+
{:ok, conn}
205+
else
206+
# the idle timeout is only enabled in non-exclusive mode, so we're free
207+
# to close all the connections, including write.
208+
{:remove, :idle}
209+
end
204210
end
205211

206212
@impl NimblePool

packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do
6565
exclusive_mode?: Keyword.get(args, :exclusive_mode, false),
6666
enable_memory_stats?: enable_memory_stats?,
6767
measurement_period: measurement_period,
68+
task: nil,
6869
pool_opts: args
6970
}}
7071
end
@@ -74,6 +75,41 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do
7475
{:noreply, read_stats(state), :hibernate}
7576
end
7677

78+
def handle_info({ref, read_stats_result}, %{task: %{ref: ref}} = state) do
79+
state =
80+
case read_stats_result do
81+
{:ok, stats} ->
82+
%{state | stats: stats}
83+
84+
{:error, reason} ->
85+
Logger.warning(["Failed to read SQLite statistics: ", inspect(reason)])
86+
state
87+
88+
:error ->
89+
Logger.warning("Failed to read SQLite statistics")
90+
state
91+
end
92+
|> then(fn
93+
%{first_run?: true} = state ->
94+
%{state | first_run?: false}
95+
96+
state ->
97+
state
98+
end)
99+
100+
{:noreply, state}
101+
end
102+
103+
def handle_info({:DOWN, ref, :process, _pid, _reason}, %{task: %{ref: ref}} = state) do
104+
Process.send_after(self(), :read_stats, state.measurement_period)
105+
{:noreply, %{state | task: nil}}
106+
end
107+
108+
def handle_info(msg, state) do
109+
Logger.warning(["Received unexpected message: ", inspect(msg)])
110+
{:noreply, state}
111+
end
112+
77113
@impl GenServer
78114
def handle_call(:statistics, _from, state) do
79115
{:reply, {:ok, Map.put(state.stats, :connections, state.connections)}, state}
@@ -113,41 +149,25 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do
113149
end
114150

115151
defp do_read_stats(state, include_memory?) do
116-
state
117-
|> open_connection(fn conn ->
118-
with {:ok, memstat_available?, page_size} <-
119-
initialize_connection(conn, state.first_run?, state.enable_memory_stats?),
120-
{:ok, stats} <-
121-
Connection.fetch_all(
122-
conn,
123-
stats_query(memstat_available? && include_memory?),
124-
[]
125-
) do
126-
{:ok, analyze_stats(stats, page_size)}
127-
end
128-
end)
129-
|> case do
130-
{:ok, stats} ->
131-
%{state | stats: stats}
132-
133-
{:error, reason} ->
134-
Logger.warning(["Failed to read SQLite statistics: ", inspect(reason)])
135-
state
136-
137-
:error ->
138-
Logger.warning("Failed to read SQLite statistics")
139-
state
140-
end
141-
|> then(fn
142-
%{first_run?: true} = state ->
143-
%{state | first_run?: false}
152+
# Read the stats in an async task so that we don't block reading the stats
153+
# and get spurious timeout errors
154+
task =
155+
Task.async(fn ->
156+
open_connection(state, fn conn ->
157+
with {:ok, memstat_available?, page_size} <-
158+
initialize_connection(conn, state.first_run?, state.enable_memory_stats?),
159+
{:ok, stats} <-
160+
Connection.fetch_all(
161+
conn,
162+
stats_query(memstat_available? && include_memory?),
163+
[]
164+
) do
165+
{:ok, analyze_stats(stats, page_size)}
166+
end
167+
end)
168+
end)
144169

145-
state ->
146-
state
147-
end)
148-
|> tap(fn state ->
149-
Process.send_after(self(), :read_stats, state.measurement_period)
150-
end)
170+
%{state | task: task}
151171
end
152172

153173
# In exclusive_mode we *must* use a pooled connection because the db maybe
@@ -176,7 +196,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do
176196
if enable_memory_stats? do
177197
case Connection.enable_extension(conn, "memstat") do
178198
:ok ->
179-
if first_run?, do: Logger.info("SQLite memory statistics enabled")
199+
if first_run?, do: Logger.notice("SQLite memory statistics enabled")
180200
true
181201

182202
{:error, reason} ->

packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do
2222
exclusive_mode = Keyword.get(opts, :exclusive_mode, false)
2323
idle_timeout = Keyword.get(opts, :connection_idle_timeout, @default_connection_idle_timeout)
2424
# don't close the write connection in exclusive mode
25+
# NimblePool treats `worker_idle_timeout: nil` as no idle timeout
2526
write_pool_idle_timeout = if(exclusive_mode, do: nil, else: idle_timeout)
2627

2728
read_pool_spec =

packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -548,17 +548,31 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do
548548
describe "statistics" do
549549
@tag shape_db_opts: [enable_memory_stats?: true]
550550
test "export memory and disk usage when enabled", ctx do
551-
assert {:ok, %{total_memory: memory, disk_size: disk_size}} =
552-
ShapeDb.statistics(ctx.stack_id)
551+
assert {:ok, %{total_memory: memory, disk_size: disk_size}} = wait_statistics(ctx)
553552

554553
assert memory > 0
555554
assert disk_size > 0
556555
end
557556

558557
test "only exports disk usage by default", ctx do
559-
assert {:ok, %{total_memory: 0, disk_size: disk_size}} = ShapeDb.statistics(ctx.stack_id)
558+
assert {:ok, %{total_memory: 0, disk_size: disk_size}} = wait_statistics(ctx)
560559
assert disk_size > 0
561560
end
561+
562+
defp wait_statistics(ctx, attempts \\ 10)
563+
564+
defp wait_statistics(_ctx, 0), do: :error
565+
566+
defp wait_statistics(ctx, remaining_attempts) do
567+
case ShapeDb.statistics(ctx.stack_id) do
568+
{:ok, %{updated_at: %DateTime{}} = stats} ->
569+
{:ok, stats}
570+
571+
{:ok, _} ->
572+
Process.sleep(10)
573+
wait_statistics(ctx, remaining_attempts - 1)
574+
end
575+
end
562576
end
563577

564578
describe "recovery" do
@@ -623,14 +637,14 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do
623637
@tag shape_db_opts: [
624638
exclusive_mode: true,
625639
connection_idle_timeout: 10,
626-
statistics_collection_period: 10
640+
statistics_collection_period: 1000
627641
]
628642
test "does not scale the pool in exclusive mode", ctx do
629643
ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn ->
630644
assert :ok = assert_stats_match(ctx, connections: 1)
631645
end)
632646

633-
assert :error = assert_stats_match(ctx, connections: 0, total_memory: 0)
647+
assert :error = assert_stats_match(ctx, connections: 0)
634648
end
635649

636650
defp assert_stats_match(ctx, match, repeats \\ 10)

0 commit comments

Comments
 (0)