11defmodule Electric.Replication.PublicationManager do
2- @ moduledoc false
2+ @ moduledoc """
3+ Manages a PostgreSQL publication for a given Electric stack, tracking shapes
4+ and ensuring that the publication configuration matches the required set of
5+ relations that need to be published for the shapes to function correctly.
6+
7+ Includes periodic checks of the publication to ensure that it remains valid,
8+ and expires any shapes that are no longer valid due to schema changes or
9+ permission issues.
10+ """
311 use GenServer
412
513 alias Electric.Postgres.Configuration
614 alias Electric.ShapeCache.ShapeCleaner
715 alias Electric.Shapes.Shape
16+ alias Electric.Telemetry.OpenTelemetry
817 alias Electric.Utils
918
1019 require Logger
@@ -14,6 +23,7 @@ defmodule Electric.Replication.PublicationManager do
1423 @ callback name ( binary ( ) | Keyword . t ( ) ) :: term ( )
1524 @ callback add_shape ( shape_handle ( ) , Shape . t ( ) , Keyword . t ( ) ) :: :ok
1625 @ callback remove_shape ( shape_handle ( ) , Keyword . t ( ) ) :: :ok
26+ @ callback wait_for_restore ( Keyword . t ( ) ) :: :ok
1727
1828 defstruct [
1929 :stack_id ,
@@ -23,13 +33,16 @@ defmodule Electric.Replication.PublicationManager do
2333 :can_alter_publication? ,
2434 :manual_table_publishing? ,
2535 :publication_refresh_period ,
36+ :restore_retry_timeout ,
2637 relation_ref_counts: % { } ,
2738 prepared_relation_filters: MapSet . new ( ) ,
2839 committed_relation_filters: MapSet . new ( ) ,
2940 tracked_shape_handles: % { } ,
3041 waiters: % { } ,
3142 scheduled_updated_ref: nil ,
32- next_update_forced?: false
43+ next_update_forced?: false ,
44+ restore_waiters: [ ] ,
45+ restore_complete?: false
3346 ]
3447
3548 @ type relation_filters ( ) :: MapSet . t ( Electric . oid_relation ( ) )
@@ -47,7 +60,10 @@ defmodule Electric.Replication.PublicationManager do
4760 can_alter_publication?: boolean ( ) ,
4861 manual_table_publishing?: boolean ( ) ,
4962 publication_refresh_period: non_neg_integer ( ) ,
50- next_update_forced?: boolean ( )
63+ next_update_forced?: boolean ( ) ,
64+ restore_waiters: [ GenServer . from ( ) ] ,
65+ restore_complete?: boolean ( ) ,
66+ restore_retry_timeout: non_neg_integer ( )
5167 }
5268
5369 # The default debounce timeout is 0, which means that the publication update
@@ -56,6 +72,9 @@ defmodule Electric.Replication.PublicationManager do
5672 # windows to aggregate shape filter updates
5773 @ default_debounce_timeout 0
5874
75+ # The default retry timeout in case of failed restore attempts
76+ @ default_restore_retry_timeout 1_000
77+
5978 @ name_schema_tuple { :tuple , [ :atom , :atom , :any ] }
6079 @ genserver_name_schema { :or , [ :atom , @ name_schema_tuple ] }
6180 @ schema NimbleOptions . new! (
@@ -67,7 +86,12 @@ defmodule Electric.Replication.PublicationManager do
6786 manual_table_publishing?: [ type: :boolean , required: false , default: false ] ,
6887 update_debounce_timeout: [ type: :timeout , default: @ default_debounce_timeout ] ,
6988 server: [ type: :any , required: false ] ,
70- refresh_period: [ type: :pos_integer , required: false , default: 60_000 ]
89+ refresh_period: [ type: :pos_integer , required: false , default: 60_000 ] ,
90+ restore_retry_timeout: [
91+ type: :pos_integer ,
92+ required: false ,
93+ default: @ default_restore_retry_timeout
94+ ]
7195 )
7296
7397 @ behaviour __MODULE__
@@ -83,26 +107,34 @@ defmodule Electric.Replication.PublicationManager do
83107 end
84108
85109 @ impl __MODULE__
86- def add_shape ( shape_id , shape , opts \\ [ ] ) do
110+ def add_shape ( shape_handle , shape , opts \\ [ ] ) do
87111 server = Access . get ( opts , :server , name ( opts ) )
88112 oid_relation = get_oid_relation_from_shape ( shape )
89113
90- case GenServer . call ( server , { :add_shape , shape_id , oid_relation } ) do
114+ case GenServer . call ( server , { :add_shape , shape_handle , oid_relation } ) do
91115 :ok -> :ok
92116 { :error , err } -> raise err
93117 end
94118 end
95119
96120 @ impl __MODULE__
97- def remove_shape ( shape_id , opts \\ [ ] ) do
121+ def remove_shape ( shape_handle , opts \\ [ ] ) do
98122 server = Access . get ( opts , :server , name ( opts ) )
99123
100- case GenServer . call ( server , { :remove_shape , shape_id } ) do
124+ case GenServer . call ( server , { :remove_shape , shape_handle } ) do
101125 :ok -> :ok
102126 { :error , err } -> raise err
103127 end
104128 end
105129
130+ @ impl __MODULE__
131+ def wait_for_restore ( opts \\ [ ] ) do
132+ server = Access . get ( opts , :server , name ( opts ) )
133+
134+ GenServer . call ( server , :wait_for_restore , Keyword . get ( opts , :timeout , :infinity ) )
135+ :ok
136+ end
137+
106138 def start_link ( opts ) do
107139 with { :ok , opts } <- NimbleOptions . validate ( opts , @ schema ) do
108140 stack_id = Keyword . fetch! ( opts , :stack_id )
@@ -133,10 +165,39 @@ defmodule Electric.Replication.PublicationManager do
133165 db_pool: opts . db_pool ,
134166 can_alter_publication?: opts . can_alter_publication? ,
135167 manual_table_publishing?: opts . manual_table_publishing? ,
136- publication_refresh_period: opts . refresh_period
168+ publication_refresh_period: opts . refresh_period ,
169+ restore_retry_timeout: opts . restore_retry_timeout
137170 }
138171
139- { :ok , state , state . publication_refresh_period }
172+ { :ok , state , { :continue , :restore_relations } }
173+ end
174+
175+ @ impl true
176+ def handle_continue ( :restore_relations , state ) do
177+ OpenTelemetry . with_span (
178+ "publication_manager.restore_relations" ,
179+ [ ] ,
180+ state . stack_id ,
181+ fn ->
182+ state =
183+ state . stack_id
184+ |> Electric.ShapeCache.ShapeStatus . list_shapes ( )
185+ |> Enum . reduce (
186+ state ,
187+ fn { shape_handle , shape } , state ->
188+ rel_key = get_oid_relation_from_shape ( shape )
189+ do_update_relation_filters_with_shape ( shape_handle , rel_key , :add , state )
190+ end
191+ )
192+
193+ state =
194+ if update_needed? ( state ) ,
195+ do: schedule_update_publication ( 0 , true , state ) ,
196+ else: mark_restore_complete ( state )
197+
198+ { :noreply , state , refresh_timeout ( state ) }
199+ end
200+ )
140201 end
141202
142203 @ impl true
@@ -146,9 +207,9 @@ defmodule Electric.Replication.PublicationManager do
146207
147208 if not relation_tracked? ( oid_rel , state ) do
148209 state = add_waiter ( from , oid_rel , state )
149- { :noreply , state }
210+ { :noreply , state , refresh_timeout ( state ) }
150211 else
151- { :reply , :ok , state , state . publication_refresh_period }
212+ { :reply , :ok , state , refresh_timeout ( state ) }
152213 end
153214 end
154215
@@ -160,7 +221,16 @@ defmodule Electric.Replication.PublicationManager do
160221 # reconcile the publication, otherwise you run into issues where only the last
161222 # removal fails and all others succeed. No removal guarantees anything about
162223 # the state of the publication.
163- { :reply , :ok , state , state . publication_refresh_period }
224+ { :reply , :ok , state , refresh_timeout ( state ) }
225+ end
226+
227+ def handle_call ( :wait_for_restore , from , state ) do
228+ if state . restore_complete? do
229+ { :reply , :ok , state , refresh_timeout ( state ) }
230+ else
231+ state = % { state | restore_waiters: [ from | state . restore_waiters ] }
232+ { :noreply , state , refresh_timeout ( state ) }
233+ end
164234 end
165235
166236 @ impl true
@@ -170,33 +240,45 @@ defmodule Electric.Replication.PublicationManager do
170240 state = % { state | scheduled_updated_ref: nil }
171241
172242 state =
173- case check_publication_status ( state ) do
174- { :ok , state } ->
175- case configure_publication ( state ) do
243+ OpenTelemetry . with_span (
244+ "publication_manager.update_publication" ,
245+ [
246+ is_restore: not state . restore_complete?
247+ ] ,
248+ state . stack_id ,
249+ fn ->
250+ case check_publication_status ( state ) do
176251 { :ok , state } ->
177- reply_to_all_waiters ( :ok , state )
252+ case configure_publication ( state ) do
253+ { :ok , state } ->
254+ state = mark_restore_complete ( state )
255+ reply_to_all_waiters ( :ok , state )
178256
179- { :ok , relations_configured , state } ->
180- handle_publication_update_result ( relations_configured , state )
257+ { :ok , relations_configured , state } ->
258+ state = mark_restore_complete ( state )
259+ handle_publication_update_result ( relations_configured , state )
260+
261+ { :error , err , state } ->
262+ reply_to_all_waiters ( { :error , err } , state )
263+ end
181264
182265 { :error , err , state } ->
266+ Logger . warning ( "Failed to confirm publication status: #{ inspect ( err ) } " )
183267 reply_to_all_waiters ( { :error , err } , state )
184268 end
185-
186- { :error , err , state } ->
187- Logger . warning ( "Failed to confirm publication status: #{ inspect ( err ) } " )
188- reply_to_all_waiters ( { :error , err } , state )
189- end
269+ end
270+ )
190271
191272 # Schedule a forced refresh to happen periodically unless there's an explicit call to
192273 # update the publication that happens sooner.
193- { :noreply , state , state . publication_refresh_period }
274+ { :noreply , state , refresh_timeout ( state ) }
194275 end
195276
196277 def handle_info ( :timeout , state ) do
197278 case Electric.StatusMonitor . status ( state . stack_id ) do
198279 % { conn: :up } ->
199- handle_info ( :update_publication , % { state | next_update_forced?: true } )
280+ state = schedule_update_publication ( 0 , true , state )
281+ { :noreply , state , refresh_timeout ( state ) }
200282
201283 status ->
202284 Logger . debug ( "Publication update skipped due to inactive stack: #{ inspect ( status ) } " )
@@ -247,22 +329,7 @@ defmodule Electric.Replication.PublicationManager do
247329 state = % { state | next_update_forced?: false }
248330
249331 try do
250- relations_configured =
251- if can_update? do
252- Configuration . configure_publication! (
253- state . db_pool ,
254- state . publication_name ,
255- state . prepared_relation_filters
256- )
257- else
258- Configuration . validate_publication_configuration! (
259- state . db_pool ,
260- state . publication_name ,
261- state . prepared_relation_filters
262- )
263- end
264-
265- { :ok , relations_configured , state }
332+ do_configure_publication! ( state )
266333 rescue
267334 err ->
268335 Logger . warning ( "Failed to #{ key_word } publication: #{ inspect ( err ) } " )
@@ -275,6 +342,25 @@ defmodule Electric.Replication.PublicationManager do
275342 end
276343 end
277344
345+ defp do_configure_publication! ( state ) do
346+ relations_configured =
347+ if can_update_publication? ( state ) do
348+ Configuration . configure_publication! (
349+ state . db_pool ,
350+ state . publication_name ,
351+ state . prepared_relation_filters
352+ )
353+ else
354+ Configuration . validate_publication_configuration! (
355+ state . db_pool ,
356+ state . publication_name ,
357+ state . prepared_relation_filters
358+ )
359+ end
360+
361+ { :ok , relations_configured , state }
362+ end
363+
278364 defguardp is_known_publication_error ( error )
279365 when is_exception ( error ) and
280366 ( is_struct ( error , Electric.DbConfigurationError ) or
@@ -349,6 +435,13 @@ defmodule Electric.Replication.PublicationManager do
349435 ) ,
350436 do: % { state | next_update_forced?: forced? or state . next_update_forced? }
351437
438+ defp mark_restore_complete ( % { restore_complete?: true } = state ) , do: state
439+
440+ defp mark_restore_complete ( state ) do
441+ for waiter <- state . restore_waiters , do: GenServer . reply ( waiter , :ok )
442+ % { state | restore_complete?: true , restore_waiters: [ ] }
443+ end
444+
352445 defp relation_tracked? ( oid_rel , % __MODULE__ { committed_relation_filters: committed } ) do
353446 MapSet . member? ( committed , oid_rel )
354447 end
@@ -367,6 +460,9 @@ defmodule Electric.Replication.PublicationManager do
367460 can_alter and not manual
368461 end
369462
463+ defp refresh_timeout ( % { restore_complete?: false , restore_retry_timeout: timeout } ) , do: timeout
464+ defp refresh_timeout ( % { publication_refresh_period: period } ) , do: period
465+
370466 defguardp is_tracking_shape_handle? ( shape_handle , state )
371467 when is_map_key ( state . tracked_shape_handles , shape_handle )
372468
0 commit comments