@@ -1068,7 +1068,8 @@ defmodule Sequin.Postgres do
10681068 Gets both restart_lsn and confirmed_flush_lsn for a replication slot.
10691069 """
10701070 @ spec get_replication_lsns ( db_conn ( ) , String . t ( ) ) ::
1071- { :ok , % { restart_lsn: non_neg_integer ( ) , confirmed_flush_lsn: non_neg_integer ( ) } } | { :error , Error . t ( ) }
1071+ { :ok , % { restart_lsn: non_neg_integer ( ) | nil , confirmed_flush_lsn: non_neg_integer ( ) | nil } }
1072+ | { :error , Error . t ( ) }
10721073 def get_replication_lsns ( conn , slot_name ) do
10731074 query = """
10741075 SELECT restart_lsn, confirmed_flush_lsn
@@ -1078,7 +1079,9 @@ defmodule Sequin.Postgres do
10781079
10791080 case query ( conn , query , [ slot_name ] ) do
10801081 { :ok , % { rows: [ [ restart_lsn , confirmed_flush_lsn ] ] } } ->
1081- { :ok , % { restart_lsn: lsn_to_int ( restart_lsn ) , confirmed_flush_lsn: lsn_to_int ( confirmed_flush_lsn ) } }
1082+ restart_lsn = unless is_nil ( restart_lsn ) , do: lsn_to_int ( restart_lsn )
1083+ confirmed_flush_lsn = unless is_nil ( confirmed_flush_lsn ) , do: lsn_to_int ( confirmed_flush_lsn )
1084+ { :ok , % { restart_lsn: restart_lsn , confirmed_flush_lsn: confirmed_flush_lsn } }
10821085
10831086 { :ok , % { rows: [ ] } } ->
10841087 { :error , Error . not_found ( entity: :replication_slot , params: % { name: slot_name } ) }
0 commit comments