1212After that :meth:`~.PostgresNode.publish()` and :meth:`~.PostgresNode.subscribe()`
1313methods may be used to setup replication. Example:
1414
15- >>> from .api import get_new_node
15+ >>> from testgres import get_new_node
1616>>> with get_new_node() as nodeA, get_new_node() as nodeB:
1717... nodeA.init(allow_logical=True).start()
1818... nodeB.init().start()
4444
4545from six import raise_from
4646
47+ from .consts import LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS
4748from .defaults import default_dbname , default_username
4849from .exceptions import CatchUpException
4950from .utils import options_string
@@ -56,11 +57,11 @@ def __init__(self, name, node, tables=None, dbname=None, username=None):
5657 constructing publication objects.
5758
5859 Args:
59- name: publication name
60- node: publisher's node
61- tables: tables list or None for all tables
62- dbname: database name used to connect and perform subscription
63- username: username used to connect to the database
60+ name: publication name.
61+ node: publisher's node.
62+ tables: tables list or None for all tables.
63+ dbname: database name used to connect and perform subscription.
64+ username: username used to connect to the database.
6465 """
6566 self .name = name
6667 self .node = node
@@ -70,7 +71,7 @@ def __init__(self, name, node, tables=None, dbname=None, username=None):
7071 # create publication in database
7172 t = "table " + ", " .join (tables ) if tables else "all tables"
7273 query = "create publication {} for {}"
73- node .safe_psql (query .format (name , t ), dbname = dbname , username = username )
74+ node .execute (query .format (name , t ), dbname = dbname , username = username )
7475
7576 def drop (self , dbname = None , username = None ):
7677 """
@@ -87,13 +88,13 @@ def add_tables(self, tables, dbname=None, username=None):
8788 created with empty tables list.
8889
8990 Args:
90- tables: a list of tables to be added to the publication
91+ tables: a list of tables to be added to the publication.
9192 """
9293 if not tables :
9394 raise ValueError ("Tables list is empty" )
9495
9596 query = "alter publication {} add table {}"
96- self .node .safe_psql (
97+ self .node .execute (
9798 query .format (self .name , ", " .join (tables )),
9899 dbname = dbname or self .dbname ,
99100 username = username or self .username )
@@ -112,15 +113,15 @@ def __init__(self,
112113 constructing subscription objects.
113114
114115 Args:
115- name: subscription name
116- node: subscriber's node
116+ name: subscription name.
117+ node: subscriber's node.
117118 publication: :class:`.Publication` object we are subscribing to
118- (see :meth:`.PostgresNode.publish()`)
119- dbname: database name used to connect and perform subscription
120- username: username used to connect to the database
119+ (see :meth:`.PostgresNode.publish()`).
120+ dbname: database name used to connect and perform subscription.
121+ username: username used to connect to the database.
121122 params: subscription parameters (see documentation on `CREATE SUBSCRIPTION
122123 <https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_
123- for details)
124+ for details).
124125 """
125126 self .name = name
126127 self .node = node
@@ -142,28 +143,29 @@ def __init__(self,
142143 if params :
143144 query += " with ({})" .format (options_string (** params ))
144145
145- node .safe_psql (query , dbname = dbname , username = username )
146+ # Note: cannot run 'create subscription' query in transaction mode
147+ node .execute (query , dbname = dbname , username = username )
146148
147149 def disable (self , dbname = None , username = None ):
148150 """
149151 Disables the running subscription.
150152 """
151153 query = "alter subscription {} disable"
152- self .node .safe_psql (query .format (self .name ), dbname = None , username = None )
154+ self .node .execute (query .format (self .name ), dbname = None , username = None )
153155
154156 def enable (self , dbname = None , username = None ):
155157 """
156158 Enables the previously disabled subscription.
157159 """
158160 query = "alter subscription {} enable"
159- self .node .safe_psql (query .format (self .name ), dbname = None , username = None )
161+ self .node .execute (query .format (self .name ), dbname = None , username = None )
160162
161163 def refresh (self , copy_data = True , dbname = None , username = None ):
162164 """
163165 Disables the running subscription.
164166 """
165167 query = "alter subscription {} refresh publication with (copy_data={})"
166- self .node .safe_psql (
168+ self .node .execute (
167169 query .format (self .name , copy_data ),
168170 dbname = dbname ,
169171 username = username )
@@ -172,7 +174,7 @@ def drop(self, dbname=None, username=None):
172174 """
173175 Drops subscription
174176 """
175- self .node .safe_psql (
177+ self .node .execute (
176178 "drop subscription {}" .format (self .name ),
177179 dbname = dbname ,
178180 username = username )
@@ -182,19 +184,19 @@ def catchup(self, username=None):
182184 Wait until subscription catches up with publication.
183185
184186 Args:
185- username: remote node's user name
187+ username: remote node's user name.
186188 """
187- query = (
188- " select pg_current_wal_lsn() - replay_lsn = 0 "
189- " from pg_stat_replication where application_name = '{}'" ). format (
190- self .name )
189+ query = """
190+ select pg_current_wal_lsn() - replay_lsn = 0
191+ from pg_catalog. pg_stat_replication where application_name = '{}'
192+ """ . format ( self .name )
191193
192194 try :
193195 # wait until this LSN reaches subscriber
194196 self .pub .node .poll_query_until (
195197 query = query ,
196198 dbname = self .pub .dbname ,
197199 username = username or self .pub .username ,
198- max_attempts = 60 )
200+ max_attempts = LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS )
199201 except Exception as e :
200202 raise_from (CatchUpException ("Failed to catch up" , query ), e )
0 commit comments