@@ -171,19 +171,12 @@ def stopped(self):
171171 return self .stop_event .isSet ()
172172
173173
174- def log_watch ( node_name , pg_logname ):
174+ class IsolationLevel ( Enum ):
175175 """
176- Starts thread for node that redirects
177- postgresql logs to python logging system
176+ Transaction isolation level for NodeConnection
178177 """
179178
180- reader = TestgresLogger (node_name , open (pg_logname , 'r' ))
181- reader .start ()
182-
183- global util_threads
184- util_threads .append (reader )
185-
186- return reader
179+ ReadUncommitted , ReadCommitted , RepeatableRead , Serializable = range (4 )
187180
188181
189182class NodeConnection (object ):
@@ -218,7 +211,7 @@ def __enter__(self):
218211 def __exit__ (self , type , value , traceback ):
219212 self .close ()
220213
221- def begin (self , isolation_level = 0 ):
214+ def begin (self , isolation_level = IsolationLevel . ReadCommitted ):
222215 # yapf: disable
223216 levels = [
224217 'read uncommitted' ,
@@ -227,40 +220,51 @@ def begin(self, isolation_level=0):
227220 'serializable'
228221 ]
229222
230- # Check if level is int [0..3]
231- if (isinstance (isolation_level , int ) and
232- isolation_level in range (0 , 4 )):
223+ # Check if level is an IsolationLevel
224+ if (isinstance (isolation_level , IsolationLevel )):
233225
234- # Replace index with isolation level type
235- isolation_level = levels [isolation_level ]
226+ # Get index of isolation level
227+ level_idx = isolation_level .value
228+ assert (level_idx in range (4 ))
236229
237- # Or it might be a string
238- elif (isinstance (isolation_level , six .text_type ) and
239- isolation_level .lower () in levels ):
230+ # Replace isolation level with its name
231+ isolation_level = levels [level_idx ]
240232
241- # Nothing to do here
242- pass
243-
244- # Something is wrong, emit exception
245233 else :
246- raise QueryException (
247- 'Invalid isolation level "{}"' .format (isolation_level ))
234+ # Get name of isolation level
235+ level_str = str (isolation_level ).lower ()
236+
237+ # Validate level string
238+ if level_str not in levels :
239+ error = 'Invalid isolation level "{}"'
240+ raise QueryException (error .format (level_str ))
241+
242+ # Replace isolation level with its name
243+ isolation_level = level_str
248244
249- self .cursor .execute (
250- 'SET TRANSACTION ISOLATION LEVEL {}' .format (isolation_level ))
245+ # Set isolation level
246+ cmd = 'SET TRANSACTION ISOLATION LEVEL {}'
247+ self .cursor .execute (cmd .format (isolation_level ))
248+
249+ return self
251250
252251 def commit (self ):
253252 self .connection .commit ()
254253
254+ return self
255+
255256 def rollback (self ):
256257 self .connection .rollback ()
257258
259+ return self
260+
258261 def execute (self , query , * args ):
259262 self .cursor .execute (query , args )
260263
261264 try :
262265 res = self .cursor .fetchall ()
263266
267+ # pg8000 might return tuples
264268 if isinstance (res , tuple ):
265269 res = [tuple (t ) for t in res ]
266270
@@ -1311,3 +1315,18 @@ def configure_testgres(**options):
13111315
13121316 for key , option in options .items ():
13131317 setattr (TestgresConfig , key , option )
1318+
1319+
1320+ def log_watch (node_name , pg_logname ):
1321+ """
1322+ Start thread for node that redirects
1323+ PostgreSQL logs to python logging system.
1324+ """
1325+
1326+ reader = TestgresLogger (node_name , open (pg_logname , 'r' ))
1327+ reader .start ()
1328+
1329+ global util_threads
1330+ util_threads .append (reader )
1331+
1332+ return reader
0 commit comments