@@ -13,7 +13,6 @@ defmodule Sequin.Runtime.SlotProducer do
1313
1414 use Sequin.GenerateBehaviour
1515
16- alias Postgrex.Protocol
1716 alias Sequin.Error
1817 alias Sequin.Error.NotFoundError
1918 alias Sequin.Health
@@ -107,8 +106,9 @@ defmodule Sequin.Runtime.SlotProducer do
107106 field :publication_name , String . t ( )
108107 field :pg_major_version , integer ( )
109108 field :conn , ( -> Postgres . db_conn ( ) )
110- # Postgres replication connection
111- field :protocol , Postgrex.Protocol . state ( )
109+ # Postgres replication backend
110+ field :backend_mod , module ( )
111+ field :backend_state , term ( )
112112 field :connect_opts , keyword ( )
113113 field :on_connect_fail , ( any ( ) -> any ( ) )
114114 field :on_disconnect , ( -> :ok )
@@ -206,7 +206,8 @@ defmodule Sequin.Runtime.SlotProducer do
206206 Keyword . get ( opts , :restart_wal_cursor_update_interval , to_timeout ( second: 10 ) ) ,
207207 consumer_mod: Keyword . get_lazy ( opts , :consumer_mod , fn -> PipelineDefaults . processor_mod ( ) end ) ,
208208 conn: Keyword . fetch! ( opts , :conn ) ,
209- setting_batch_flush_interval: Keyword . get ( opts , :batch_flush_interval )
209+ setting_batch_flush_interval: Keyword . get ( opts , :batch_flush_interval ) ,
210+ backend_mod: Keyword . get ( opts , :backend_mod , Sequin.Postgres.PostgrexBackend )
210211 }
211212
212213 if test_pid = opts [ :test_pid ] do
@@ -241,21 +242,21 @@ defmodule Sequin.Runtime.SlotProducer do
241242
242243 @ impl GenStage
243244 def handle_info ( :connect , % State { } = state ) do
244- with { :ok , protocol } <- Protocol . connect ( state . connect_opts ) ,
245+ with { :ok , backend_state } <- state . backend_mod . connect ( state . connect_opts ) ,
245246 Logger . info ( "[SlotProducer] Connected" ) ,
246247 :ok <- put_connected_health ( state . id ) ,
247- { :ok , % State { } = state , protocol } <- init_restart_wal_cursor ( state , protocol ) ,
248- { :ok , protocol } <- Protocol . handle_streaming ( start_replication_query ( state ) , protocol ) ,
249- { :ok , protocol } <- Protocol . checkin ( protocol ) do
250- state = % { state | protocol: protocol , status: :active }
248+ { :ok , % State { } = state , backend_state } <- init_restart_wal_cursor ( state , backend_state ) ,
249+ { :ok , backend_state } <- state . backend_mod . handle_streaming ( start_replication_query ( state ) , backend_state ) ,
250+ { :ok , backend_state } <- state . backend_mod . checkin ( backend_state ) do
251+ state = % { state | backend_state: backend_state , status: :active }
251252 state = schedule_timers ( state )
252253 { :noreply , [ ] , state }
253254 else
254255 error ->
255256 reason =
256257 case error do
257258 { :error , msg } -> msg
258- { :error , msg , % Protocol { } } -> msg
259+ { :error , msg , _backend_state } -> msg
259260 end
260261
261262 error_msg = if is_exception ( reason ) , do: Exception . message ( reason ) , else: inspect ( reason )
@@ -274,17 +275,18 @@ defmodule Sequin.Runtime.SlotProducer do
274275 raise "Unexpectedly received a second socket message while buffering sock messages"
275276 end
276277
277- def handle_info ( msg , % State { protocol: protocol } = state ) when is_socket_message ( msg ) do
278+ def handle_info ( msg , % State { backend_state: backend_state } = state ) when is_socket_message ( msg ) do
278279 maybe_log_message ( state )
279280
280- with { :ok , copies , protocol } <- Protocol . handle_copy_recv ( msg , @ max_messages_per_protocol_read , protocol ) ,
281- { :ok , state } <- handle_copies ( copies , % { state | protocol: protocol } ) do
281+ with { :ok , copies , backend_state } <-
282+ state . backend_mod . handle_copy_recv ( msg , @ max_messages_per_protocol_read , backend_state ) ,
283+ { :ok , state } <- handle_copies ( copies , % { state | backend_state: backend_state } ) do
282284 { messages , state } = maybe_produce_and_flush ( state )
283285 state = maybe_toggle_buffering ( state )
284286 { :noreply , messages , state }
285287 else
286- { error , reason , protocol } ->
287- handle_disconnect ( error , reason , % { state | protocol: protocol } )
288+ { error , reason , backend_state } ->
289+ handle_disconnect ( error , reason , % { state | backend_state: backend_state } )
288290 end
289291 end
290292
@@ -622,15 +624,15 @@ defmodule Sequin.Runtime.SlotProducer do
622624
623625 defp maybe_toggle_buffering ( % State { status: :active } = state ) , do: state
624626
625- defp init_restart_wal_cursor ( % State { } = state , protocol ) do
627+ defp init_restart_wal_cursor ( % State { } = state , backend_state ) do
626628 query = "select restart_lsn from pg_replication_slots where slot_name = '#{ state . slot_name } '"
627629
628630 case Replication . restart_wal_cursor ( state . id ) do
629631 { :error , % NotFoundError { } } ->
630- case Protocol . handle_simple ( query , [ ] , protocol ) do
631- { :ok , [ % Postgrex.Result { rows: [ [ lsn ] ] } ] , protocol } when not is_nil ( lsn ) ->
632+ case state . backend_mod . handle_simple ( query , [ ] , backend_state ) do
633+ { :ok , [ % Postgrex.Result { rows: [ [ lsn ] ] } ] , backend_state } when not is_nil ( lsn ) ->
632634 cursor = % { commit_lsn: Postgres . lsn_to_int ( lsn ) , commit_idx: 0 }
633- { :ok , % { state | restart_wal_cursor: cursor } , protocol }
635+ { :ok , % { state | restart_wal_cursor: cursor } , backend_state }
634636
635637 { :ok , _res } ->
636638 { :error ,
@@ -648,15 +650,15 @@ defmodule Sequin.Runtime.SlotProducer do
648650 end
649651
650652 { :ok , cursor } ->
651- { :ok , % { state | restart_wal_cursor: cursor } , protocol }
653+ { :ok , % { state | restart_wal_cursor: cursor } , backend_state }
652654 end
653655 end
654656
655657 ## Helpers
656658
657659 defp handle_disconnect ( error , reason , % State { } = state ) when error in [ :error , :disconnect ] do
658660 Logger . error ( "[SlotProducer] Replication disconnected: #{ inspect ( reason ) } " )
659- Protocol . disconnect ( % RuntimeError { } , state . protocol )
661+ state . backend_mod . disconnect ( % RuntimeError { } , state . backend_state )
660662 Process . send_after ( self ( ) , :connect , state . setting_reconnect_interval )
661663
662664 if is_function ( state . on_disconnect ) do
@@ -748,12 +750,12 @@ defmodule Sequin.Runtime.SlotProducer do
748750
749751 msg = ack_message ( state . restart_wal_cursor . commit_lsn )
750752
751- case Protocol . handle_copy_send ( msg , state . protocol ) do
753+ case state . backend_mod . handle_copy_send ( msg , state . backend_state ) do
752754 :ok ->
753755 { :noreply , [ ] , % { state | last_sent_restart_wal_cursor: state . restart_wal_cursor } }
754756
755- { error , reason , protocol } ->
756- handle_disconnect ( error , reason , % { state | protocol: protocol } )
757+ { error , reason , backend_state } ->
758+ handle_disconnect ( error , reason , % { state | backend_state: backend_state } )
757759 end
758760 end
759761
0 commit comments