5757 QueryException , \
5858 StartNodeException , \
5959 TimeoutException , \
60+ InitNodeException , \
6061 TestgresException , \
6162 BackupException
6263
6364from .logger import TestgresLogger
6465
66+ from .pubsub import Publication , Subscription
67+
6568from .utils import \
6669 eprint , \
6770 get_bin_path , \
7073 reserve_port , \
7174 release_port , \
7275 execute_utility , \
76+ options_string , \
7377 clean_on_error
7478
7579from .backup import NodeBackup
@@ -300,24 +304,24 @@ def _create_recovery_conf(self, username, slot=None):
300304 master = self .master
301305 assert master is not None
302306
303- conninfo = (
304- u "application_name={} "
305- u "port={} "
306- u "user={} "
307- ). format ( self . name , master . port , username ) # yapf: disable
307+ conninfo = {
308+ "application_name" : self . name ,
309+ "port" : master . port ,
310+ "user" : username
311+ } # yapf: disable
308312
309313 # host is tricky
310314 try :
311315 import ipaddress
312316 ipaddress .ip_address (master .host )
313- conninfo += u "hostaddr={}" . format ( master .host )
317+ conninfo [ "hostaddr" ] = master .host
314318 except ValueError :
315- conninfo += u "host={}" . format ( master .host )
319+ conninfo [ "host" ] = master .host
316320
317321 line = (
318322 "primary_conninfo='{}'\n "
319323 "standby_mode=on\n "
320- ).format (conninfo ) # yapf: disable
324+ ).format (options_string ( ** conninfo ) ) # yapf: disable
321325
322326 if slot :
323327 # Connect to master for some additional actions
@@ -413,6 +417,7 @@ def default_conf(self,
413417 fsync = False ,
414418 unix_sockets = True ,
415419 allow_streaming = True ,
420+ allow_logical = False ,
416421 log_statement = 'all' ):
417422 """
418423 Apply default settings to this node.
@@ -421,6 +426,7 @@ def default_conf(self,
421426 fsync: should this node use fsync to keep data safe?
422427 unix_sockets: should we enable UNIX sockets?
423428 allow_streaming: should this node add a hba entry for replication?
429+ allow_logical: can this node be used as a logical replication publisher?
424430 log_statement: one of ('all', 'off', 'mod', 'ddl').
425431
426432 Returns:
@@ -497,6 +503,13 @@ def get_auth_method(t):
497503 WAL_KEEP_SEGMENTS ,
498504 wal_level )) # yapf: disable
499505
506+ if allow_logical :
507+ if not pg_version_ge ('10' ):
508+ raise InitNodeException (
509+ "Logical replication is only available for Postgres 10 "
510+ "and newer" )
511+ conf .write (u"wal_level = logical\n " )
512+
500513 # disable UNIX sockets if asked to
501514 if not unix_sockets :
502515 conf .write (u"unix_socket_directories = ''\n " )
@@ -937,13 +950,14 @@ def poll_query_until(self,
937950 if res is None :
938951 raise QueryException ('Query returned None' , query )
939952
940- if len (res ) == 0 :
941- raise QueryException ('Query returned 0 rows' , query )
942-
943- if len (res [0 ]) == 0 :
944- raise QueryException ('Query returned 0 columns' , query )
945-
946- if res [0 ][0 ] == expected :
953+ # result set is not empty
954+ if len (res ):
955+ if len (res [0 ]) == 0 :
956+ raise QueryException ('Query returned 0 columns' , query )
957+ if res [0 ][0 ] == expected :
958+ return # done
959+ # empty result set is considered as None
960+ elif expected is None :
947961 return # done
948962
949963 except ProgrammingError as e :
@@ -982,13 +996,11 @@ def execute(self,
982996
983997 with self .connect (dbname = dbname ,
984998 username = username ,
985- password = password ) as node_con : # yapf: disable
999+ password = password ,
1000+ autocommit = commit ) as node_con : # yapf: disable
9861001
9871002 res = node_con .execute (query )
9881003
989- if commit :
990- node_con .commit ()
991-
9921004 return res
9931005
9941006 def backup (self , ** kwargs ):
@@ -1052,6 +1064,37 @@ def catchup(self, dbname=None, username=None):
10521064 except Exception as e :
10531065 raise_from (CatchUpException ("Failed to catch up" , poll_lsn ), e )
10541066
1067+ def publish (self , name , ** kwargs ):
1068+ """
1069+ Create publication for logical replication
1070+
1071+ Args:
1072+ pubname: publication name
1073+ tables: tables names list
1074+ dbname: database name where objects or interest are located
1075+ username: replication username
1076+ """
1077+ return Publication (name = name , node = self , ** kwargs )
1078+
1079+ def subscribe (self , publication , name , dbname = None , username = None ,
1080+ ** params ):
1081+ """
1082+ Create subscription for logical replication
1083+
1084+ Args:
1085+ name: subscription name
1086+ publication: publication object obtained from publish()
1087+ dbname: database name
1088+ username: replication username
1089+ params: subscription parameters (see documentation on `CREATE SUBSCRIPTION
1090+ <https://www.postgresql.org/docs/current/static/sql-createsubscription.html>`_
1091+ for details)
1092+ """
1093+ # yapf: disable
1094+ return Subscription (name = name , node = self , publication = publication ,
1095+ dbname = dbname , username = username , ** params )
1096+ # yapf: enable
1097+
10551098 def pgbench (self ,
10561099 dbname = None ,
10571100 username = None ,
@@ -1150,14 +1193,21 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs):
11501193
11511194 return execute_utility (_params , self .utils_log_file )
11521195
1153- def connect (self , dbname = None , username = None , password = None ):
1196+ def connect (self ,
1197+ dbname = None ,
1198+ username = None ,
1199+ password = None ,
1200+ autocommit = False ):
11541201 """
11551202 Connect to a database.
11561203
11571204 Args:
11581205 dbname: database name to connect to.
11591206 username: database user name.
11601207 password: user's password.
1208+ autocommit: commit each statement automatically. Also it should be
1209+ set to `True` for statements requiring to be run outside
1210+ a transaction? such as `VACUUM` or `CREATE DATABASE`.
11611211
11621212 Returns:
11631213 An instance of :class:`.NodeConnection`.
@@ -1166,4 +1216,5 @@ def connect(self, dbname=None, username=None, password=None):
11661216 return NodeConnection (node = self ,
11671217 dbname = dbname ,
11681218 username = username ,
1169- password = password ) # yapf: disable
1219+ password = password ,
1220+ autocommit = autocommit ) # yapf: disable
0 commit comments