@@ -36,6 +36,7 @@ defmodule Realtime.Tenants.Connect do
3636 replication_connection_pid: pid ( ) ,
3737 replication_connection_reference: reference ( ) ,
3838 backoff: Backoff . t ( ) ,
39+ replication_connection_attempts: non_neg_integer ( ) ,
3940 check_connected_user_interval: non_neg_integer ( ) ,
4041 connected_users_bucket: list ( non_neg_integer ( ) ) ,
4142 check_connect_region_interval: non_neg_integer ( )
@@ -47,6 +48,7 @@ defmodule Realtime.Tenants.Connect do
4748 replication_connection_pid: nil ,
4849 replication_connection_reference: nil ,
4950 backoff: nil ,
51+ replication_connection_attempts: 0 ,
5052 check_connected_user_interval: nil ,
5153 connected_users_bucket: [ 1 ] ,
5254 check_connect_region_interval: nil
@@ -378,16 +380,23 @@ defmodule Realtime.Tenants.Connect do
378380 end
379381
380382 @ replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'"
383+ @ max_replication_connection_attempts 60
384+ def handle_info (
385+ :recover_replication_connection ,
386+ % { replication_connection_attempts: @ max_replication_connection_attempts } = state
387+ ) do
388+ Logger . warning ( "Max replication connection attempts reached, terminating connection" )
389+ { :stop , :shutdown , state }
390+ end
391+
381392 def handle_info ( :recover_replication_connection , state ) do
382- % { backoff: backoff , db_conn_pid: db_conn_pid } = state
393+ % { backoff: backoff , db_conn_pid: db_conn_pid , replication_connection_attempts: replication_connection_attempts } =
394+ state
383395
384396 with % { num_rows: 0 } <- Postgrex . query! ( db_conn_pid , @ replication_connection_query , [ ] ) ,
385397 { :ok , state } <- start_replication_connection ( state ) do
386- { :noreply , % { state | backoff: Backoff . reset ( backoff ) } }
398+ { :noreply , % { state | backoff: Backoff . reset ( backoff ) , replication_connection_attempts: 0 } }
387399 else
388- % { num_rows: _ } ->
389- { :noreply , % { state | backoff: Backoff . reset ( backoff ) } }
390-
391400 { :error , error } ->
392401 { timeout , backoff } = Backoff . backoff ( backoff )
393402
@@ -397,7 +406,7 @@ defmodule Realtime.Tenants.Connect do
397406 )
398407
399408 Process . send_after ( self ( ) , :recover_replication_connection , timeout )
400- { :noreply , % { state | backoff: backoff } }
409+ { :noreply , % { state | backoff: backoff , replication_connection_attempts: replication_connection_attempts + 1 } }
401410 end
402411 end
403412
0 commit comments