Skip to content

Commit 59f6b14

Browse files
authored
feat: Support generated columns in Postgres 18 (#3297)
Closes #3220 ## Overview of changes I've made the following core changes to support this: - When creating a new publication, if on PG18 and above, always set `publish_generated_columns = 'stored'`, see [PG docs](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-PARAMS-WITH-PUBLISH-GENERATED-COLUMNS) for details - If we find an existing publication, whether owned or not, we do not alter it. - For people upgrading Electric on a PG18 db, they will have to manually set the publication parameter or somehow reset the electric slot/publication/timeline to get this feature. I could add a step to always enable it if we can, but ideally in a separate PR. - In the `PublicationManager`, before each update, we check the publication status which now includes whether generated columns are replicated - We cache the result, and we initialise with an optimistic assumption that generated columns are supported - In PubMan, we now keep track of which shape handles use generated columns, and in case the configuration changes we do an O(N) scan over our tracked shapes and schedule removals for any shape that used generated columns - Slow scan is fine, it's just a list traversal that happens _only_ if the publication configuration is disabled at runtime, which might be the case for manually configured publications but never for our own owned publication. - Any shapes pending creation that require generated columns would also receive an error reply - Once the configuration result is cached, shape creations immediately get an error answer from the publication manager until a check confirms that generated columns are supported - The Inspector now loads `supported_features` from Postgres, so at the validation stage we can determine if we should block generated column shapes the same way we have until now, or whether we should let them through and let the PublicationManager determine if they can be created. - We will now get different errors if you are on e.g. PG17 and request a shape with generated columns, and if you are on PG18 and above. In the latter you now receive: ``` Publication "{pub_name}" does not publish generated columns. This is a feature introduced in PostgreSQL 18 and requires setting the publication parameter 'publish_generated_columns' to 'stored'. Alternatively, you can exclude them from the shape by explicitly listing which columns to fetch in the 'columns' query param. ``` - Refactored the Inspector ETS table name to `inspector_table` rather than `relation_table` for more generality as it now holds more Postgres inspection information - I think it makes sense to reuse the Inspector for this to keep the infrastructure decoupled, this is purely information about what Postgres supports as part of a validation step. - Parse PublicationManager error, which gets turned into a `SnapshotError`, at the API level to return 400 in case the feature is supported in the database but not configured. - Some minor typespec fixes and cleanups of unnecessary info (e.g. `can_alter_publication?` flag in ConnMan - the pubman will determine it by itself at runtime) ## Potential alternatives - Rather than return errors from the publication manager, we can just clean up shapes when the configuration changes and set the flag appropriately somewhere so that we continue to reject generated columns in the same way at the shape creation validation stage - PRO: simplify handling of cases by introducing a sort of "supported features" table for the validation to lookup - PRO: by separating into a separate "supported features" table we could offload it from the inspector (although it would still need to check postgres somehow, so I don't like this) - CON: I think it's important to separate when a feature is not available and when it is not enabled by choice - by letting the publication manager reply, we are notifying developers that they _could_ use generated columns if they wanted to if they are using PG18 and above.
1 parent 14ce221 commit 59f6b14

27 files changed

+799
-297
lines changed

packages/sync-service/dev/docker-compose-otel.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: 'electric_dev_otel'
33

44
services:
55
postgres:
6-
image: postgres:16-alpine
6+
image: postgres:18-alpine
77
environment:
88
POSTGRES_DB: electric
99
POSTGRES_USER: postgres

packages/sync-service/dev/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: 'electric_dev'
33

44
services:
55
postgres:
6-
image: postgres:17-alpine
6+
image: postgres:18-alpine
77
environment:
88
POSTGRES_DB: electric
99
POSTGRES_USER: postgres
@@ -22,7 +22,7 @@ services:
2222
- -c
2323
- config_file=/etc/postgresql.conf
2424
postgres2:
25-
image: postgres:17-alpine
25+
image: postgres:18-alpine
2626
environment:
2727
POSTGRES_DB: electric
2828
POSTGRES_USER: postgres

packages/sync-service/lib/electric/connection/manager.ex

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,6 @@ defmodule Electric.Connection.Manager do
103103
:pg_system_identifier,
104104
# PostgreSQL timeline ID
105105
:pg_timeline_id,
106-
# Capability flag that is set during replication client initialization and shows whether
107-
# the PG role has the necessary privilege to alter the PG publication.
108-
:can_alter_publication?,
109106
# User setting that determines whether the table publishing is to be automatically
110107
# managed by the stack or whether it's the user's responsibility.
111108
:manual_table_publishing?,
@@ -225,10 +222,6 @@ defmodule Electric.Connection.Manager do
225222
GenServer.cast(manager, :replication_client_created_new_slot)
226223
end
227224

228-
def replication_client_has_insufficient_privilege(manager) do
229-
GenServer.cast(manager, :replication_client_has_insufficient_privilege)
230-
end
231-
232225
def replication_client_ready_to_stream(manager) do
233226
GenServer.cast(manager, :replication_client_ready_to_stream)
234227
end
@@ -315,7 +308,6 @@ defmodule Electric.Connection.Manager do
315308
stack_events_registry: Keyword.fetch!(opts, :stack_events_registry),
316309
tweaks: Keyword.fetch!(opts, :tweaks),
317310
persistent_kv: Keyword.fetch!(opts, :persistent_kv),
318-
can_alter_publication?: true,
319311
manual_table_publishing?: Keyword.get(opts, :manual_table_publishing?, false),
320312
max_shapes: Keyword.fetch!(opts, :max_shapes),
321313
expiry_batch_size: Keyword.fetch!(opts, :expiry_batch_size)
@@ -534,7 +526,6 @@ defmodule Electric.Connection.Manager do
534526
pool_opts: state.pool_opts,
535527
replication_opts: state.replication_opts,
536528
tweaks: state.tweaks,
537-
can_alter_publication?: state.can_alter_publication?,
538529
manual_table_publishing?: state.manual_table_publishing?,
539530
persistent_kv: state.persistent_kv,
540531
max_shapes: state.max_shapes,
@@ -807,16 +798,6 @@ defmodule Electric.Connection.Manager do
807798
{:noreply, %{state | purge_all_shapes?: true}}
808799
end
809800

810-
def handle_cast(
811-
:replication_client_has_insufficient_privilege,
812-
%State{
813-
current_phase: :connection_setup,
814-
current_step: {:start_replication_client, :configuring_connection}
815-
} = state
816-
) do
817-
{:noreply, %{state | can_alter_publication?: false}}
818-
end
819-
820801
def handle_cast(
821802
:replication_client_ready_to_stream,
822803
%State{

packages/sync-service/lib/electric/connection/manager/supervisor.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ defmodule Electric.Connection.Manager.Supervisor do
5959
{Electric.Replication.PublicationManager,
6060
stack_id: stack_id,
6161
publication_name: Keyword.fetch!(replication_opts, :publication_name),
62-
can_alter_publication?: Keyword.fetch!(opts, :can_alter_publication?),
6362
manual_table_publishing?: Keyword.fetch!(opts, :manual_table_publishing?),
6463
db_pool: Electric.Connection.Manager.admin_pool(stack_id),
6564
update_debounce_timeout: Keyword.get(tweaks, :publication_alter_debounce_ms, 0),

packages/sync-service/lib/electric/db_configuration_error.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ defmodule Electric.DbConfigurationError do
1818
}
1919
end
2020

21+
def publication_missing_generated_columns(pub_name) do
22+
%Electric.DbConfigurationError{
23+
type: :publication_missing_generated_columns,
24+
message:
25+
"Publication #{Utils.quote_name(pub_name)} does not publish generated columns." <>
26+
" This is a feature introduced in PostgreSQL 18 and requires setting the publication parameter 'publish_generated_columns' to 'stored'." <>
27+
" Alternatively, you can exclude them from the shape by explicitly listing which columns to fetch in the 'columns' query param."
28+
}
29+
end
30+
2131
def publication_not_owned(pub_name) do
2232
%Electric.DbConfigurationError{
2333
type: :publication_not_owned,

packages/sync-service/lib/electric/postgres/configuration.ex

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ defmodule Electric.Postgres.Configuration do
2121
| :misconfigured_replica_identity}
2222
}
2323

24+
@type publication_status :: %{
25+
can_alter_publication?: boolean(),
26+
publishes_all_operations?: boolean(),
27+
publishes_generated_columns?: boolean()
28+
}
29+
2430
@typep changed_relation ::
2531
{
2632
Electric.relation_id(),
@@ -32,41 +38,44 @@ defmodule Electric.Postgres.Configuration do
3238
@typep publication_relation :: {Electric.relation_id(), Electric.relation(), <<_::8>>}
3339

3440
@doc """
35-
Check whether the publication with the given name exists, is configured to
36-
publish all required operations, and is owned by the current user.
41+
Check whether the publication with the given name exists, and return its status.
42+
43+
The status includes whether the publication is owned, whether it publishes all operations,
44+
and whether it publishes generated columns (if supported by the Postgres version).
3745
"""
38-
@spec check_publication_status!(Postgrex.conn(), String.t()) :: :ok
46+
@spec check_publication_status!(Postgrex.conn(), String.t()) ::
47+
publication_status() | :not_found
3948
def check_publication_status!(conn, publication_name) do
49+
# note: we could do a separate query to get the PG version, unsure which is more efficient
50+
# but avoiding having to know the PG version in advance keeps things contained and simple
4051
query =
4152
"""
4253
SELECT
43-
pubinsert AND pubupdate AND pubdelete AND pubtruncate as all_operations,
44-
pg_get_userbyid(p.pubowner) = current_role as is_owned
45-
FROM pg_publication as p WHERE pubname = $1
54+
pg_get_userbyid(p.pubowner) = current_role as can_alter_publication,
55+
pubinsert AND pubupdate AND pubdelete AND pubtruncate as publishes_all_operations,
56+
CASE WHEN current_setting('server_version_num')::int >= 180000
57+
THEN (to_jsonb(p) ->> 'pubgencols') = 's'
58+
ELSE FALSE
59+
END AS publishes_generated_columns
60+
FROM pg_publication as p WHERE pubname = $1;
4661
"""
4762

4863
Postgrex.query!(conn, query, [publication_name])
49-
|> one_or_nil()
5064
|> case do
51-
%{"all_operations" => true, "is_owned" => true} ->
52-
:ok
53-
54-
%{"is_owned" => false} ->
55-
raise Electric.DbConfigurationError.publication_not_owned(publication_name)
56-
57-
%{"all_operations" => false} ->
58-
raise Electric.DbConfigurationError.publication_missing_operations(publication_name)
65+
%Postgrex.Result{
66+
rows: [[can_alter_publication, publishes_all_operations, publishes_generated_columns]]
67+
} ->
68+
%{
69+
can_alter_publication?: can_alter_publication,
70+
publishes_all_operations?: publishes_all_operations,
71+
publishes_generated_columns?: publishes_generated_columns
72+
}
5973

60-
_ ->
61-
raise Electric.DbConfigurationError.publication_missing(publication_name)
74+
%Postgrex.Result{num_rows: 0} ->
75+
:not_found
6276
end
6377
end
6478

65-
defp one_or_nil(%Postgrex.Result{rows: [row], columns: cols}),
66-
do: Enum.zip(cols, row) |> Map.new()
67-
68-
defp one_or_nil(_), do: nil
69-
7079
@doc """
7180
Check whether the state of the publication relations in the database matches the sets of
7281
filters passed into the function.

packages/sync-service/lib/electric/postgres/inspector.ex

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ defmodule Electric.Postgres.Inspector do
2828
children: nil | [relation(), ...]
2929
}
3030

31+
@type supported_features :: %{
32+
supports_generated_column_replication: boolean()
33+
}
34+
3135
@callback load_relation_oid(relation(), opts :: term()) ::
3236
{:ok, Electric.oid_relation()}
3337
| :table_not_found
@@ -43,6 +47,10 @@ defmodule Electric.Postgres.Inspector do
4347
| :table_not_found
4448
| {:error, String.t() | :connection_not_available}
4549

50+
@callback load_supported_features(opts :: term()) ::
51+
{:ok, supported_features()}
52+
| {:error, String.t() | :connection_not_available}
53+
4654
@callback clean(relation_id(), opts :: term()) :: :ok
4755

4856
@callback list_relations_with_stale_cache(opts :: term()) ::
@@ -91,6 +99,15 @@ defmodule Electric.Postgres.Inspector do
9199
module.load_column_info(relation_id, opts)
92100
end
93101

102+
@doc """
103+
Load the supported features on the target database using a provided inspector.
104+
"""
105+
@spec load_supported_features(inspector()) ::
106+
{:ok, supported_features()} | {:error, String.t() | :connection_not_available}
107+
def load_supported_features({module, opts}) do
108+
module.load_supported_features(opts)
109+
end
110+
94111
@doc """
95112
Clean up all information about a given relation using a provided inspector.
96113
"""

packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,17 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
230230
defp parse_type_kind("r"), do: :range
231231
defp parse_type_kind("m"), do: :multirange
232232

233+
@impl Electric.Postgres.Inspector
234+
def load_supported_features(conn) do
235+
with {:ok, %{rows: [[pg_version]]}} <-
236+
Postgrex.query(conn, "SELECT current_setting('server_version_num')::int", []) do
237+
{:ok,
238+
%{
239+
supports_generated_column_replication: pg_version >= 180_000
240+
}}
241+
end
242+
end
243+
233244
@impl Electric.Postgres.Inspector
234245
def clean(_, _), do: :ok
235246
end

0 commit comments

Comments
 (0)