Skip to content

Commit 97c0e50

Browse files
authored
Materialize shape into an ETS table (#77)
Fixes #58
1 parent 02f7958 commit 97c0e50

File tree

19 files changed

+1333
-147
lines changed

19 files changed

+1333
-147
lines changed

apps/phoenix_sync_example/mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"ecto": {:hex, :ecto, "3.13.2", "7d0c0863f3fc8d71d17fc3ad3b9424beae13f02712ad84191a826c7169484f01", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "669d9291370513ff56e7b7e7081b7af3283d02e046cf3d403053c557894a0b3e"},
1111
"ecto_sql": {:hex, :ecto_sql, "3.13.2", "a07d2461d84107b3d037097c822ffdd36ed69d1cf7c0f70e12a3d1decf04e2e1", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "539274ab0ecf1a0078a6a72ef3465629e4d6018a3028095dc90f60a19c371717"},
1212
"electric": {:hex, :electric, "1.0.24", "f17ee7971390cf710a731a349456f6da43750fbc6582d62793c8702c636ab203", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:bandit, "~> 1.6", [hex: :bandit, repo: "hexpm", optional: false]}, {:dotenvy, "~> 1.1", [hex: :dotenvy, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:electric_cubdb, "~> 2.0", [hex: :electric_cubdb, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5", [hex: :opentelemetry, repo: "hexpm", optional: true]}, {:opentelemetry_exporter, "~> 1.8", [hex: :opentelemetry_exporter, repo: "hexpm", optional: true]}, {:opentelemetry_semantic_conventions, "~> 1.27", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}, {:opentelemetry_telemetry, "~> 1.1", [hex: :opentelemetry_telemetry, repo: "hexpm", optional: false]}, {:otel_metric_exporter, "~> 0.3.9", [hex: :otel_metric_exporter, repo: "hexpm", optional: true]}, {:pg_query_ex, "0.7.0", [hex: :pg_query_ex, repo: "hexpm", optional: false]}, {:plug, "~> 1.17", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.20", [hex: :postgrex, repo: "hexpm", optional: false]}, {:remote_ip, "~> 1.2", [hex: :remote_ip, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:sentry, "~> 10.9", [hex: :sentry, repo: "hexpm", optional: true]}, {:stream_split, "~> 0.1", [hex: :stream_split, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.1", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: true]}, {:telemetry_metrics_statsd, "~> 0.7", [hex: :telemetry_metrics_statsd, repo: "hexpm", optional: true]}, {:telemetry_poller, "~> 1.2", [hex: :telemetry_poller, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.27", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}, {:tz, "~> 0.28", [hex: :tz, repo: "hexpm", optional: false]}], "hexpm", "91e3a8b957c1e02d07da3a0e1e902420f32e1d7d5da25814475175517698fb61"},
13-
"electric_client": {:hex, :electric_client, "0.6.4", "a582b5df5aa6c94296e4d11c98431114f21136766c7f336f14cb1dabd44800d5", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.0.6", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "911768e0740cbe738e815c0ab9e378682ea4f800aa87442424f841d0b2c06fe9"},
13+
"electric_client": {:hex, :electric_client, "0.6.5-beta-3", "0344418d2ee25b5e43159a59092ca69942c41420804994330d8eb20f801d6ef0", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.0.6", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "ed49bfc57c914c6dd049dd7bfb1a079acdf600a5334ca3652cac31117617a213"},
1414
"electric_cubdb": {:hex, :electric_cubdb, "2.0.2", "36f86e3c52dc26f4e077a49fbef813b1a38d3897421cece851f149190b34c16c", [:mix], [], "hexpm", "0c0e24b31fb76ad1b33c5de2ab35c41a4ff9da153f5c1f9b15e2de78575acaf2"},
1515
"elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
1616
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},

lib/phoenix/sync.ex

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,14 @@ defmodule Phoenix.Sync do
1212
@shape_keys [:namespace, :where, :columns]
1313
@shape_params @shape_keys |> Enum.map(&to_string/1)
1414

15-
@type shape_options :: [
16-
unquote(NimbleOptions.option_typespec(PredefinedShape.schema()))
15+
@type queryable() :: Ecto.Queryable.t() | Ecto.Schema.t() | Ecto.Changeset.t()
16+
@type shape_specification :: [
17+
unquote(NimbleOptions.option_typespec(Phoenix.Sync.PredefinedShape.schema()))
1718
]
18-
19-
if Code.ensure_loaded?(Ecto) do
20-
@type shape_definition ::
21-
String.t()
22-
| Ecto.Queryable.t()
23-
| shape_options()
24-
else
25-
@type shape_definition() :: shape_options()
26-
end
27-
19+
@type shape_definition ::
20+
String.t()
21+
| queryable()
22+
| shape_specification()
2823
@type param_override ::
2924
{:namespace, String.t()}
3025
| {:table, String.t()}
@@ -287,7 +282,7 @@ defmodule Phoenix.Sync do
287282
- `columns` - The columns included in the shape. E.g. `["id", "title", "completed"]`
288283
- `params` - The values associated with a parameterized where clause. E.g. `[true, 1, "alive"]`, `%{1 => true}`
289284
"""
290-
@spec interrupt(shape_definition() | (match_shape_params() -> boolean()), shape_options()) ::
285+
@spec interrupt(shape_definition() | (match_shape_params() -> boolean()), shape_specification()) ::
291286
{:ok, non_neg_integer()}
292287
def interrupt(shape, shape_opts \\ []) do
293288
Phoenix.Sync.ShapeRequestRegistry.interrupt_matching(shape, shape_opts)
@@ -339,7 +334,7 @@ defmodule Phoenix.Sync do
339334
340335
#{NimbleOptions.docs(PredefinedShape.schema())}
341336
"""
342-
@spec shape!(shape_definition(), shape_options()) :: PredefinedShape.t()
337+
@spec shape!(shape_definition(), shape_specification()) :: PredefinedShape.t()
343338
def shape!(shape, shape_opts \\ []) do
344339
PredefinedShape.new!(shape, shape_opts)
345340
end

lib/phoenix/sync/application.ex

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ defmodule Phoenix.Sync.Application do
99

1010
@impl true
1111
def start(_type, _args) do
12-
base_children = [Phoenix.Sync.ShapeRequestRegistry]
12+
base_children = [Phoenix.Sync.Shape.Supervisor, Phoenix.Sync.ShapeRequestRegistry]
13+
14+
# Electric is noisy by default which isn't helpful when developing or in
15+
# production so up its log level to info (unless otherwise configured using
16+
# the `ELECTRIC_LOG_LEVEL` env var.
17+
Logger.put_application_level(:electric, electric_log_level())
1318

1419
children =
1520
case children() do
@@ -102,4 +107,16 @@ defmodule Phoenix.Sync.Application do
102107

103108
config
104109
end
110+
111+
defp electric_log_level do
112+
valid_levels = Enum.map(Logger.levels(), &to_string/1)
113+
configured_level = System.get_env("ELECTRIC_LOG_LEVEL", "info")
114+
115+
if configured_level in valid_levels do
116+
String.to_existing_atom(configured_level)
117+
else
118+
Logger.warning("Invalid ELECTRIC_LOG_LEVEL: #{configured_level}. Defaulting to :info.")
119+
:info
120+
end
121+
end
105122
end

lib/phoenix/sync/client.ex

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,23 @@ defmodule Phoenix.Sync.Client do
119119
@spec stream(Phoenix.Sync.shape_definition(), Electric.Client.stream_options()) :: Enum.t()
120120
def stream(shape, stream_opts \\ [])
121121

122-
def stream(shape, stream_opts) do
123-
stream(shape, stream_opts, nil)
122+
def stream(table, stream_opts) when is_binary(table) and is_list(stream_opts) do
123+
stream(Keyword.put(stream_opts, :table, table), [])
124124
end
125125

126-
@doc false
127-
# used for testing. `config` replace the application configuration
128-
def stream(shape, stream_opts, config) do
129-
client = new!(config)
130-
{shape, stream_opts} = resolve_shape(shape, stream_opts)
131-
Electric.Client.stream(client, shape, stream_opts)
126+
def stream(shape, []) when is_list(shape) do
127+
{client, shape} = Keyword.pop_lazy(shape, :client, &new!/0)
128+
129+
{shape, shape_stream_opts} = resolve_shape(shape, [])
130+
131+
Electric.Client.stream(client, shape, shape_stream_opts)
132+
end
133+
134+
def stream(shape, stream_opts) when not is_list(shape) and is_list(stream_opts) do
135+
{client, stream_opts} = Keyword.pop_lazy(stream_opts, :client, &new!/0)
136+
{shape, shape_stream_opts} = resolve_shape(shape, stream_opts)
137+
138+
Electric.Client.stream(client, shape, shape_stream_opts)
132139
end
133140

134141
defp resolve_shape(shape, stream_opts) do

lib/phoenix/sync/predefined_shape.ex

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ defmodule Phoenix.Sync.PredefinedShape do
4444
@type options() :: [option()]
4545

4646
if Code.ensure_loaded?(Ecto) do
47-
@type shape() :: options() | Electric.Client.ecto_shape()
47+
@type shape() :: options() | Phoenix.Sync.queryable()
4848
else
4949
@type shape() :: options()
5050
end
@@ -55,6 +55,14 @@ defmodule Phoenix.Sync.PredefinedShape do
5555
@doc false
5656
def schema, do: @keyword_shape_schema
5757

58+
def is_queryable?(schema) when is_atom(schema) do
59+
Code.ensure_loaded?(schema) && function_exported?(schema, :__schema__, 1) &&
60+
!is_nil(schema.__schema__(:source))
61+
end
62+
63+
def is_queryable?(q) when is_struct(q, Ecto.Query) or is_struct(q, Ecto.Changeset), do: true
64+
def is_queryable?(_), do: false
65+
5866
@doc false
5967
@spec new!(shape(), options()) :: t()
6068
def new!(opts, config \\ [])

lib/phoenix/sync/sandbox.ex

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
187187

188188
alias __MODULE__
189189

190+
defmodule Error do
191+
@moduledoc false
192+
defexception [:message]
193+
end
194+
190195
@type start_opts() :: [{:shared, boolean()}]
191196

192197
@registry __MODULE__.Registry
@@ -282,7 +287,7 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
282287

283288
stack_id ->
284289
case GenServer.whereis(__MODULE__.Stack.name(stack_id)) do
285-
nil -> raise RuntimeError, message: "no stack found for #{inspect(stack_id)}"
290+
nil -> raise Error, message: "no stack found for #{inspect(stack_id)}"
286291
_pid -> :ok
287292
end
288293
end
@@ -330,7 +335,8 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
330335

331336
@doc false
332337
def stack_id! do
333-
stack_id() || raise "No stack_id found. Did you call Phoenix.Sync.Sandbox.start!/1?"
338+
stack_id() ||
339+
raise Error, message: "No stack_id found. Did you call Phoenix.Sync.Sandbox.start!/1?"
334340
end
335341

336342
@doc false
@@ -344,14 +350,15 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
344350
# has even started
345351
case GenServer.whereis(Sandbox.StackRegistry) do
346352
nil ->
347-
raise """
348-
Phoenix.Sync.Sandbox is not running. Have you set the mode to `:sandbox` in `config/test.exs`?
353+
raise Error,
354+
message: """
355+
Phoenix.Sync.Sandbox is not running. Have you set the mode to `:sandbox` in `config/test.exs`?
349356
350-
# config/test.exs
351-
config :phoenix_sync,
352-
env: config_env(),
353-
mode: :sandbox
354-
"""
357+
# config/test.exs
358+
config :phoenix_sync,
359+
env: config_env(),
360+
mode: :sandbox
361+
"""
355362

356363
registry_pid when is_pid(registry_pid) ->
357364
pids
@@ -380,16 +387,19 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
380387
|> lookup_stack_id()
381388
|> case do
382389
nil ->
383-
raise RuntimeError,
384-
"No stack_id found for process #{inspect(parent)}. Did you call Phoenix.Sync.Sandbox.start!/1?"
390+
raise Error,
391+
message:
392+
"No stack_id found for process #{inspect(parent)}. Did you call Phoenix.Sync.Sandbox.start!/1?"
385393

386394
stack_id ->
387395
case GenServer.whereis(name_or_pid) do
388396
pid when is_pid(pid) ->
389397
Sandbox.StackRegistry.register(pid, stack_id)
390398

391399
other ->
392-
raise "`allow/4` expects a PID or a locally registered process name but lookup returned: #{inspect(other)}"
400+
raise Error,
401+
message:
402+
"`allow/4` expects a PID or a locally registered process name but lookup returned: #{inspect(other)}"
393403
end
394404
end
395405
end
@@ -424,7 +434,7 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
424434
if stack_id = stack_id() do
425435
Sandbox.StackRegistry.get_client(stack_id)
426436
else
427-
{:error, "No stack_id found. Did you call Phoenix.Sync.Sandbox.start!/1?"}
437+
raise Error, message: "No stack_id found. Did you call Phoenix.Sync.Sandbox.start!/1?"
428438
end
429439
end
430440

@@ -439,6 +449,18 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
439449
client
440450
end
441451

452+
@doc false
453+
def must_refetch!(shape) do
454+
stack_id = stack_id!()
455+
456+
{%{namespace: namespace, table: table}, _} =
457+
shape
458+
|> Phoenix.Sync.PredefinedShape.new!()
459+
|> Phoenix.Sync.PredefinedShape.to_stream_params()
460+
461+
Sandbox.Producer.truncate(stack_id, {namespace || "public", table})
462+
end
463+
442464
@impl true
443465
@doc false
444466
def init(_) do

lib/phoenix/sync/sandbox/producer.ex

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
66
Transaction,
77
NewRecord,
88
UpdatedRecord,
9-
DeletedRecord
9+
DeletedRecord,
10+
TruncatedRelation
1011
}
1112

1213
alias Electric.Replication.LogOffset
@@ -33,6 +34,10 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
3334
GenServer.cast(name(stack_id), {:emit_changes, changes})
3435
end
3536

37+
def truncate(stack_id, relation) do
38+
GenServer.cast(name(stack_id), {:truncate, relation})
39+
end
40+
3641
def name(stack_id) do
3742
Phoenix.Sync.Sandbox.name({__MODULE__, stack_id})
3843
end
@@ -60,6 +65,17 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
6065
{:noreply, %{state | txid: next_txid}}
6166
end
6267

68+
def handle_cast({:truncate, relation}, state) do
69+
changes = [%TruncatedRelation{relation: relation}]
70+
71+
:ok =
72+
state.txid
73+
|> transaction(changes)
74+
|> ShapeLogCollector.store_transaction(ShapeLogCollector.name(state.stack_id))
75+
76+
{:noreply, %{state | txid: state.txid + 100}}
77+
end
78+
6379
defp transaction(txid, changes) do
6480
%Transaction{
6581
xid: txid,

0 commit comments

Comments
 (0)