@@ -168,11 +168,36 @@ defmodule Electric.Postgres.Configuration do
168168 )
169169 end
170170
171- configuration_result =
171+ # Notes on avoiding deadlocks
172+ # - `ALTER TABLE` should be after the publication altering, because it takes out an exclusive lock over this table,
173+ # but the publication altering takes out a shared lock on all mentioned tables, so a concurrent transaction will
174+ # deadlock if the order is reversed, and we've seen this happen even within the context of a single process perhaps
175+ # across multiple calls from separate deployments or timing issues.
176+ # - It is important for all table operations to also occur in the same order to avoid deadlocks due to
177+ # lock ordering issues, so despite splitting drop and add operations we sort them and process them together
178+ # in a sorted single pass
179+ results =
172180 Enum . concat (
173- Enum . map ( to_drop , & { & 1 , drop_table_from_publication ( conn , publication_name , & 1 ) } ) ,
174- Enum . map ( to_add , & { & 1 , add_table_to_publication ( conn , publication_name , & 1 ) } )
181+ Enum . map ( to_drop , & { & 1 , :drop } ) ,
182+ Enum . map ( to_add , & { & 1 , :add } )
175183 )
184+ |> Enum . sort ( & ( elem ( & 1 , 0 ) <= elem ( & 2 , 0 ) ) )
185+ |> Enum . map ( fn
186+ { rel , :drop } -> { rel , drop_table_from_publication ( conn , publication_name , rel ) }
187+ { rel , :add } -> { rel , add_table_to_publication ( conn , publication_name , rel ) }
188+ end )
189+ |> Enum . map ( fn
190+ { rel , { :ok , :added } } ->
191+ with { :ok , :configured } <- set_table_replica_identity_full ( conn , rel ) do
192+ { rel , { :ok , :added } }
193+ end
194+
195+ res ->
196+ res
197+ end )
198+
199+ configuration_result =
200+ results
176201 |> Enum . concat ( Enum . map ( to_preserve , & { & 1 , { :ok , :validated } } ) )
177202 |> Enum . concat ( Enum . map ( to_invalidate , & { & 1 , { :error , :schema_changed } } ) )
178203 |> Map . new ( )
@@ -186,17 +211,46 @@ defmodule Electric.Postgres.Configuration do
186211 { _oid , relation } = oid_relation
187212 table = Utils . relation_to_sql ( relation )
188213
189- Logger . debug (
190- "Adding #{ table } to publication #{ publication_name } and " <>
191- "setting its replica identity to FULL"
192- )
214+ Logger . debug ( "Adding #{ table } to publication #{ publication_name } " )
193215
194- with :ok <- exec_alter_publication_for_table ( conn , publication_name , :add , table ) ,
195- :ok <- exec_set_replica_identity_full ( conn , table ) do
216+ with :ok <- exec_alter_publication_for_table ( conn , publication_name , :add , table ) do
196217 { :ok , :added }
197218 end
198219 end
199220
221+ @ spec set_table_replica_identity_full ( Postgrex . conn ( ) , Electric . oid_relation ( ) ) ::
222+ { :ok , :configured } | { :error , term ( ) }
223+ defp set_table_replica_identity_full ( conn , oid_relation ) do
224+ { _oid , relation } = oid_relation
225+ { schema , name } = relation
226+ table = Utils . relation_to_sql ( relation )
227+
228+ case Postgrex . query (
229+ conn ,
230+ """
231+ SELECT c.relreplident
232+ FROM pg_class c
233+ JOIN pg_namespace n ON n.oid = c.relnamespace
234+ WHERE n.nspname = $1 AND c.relname = $2
235+ """ ,
236+ [ schema , name ]
237+ ) do
238+ { :ok , % Postgrex.Result { rows: [ [ << "f" >> ] ] } } ->
239+ Logger . debug ( "Replica identity already FULL for #{ table } , skipping" )
240+ { :ok , :configured }
241+
242+ { :ok , _ } ->
243+ Logger . debug ( "Setting #{ table } replica identity to FULL" )
244+
245+ with :ok <- exec_set_replica_identity_full ( conn , table ) do
246+ { :ok , :configured }
247+ end
248+
249+ { :error , reason } ->
250+ { :error , reason }
251+ end
252+ end
253+
200254 @ spec drop_table_from_publication ( Postgrex . conn ( ) , String . t ( ) , Electric . oid_relation ( ) ) ::
201255 { :ok , :dropped } | { :error , term ( ) }
202256 defp drop_table_from_publication ( conn , publication_name , oid_relation ) do
@@ -211,9 +265,9 @@ defmodule Electric.Postgres.Configuration do
211265
212266 @ spec exec_alter_publication_for_table ( Postgrex . conn ( ) , String . t ( ) , :add | :drop , String . t ( ) ) ::
213267 :ok | { :error , term ( ) }
214- defp exec_alter_publication_for_table ( conn , publication_name , op , table ) do
268+ defp exec_alter_publication_for_table ( conn , publication_name , op_atom , table ) do
215269 op =
216- case op do
270+ case op_atom do
217271 :add -> "ADD"
218272 :drop -> "DROP"
219273 end
@@ -233,9 +287,10 @@ defmodule Electric.Postgres.Configuration do
233287 Postgrex . query! ( conn , "RELEASE SAVEPOINT before_publication" , [ ] )
234288
235289 case reason do
236- # Duplicate object error is raised if we're trying to add a table
237- # to the publication when it's already there.
238- % { postgres: % { code: :undefined_table } } -> :ok
290+ # undefined table can happen when removing a table that was already removed
291+ % { postgres: % { code: :undefined_object } } when op_atom == :drop -> :ok
292+ # duplicate object can happen when adding a table that was already added
293+ % { postgres: % { code: :duplicate_object } } when op_atom == :add -> :ok
239294 _ -> { :error , reason }
240295 end
241296 end
0 commit comments