Skip to content

Commit 58bdd19

Browse files
committed
scale sqlite pool to 0
1 parent 7cf8e5a commit 58bdd19

File tree

9 files changed

+269
-82
lines changed

9 files changed

+269
-82
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Add SQLite connection pool scaling to minimize memory usage in quiet instances

packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
44
alias Exqlite.Sqlite3
55
alias Electric.ShapeCache.ShapeStatus.ShapeDb.Query
66
alias Electric.ShapeCache.ShapeStatus.ShapeDb.PoolRegistry
7+
alias Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics
78
alias Electric.Telemetry.OpenTelemetry
89

910
require Logger
@@ -65,7 +66,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
6566

6667
defguardp is_raw_connection(conn) when is_reference(conn)
6768

68-
defstruct [:conn, :stmts]
69+
defstruct [:conn, :mode, :stmts]
6970

7071
def migrate(conn, opts) when is_raw_connection(conn) do
7172
# because we embed the storage version into the db path
@@ -119,7 +120,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
119120
end
120121

121122
def optimize(conn) when is_raw_connection(conn) do
122-
execute_all(conn, ["PRAGMA optimize=0x10002"])
123+
execute_all(conn, ["PRAGMA optimize=0x10002", "PRAGMA shrink_memory"])
123124
end
124125

125126
def enable_extension(conn, extension) when is_raw_connection(conn) do
@@ -131,17 +132,25 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
131132

132133
@impl NimblePool
133134
def init_worker(pool_state) do
134-
if Keyword.get(pool_state, :exclusive_mode, false) do
135+
with {:ok, conn} <- init_worker_for_pool(pool_state) do
136+
:ok = Statistics.worker_start(Keyword.get(pool_state, :stack_id))
137+
{:ok, conn, pool_state}
138+
end
139+
end
140+
141+
defp init_worker_for_pool(pool_state) do
142+
if(Keyword.get(pool_state, :exclusive_mode, false)) do
135143
init_worker_exclusive(pool_state)
136144
else
137145
init_worker_pooled(pool_state)
138146
end
139147
end
140148

141149
defp init_worker_pooled(pool_state) do
142-
with {:ok, conn} <- open(pool_state),
143-
stmts <- Query.prepare!(conn, pool_state) do
144-
{:ok, %__MODULE__{conn: conn, stmts: stmts}, pool_state}
150+
with mode = Keyword.get(pool_state, :mode, :readwrite),
151+
{:ok, conn} <- open(pool_state),
152+
stmts <- Query.prepare!(conn, mode) do
153+
{:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}}
145154
end
146155
end
147156

@@ -151,10 +160,11 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
151160
# over the same, single, connection since every connection is a separate db
152161
# in in-memory mode.
153162
defp init_worker_exclusive(pool_state) do
154-
with {:ok, conn} <- open(pool_state, integrity_check: true),
163+
with mode = :readwrite,
164+
{:ok, conn} <- open(pool_state, integrity_check: true),
155165
{:ok, _version} <- migrate(conn, pool_state),
156-
stmts <- Query.prepare!(conn, Keyword.put(pool_state, :mode, :readwrite)) do
157-
{:ok, %__MODULE__{conn: conn, stmts: stmts}, pool_state}
166+
stmts <- Query.prepare!(conn, mode) do
167+
{:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}}
158168
end
159169
end
160170

@@ -186,6 +196,23 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
186196
{:ok, client_state, pool_state}
187197
end
188198

199+
@impl NimblePool
200+
def handle_ping(%__MODULE__{} = state, pool_state) do
201+
Logger.debug(fn -> ["Closing idle SQLite ", to_string(state.mode), " connection"] end)
202+
203+
# the idle timeout is only enabled in non-exclusive mode, so we're free
204+
# to close all the connections, including write.
205+
206+
:ok = close(state)
207+
:ok = Statistics.worker_stop(Keyword.get(pool_state, :stack_id))
208+
209+
{:remove, :idle}
210+
end
211+
212+
def shrink_memory(conn) when is_raw_connection(conn) do
213+
execute(conn, "PRAGMA shrink_memory")
214+
end
215+
189216
@max_recovery_attempts 2
190217

191218
def open(pool_state, opts \\ []) do
@@ -258,6 +285,26 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
258285
end
259286
end
260287

288+
def close(%__MODULE__{conn: conn, stmts: stmts}) do
289+
# Need to release all the prepared statements on the connection or the
290+
# close doesn't actually release the resources.
291+
stmts
292+
|> Query.active_stmts()
293+
|> Enum.each(fn {name, stmt} ->
294+
case Sqlite3.release(conn, stmt) do
295+
:ok ->
296+
:ok
297+
298+
{:error, reason} ->
299+
Logger.warning(
300+
"Failed to release prepared statement #{inspect(name)}: #{inspect(reason)}"
301+
)
302+
end
303+
end)
304+
305+
close(conn)
306+
end
307+
261308
def close(conn) when is_raw_connection(conn) do
262309
Sqlite3.close(conn)
263310
end

packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,38 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Migrator do
2020
Logger.metadata(stack_id: stack_id)
2121
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
2222

23-
with {:ok, conn} <- apply_migration(stack_id, args, exclusive_mode) do
24-
{:ok, schedule_optimize(stack_id, conn), :hibernate}
23+
with :ok <- apply_migration(stack_id, args, exclusive_mode) do
24+
{:ok, schedule_optimize(stack_id), :hibernate}
2525
end
2626
end
2727

2828
defp apply_migration(_stack_id, _opts, true = _exclusive?) do
2929
# In exclusive mode we *must* apply the migrations within the pool
3030
# connection initialization because we might be using a memory db.
3131
# We return nil to trigger checkout-mode.
32-
{:ok, nil}
32+
:ok
3333
end
3434

3535
defp apply_migration(_stack_id, opts, false = _exclusive?) do
3636
with {:ok, conn} <- ShapeDb.Connection.open(opts, integrity_check: true),
3737
{:ok, _version} <- ShapeDb.Connection.migrate(conn, opts),
38-
:ok = ShapeDb.Connection.optimize(conn) do
39-
{:ok, conn}
38+
:ok = ShapeDb.Connection.optimize(conn),
39+
:ok = ShapeDb.Connection.close(conn) do
40+
:ok
4041
end
4142
end
4243

4344
@impl GenServer
44-
def handle_info(:optimize, {stack_id, nil}) do
45+
def handle_info(:optimize, stack_id) do
4546
ShapeDb.Connection.checkout_write!(stack_id, :optimize, fn %{conn: conn} ->
4647
:ok = ShapeDb.Connection.optimize(conn)
4748
end)
4849

49-
{:noreply, schedule_optimize(stack_id, nil), :hibernate}
50+
{:noreply, schedule_optimize(stack_id), :hibernate}
5051
end
5152

52-
def handle_info(:optimize, {stack_id, conn}) do
53-
Logger.notice("Optimizing shape db tables")
54-
:ok = ShapeDb.Connection.optimize(conn)
55-
56-
{:noreply, schedule_optimize(stack_id, conn), :hibernate}
57-
end
58-
59-
defp schedule_optimize(stack_id, conn) do
53+
defp schedule_optimize(stack_id) do
6054
Process.send_after(self(), :optimize, @optimization_period)
61-
{stack_id, conn}
55+
stack_id
6256
end
6357
end

packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Query do
5252
Keyword.take(@read_queries, [:handle_lookup])
5353
)
5454

55-
defstruct Enum.uniq(Keyword.keys(@read_queries) ++ Keyword.keys(@write_queries))
55+
@stmt_names Enum.uniq(Keyword.keys(@read_queries) ++ Keyword.keys(@write_queries))
56+
57+
defstruct @stmt_names
5658

5759
alias Electric.ShapeCache.ShapeStatus.ShapeDb.Connection, as: Conn
5860

@@ -68,8 +70,18 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Query do
6870
stream_query: 3
6971
]
7072

71-
def prepare!(conn, opts) do
72-
case Keyword.get(opts, :mode, :readwrite) do
73+
def active_stmts(nil) do
74+
[]
75+
end
76+
77+
def active_stmts(%__MODULE__{} = query) do
78+
@stmt_names
79+
|> Enum.map(&{&1, Map.fetch!(query, &1)})
80+
|> Enum.reject(&is_nil(elem(&1, 1)))
81+
end
82+
83+
def prepare!(conn, mode) do
84+
case mode do
7385
:readwrite ->
7486
struct(__MODULE__, prepare_stmts!(conn, @read_queries ++ @write_queries))
7587

0 commit comments

Comments
 (0)