Skip to content

Commit 03739b5

Browse files
committed
properly encode ecto structs
1 parent 4bd9697 commit 03739b5

File tree

5 files changed

+300
-25
lines changed

5 files changed

+300
-25
lines changed

lib/phoenix/sync/sandbox/producer.ex

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
7676
{
7777
%NewRecord{
7878
relation: relation(schema_meta),
79-
record: record(values),
79+
record: record(values, schema_meta),
8080
log_offset: log_offset(txid, i)
8181
},
8282
lsn + 100
@@ -87,8 +87,8 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
8787
{
8888
UpdatedRecord.new(
8989
relation: relation(schema_meta),
90-
old_record: record(old),
91-
record: record(new),
90+
old_record: record(old, schema_meta),
91+
record: record(new, schema_meta),
9292
log_offset: log_offset(txid, i)
9393
),
9494
lsn + 100
@@ -99,7 +99,7 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
9999
{
100100
%DeletedRecord{
101101
relation: relation(schema_meta),
102-
old_record: record(old),
102+
old_record: record(old, schema_meta),
103103
log_offset: log_offset(txid, i)
104104
},
105105
lsn + 100
@@ -113,13 +113,93 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
113113
defp namespace(nil), do: "public"
114114
defp namespace(ns) when is_binary(ns), do: ns
115115

116-
defp record(values) do
117-
# FIXME: should we use the schema to cast these values?
118-
Map.new(values, fn {k, v} -> {to_string(k), to_string(v)} end)
116+
defp record(values, %{schema: schema}) do
117+
Map.new(values, &load_value(&1, schema))
119118
end
120119

120+
defp load_value({field, raw_value}, schema) do
121+
type = schema.__schema__(:type, field)
122+
123+
{:ok, value} =
124+
Ecto.Type.adapter_load(
125+
Ecto.Adapters.Postgres,
126+
type,
127+
raw_value
128+
)
129+
130+
# Converts to lower level postgrex type which depends on the type's
131+
# `type/0` value. Postgrex does the actual serialization of maps in real
132+
# usage so this converts embed structs to plain maps.
133+
#
134+
# Postgrex also encodes date & time types, lists and decimals itself (so
135+
# ecto just leaves these as-is) so the `dump/1` function needs to do the
136+
# work normally done by postgrex
137+
{:ok, value} = Ecto.Type.dump(type, value)
138+
139+
{to_string(field), dump(value)}
140+
end
141+
142+
defp dump(%Decimal{} = decimal) do
143+
Decimal.to_string(decimal)
144+
end
145+
146+
defp dump(%type{} = datetime)
147+
when type in [NaiveDateTime, DateTime, Time, Date] do
148+
type.to_iso8601(datetime)
149+
end
150+
151+
defp dump(map) when is_map(map), do: JSON.encode!(map)
152+
defp dump(list) when is_list(list), do: encode_array(list)
153+
defp dump(nil), do: nil
154+
defp dump(value), do: to_string(value)
155+
121156
defp log_offset(txid, index) do
122157
LogOffset.new(txid, index)
123158
end
159+
160+
@doc ~S"""
161+
## Examples
162+
163+
iex> encode_array([1, 2, 3])
164+
"{1,2,3}"
165+
166+
iex> encode_array(["a", "b", "c"])
167+
~s|{"a","b","c"}|
168+
169+
iex> encode_array(["a\"", "b", "c"])
170+
~S|{"a\"","b","c"}|
171+
172+
iex> encode_array([])
173+
"{}"
174+
175+
iex> encode_array([1, nil, 3])
176+
"{1,NULL,3}"
177+
178+
iex> encode_array([[1, [2]], [3, 4]])
179+
"{{1,{2}},{3,4}}"
180+
"""
181+
def encode_array(array) when is_list(array) do
182+
encode_array_inner(array) |> IO.iodata_to_binary()
183+
end
184+
185+
defp encode_array_inner(array) do
186+
[?{, Enum.map_intersperse(array, ",", &encode_value/1), ?}]
187+
end
188+
189+
defp encode_value(list) when is_list(list) do
190+
encode_array_inner(list)
191+
end
192+
193+
defp encode_value(nil) do
194+
"NULL"
195+
end
196+
197+
defp encode_value(value) when is_binary(value) do
198+
[?", String.replace(value, "\"", "\\\""), ?"]
199+
end
200+
201+
defp encode_value(value) do
202+
dump(value)
203+
end
124204
end
125205
end

mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ defmodule Phoenix.Sync.MixProject do
4343
{:jason, "~> 1.0"},
4444
{:ecto_sql, "~> 3.10", optional: true},
4545
{:electric, "~> 1.0.24", optional: true},
46-
{:electric_client, ">= 0.6.4"}
46+
# 0.6.5 has the decoding fix
47+
{:electric_client, "> 0.6.4"}
4748
] ++ deps_for_env(Mix.env())
4849
end
4950

mix.lock

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"ecto": {:hex, :ecto, "3.13.2", "7d0c0863f3fc8d71d17fc3ad3b9424beae13f02712ad84191a826c7169484f01", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "669d9291370513ff56e7b7e7081b7af3283d02e046cf3d403053c557894a0b3e"},
1616
"ecto_sql": {:hex, :ecto_sql, "3.13.2", "a07d2461d84107b3d037097c822ffdd36ed69d1cf7c0f70e12a3d1decf04e2e1", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "539274ab0ecf1a0078a6a72ef3465629e4d6018a3028095dc90f60a19c371717"},
1717
"electric": {:hex, :electric, "1.0.24", "f17ee7971390cf710a731a349456f6da43750fbc6582d62793c8702c636ab203", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:bandit, "~> 1.6", [hex: :bandit, repo: "hexpm", optional: false]}, {:dotenvy, "~> 1.1", [hex: :dotenvy, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:electric_cubdb, "~> 2.0", [hex: :electric_cubdb, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5", [hex: :opentelemetry, repo: "hexpm", optional: true]}, {:opentelemetry_exporter, "~> 1.8", [hex: :opentelemetry_exporter, repo: "hexpm", optional: true]}, {:opentelemetry_semantic_conventions, "~> 1.27", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}, {:opentelemetry_telemetry, "~> 1.1", [hex: :opentelemetry_telemetry, repo: "hexpm", optional: false]}, {:otel_metric_exporter, "~> 0.3.9", [hex: :otel_metric_exporter, repo: "hexpm", optional: true]}, {:pg_query_ex, "0.7.0", [hex: :pg_query_ex, repo: "hexpm", optional: false]}, {:plug, "~> 1.17", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.20", [hex: :postgrex, repo: "hexpm", optional: false]}, {:remote_ip, "~> 1.2", [hex: :remote_ip, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:sentry, "~> 10.9", [hex: :sentry, repo: "hexpm", optional: true]}, {:stream_split, "~> 0.1", [hex: :stream_split, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.1", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: true]}, {:telemetry_metrics_statsd, "~> 0.7", [hex: :telemetry_metrics_statsd, repo: "hexpm", optional: true]}, {:telemetry_poller, "~> 1.2", [hex: :telemetry_poller, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.27", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}, {:tz, "~> 0.28", [hex: :tz, repo: "hexpm", optional: false]}], "hexpm", "91e3a8b957c1e02d07da3a0e1e902420f32e1d7d5da25814475175517698fb61"},
18-
"electric_client": {:hex, :electric_client, "0.6.4", "a582b5df5aa6c94296e4d11c98431114f21136766c7f336f14cb1dabd44800d5", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.0.6", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "911768e0740cbe738e815c0ab9e378682ea4f800aa87442424f841d0b2c06fe9"},
18+
"electric_client": {:hex, :electric_client, "0.6.5-beta-1", "7e7153c28b30f1a4b54b84f3b08d20ca7827ecbbabce26e6429b76a732ba6e1f", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.0.6", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "444cd8002fee9473bfa2030a17c721603ad565d1991588650c36dd5e077c43b2"},
1919
"electric_cubdb": {:hex, :electric_cubdb, "2.0.2", "36f86e3c52dc26f4e077a49fbef813b1a38d3897421cece851f149190b34c16c", [:mix], [], "hexpm", "0c0e24b31fb76ad1b33c5de2ab35c41a4ff9da153f5c1f9b15e2de78575acaf2"},
2020
"elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
2121
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
@@ -50,12 +50,12 @@
5050
"pg_query_ex": {:hex, :pg_query_ex, "0.7.0", "189f0c0d2b6fce78def670f3cba411baa9311a099bcd0cdb0501adcfede37677", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:protox, "~> 1.7", [hex: :protox, repo: "hexpm", optional: false]}], "hexpm", "c39cb58690fa8f19cdd1939c41c5906b65f1e70351ea4a45a9da680ca3ad8c66"},
5151
"phoenix": {:hex, :phoenix, "1.7.21", "14ca4f1071a5f65121217d6b57ac5712d1857e40a0833aff7a691b7870fc9a3b", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "336dce4f86cba56fed312a7d280bf2282c720abb6074bdb1b61ec8095bdd0bc9"},
5252
"phoenix_html": {:hex, :phoenix_html, "4.2.1", "35279e2a39140068fc03f8874408d58eef734e488fc142153f055c5454fd1c08", [:mix], [], "hexpm", "cff108100ae2715dd959ae8f2a8cef8e20b593f8dfd031c9cba92702cf23e053"},
53-
"phoenix_live_view": {:hex, :phoenix_live_view, "1.1.1", "dfb8b5d60bb581eeeff0152d3dbe5d4f10f66d80cac8b6d9ad731454de2e16a3", [:mix], [{:igniter, ">= 0.6.16 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:lazy_html, "~> 0.1.0", [hex: :lazy_html, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.6.15 or ~> 1.7.0 or ~> 1.8.0-rc", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.15", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c77e3a7af23f10de1eab0b9aa0c8b6ad6bee395517a69dbc8f0ac4bb625b64b9"},
53+
"phoenix_live_view": {:hex, :phoenix_live_view, "1.1.3", "0473936730cc76f9b02e52f131e081c63e5e5c3851003878dd3cbe12124fb39f", [:mix], [{:igniter, ">= 0.6.16 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:lazy_html, "~> 0.1.0", [hex: :lazy_html, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.6.15 or ~> 1.7.0 or ~> 1.8.0-rc", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.15", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "942967524e8d256ce6847ca3143d94425fa5125b53563790a609c78740cfb6c9"},
5454
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"},
5555
"phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"},
5656
"plug": {:hex, :plug, "1.18.1", "5067f26f7745b7e31bc3368bc1a2b818b9779faa959b49c934c17730efc911cf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "57a57db70df2b422b564437d2d33cf8d33cd16339c1edb190cd11b1a3a546cc2"},
5757
"plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"},
58-
"postgrex": {:hex, :postgrex, "0.20.0", "363ed03ab4757f6bc47942eff7720640795eb557e1935951c1626f0d303a3aed", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d36ef8b36f323d29505314f704e21a1a038e2dc387c6409ee0cd24144e187c0f"},
58+
"postgrex": {:hex, :postgrex, "0.21.1", "2c5cc830ec11e7a0067dd4d623c049b3ef807e9507a424985b8dcf921224cd88", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "27d8d21c103c3cc68851b533ff99eef353e6a0ff98dc444ea751de43eb48bdac"},
5959
"protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"},
6060
"protox": {:hex, :protox, "1.7.8", "ccae41afec6e63cf061bee874d7d042ed585d501df1cd004661ffac0e5628686", [:mix], [{:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 4.0 or ~> 5.0 or ~> 6.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm", "f6702c9deb9fb7cd2eadd73d3dbc0303c506dc87635e509228c61309f7062933"},
6161
"remote_ip": {:hex, :remote_ip, "1.2.0", "fb078e12a44414f4cef5a75963c33008fe169b806572ccd17257c208a7bc760f", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2ff91de19c48149ce19ed230a81d377186e4412552a597d6a5137373e5877cb7"},
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
defmodule Phoenix.Sync.Sandbox.ProducerTest do
2+
use ExUnit.Case, async: true
3+
4+
doctest Phoenix.Sync.Sandbox.Producer, import: true
5+
end

0 commit comments

Comments
 (0)