66import time
77import xml .etree .ElementTree as ET
88import yaml
9+ from time import sleep
910
1011def wait (conn ):
1112 """wait for some event on connection to postgres"""
@@ -52,10 +53,10 @@ def pg_query_state(config, pid, verbose=False, costs=False, timing=False, \
5253
5354 conn = psycopg2 .connect (** config )
5455 curs = conn .cursor ()
55-
56- curs . callproc ( 'pg_query_state' , ( pid , verbose , costs , timing , buffers , triggers , format ))
57- result = curs .fetchall ( )
58-
56+ result = []
57+ while not result :
58+ curs .callproc ( 'pg_query_state' , ( pid , verbose , costs , timing , buffers , triggers , format ) )
59+ result = curs . fetchall ()
5960 notices = conn .notices [:]
6061 conn .close ()
6162 return result
@@ -85,7 +86,7 @@ def test_deadlock(config):
8586
8687 n_close ((acon1 , acon2 ))
8788
88- def query_state (config , async_conn , query , steps , args = {}, num_workers = 0 ):
89+ def query_state (config , async_conn , query , args = {}, num_workers = 0 ):
8990 """
9091 Get intermediate state of 'query' on connection 'async_conn' after number of 'steps'
9192 of node executions from start of query
@@ -97,13 +98,7 @@ def query_state(config, async_conn, query, steps, args={}, num_workers=0):
9798
9899 set_guc (async_conn , 'enable_mergejoin' , 'off' )
99100 set_guc (async_conn , 'max_parallel_workers_per_gather' , num_workers )
100- set_guc (async_conn , 'pg_query_state.executor_trace' , 'on' )
101-
102- # execute 'query' specific number of 'steps'
103101 acurs .execute (query )
104- for _ in xrange (steps ):
105- curs .callproc ('executor_step' , (async_conn .get_backend_pid (),))
106- # import ipdb; ipdb.set_trace()
107102
108103 # extract current state of query progress
109104 pg_qs_args = {
@@ -113,9 +108,6 @@ def query_state(config, async_conn, query, steps, args={}, num_workers=0):
113108 for k , v in args .iteritems ():
114109 pg_qs_args [k ] = v
115110 result = pg_query_state (** pg_qs_args )
116-
117- # resume query progress and complete it
118- curs .callproc ('executor_continue' , (async_conn .get_backend_pid (),))
119111 wait (async_conn )
120112
121113 set_guc (async_conn , 'pg_query_state.executor_trace' , 'off' )
@@ -129,16 +121,15 @@ def test_simple_query(config):
129121
130122 acon , = n_async_connect (config )
131123 query = 'select count(*) from foo join bar on foo.c1=bar.c1'
132- num_steps = 10
133- expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
134- -> Hash Join \(Current loop: actual rows=0, loop number=1\)
124+ expected = r"""Aggregate \(Current loop: actual rows=\d+, loop number=1\)
125+ -> Hash Join \(Current loop: actual rows=\d+, loop number=1\)
135126 Hash Cond: \(foo.c1 = bar.c1\)
136- -> Seq Scan on foo \(Current loop: actual rows=1 , loop number=1\)
137- -> Hash \(Current loop: actual rows=0 , loop number=1\)
127+ -> Seq Scan on foo \(Current loop: actual rows=\d+ , loop number=1\)
128+ -> Hash \(Current loop: actual rows=\d+ , loop number=1\)
138129 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
139- -> Seq Scan on bar \(Current loop: actual rows=9 , loop number=1\)"""
130+ -> Seq Scan on bar \(Current loop: actual rows=\d+ , loop number=1\)"""
140131
141- qs = query_state (config , acon , query , num_steps )
132+ qs = query_state (config , acon , query )
142133 assert len (qs ) == 1 and qs [0 ][0 ] == acon .get_backend_pid () and qs [0 ][1 ] == 0 \
143134 and qs [0 ][2 ] == query and re .match (expected , qs [0 ][3 ]) and qs [0 ][4 ] == None
144135 assert len (notices ) == 0
@@ -187,7 +178,6 @@ def test_nested_call(config):
187178 drop_function = 'drop function n_join_foo_bar()'
188179 call_function = 'select * from n_join_foo_bar()'
189180 nested_query = 'SELECT (select count(*) from foo join bar on foo.c1=bar.c1)'
190- num_steps = 10
191181 expected = 'Function Scan on n_join_foo_bar (Current loop: actual rows=0, loop number=1)'
192182 expected_nested = r"""Result \(Current loop: actual rows=0, loop number=1\)
193183 InitPlan 1 \(returns \$0\)
@@ -197,12 +187,12 @@ def test_nested_call(config):
197187 -> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
198188 -> Hash \(Current loop: actual rows=0, loop number=1\)
199189 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
200- -> Seq Scan on bar \(Current loop: actual rows=8 , loop number=1\)"""
190+ -> Seq Scan on bar \(Current loop: actual rows=\d+ , loop number=1\)"""
201191
202192 util_curs .execute (create_function )
203193 util_conn .commit ()
204194
205- qs = query_state (config , acon , call_function , num_steps )
195+ qs = query_state (config , acon , call_function )
206196 assert len (qs ) == 2 \
207197 and qs [0 ][0 ] == qs [1 ][0 ] == acon .get_backend_pid () \
208198 and qs [0 ][1 ] == 0 and qs [1 ][1 ] == 1 \
@@ -224,24 +214,26 @@ def test_insert_on_conflict(config):
224214 util_curs = util_conn .cursor ()
225215 add_field_uniqueness = 'alter table foo add constraint unique_c1 unique(c1)'
226216 drop_field_uniqueness = 'alter table foo drop constraint unique_c1'
227- num_steps = 10
228- query = 'insert into foo select i, md5(random()::text) from generate_series(1, %d) as i on conflict do nothing' % ( num_steps + 1 )
229- expected = """Insert on foo (Current loop: actual rows=0 , loop number=1 )
217+ query = 'insert into foo select i, md5(random()::text) from generate_series(1, 30000) as i on conflict do nothing'
218+
219+ expected = r """Insert on foo \ (Current loop: actual rows=\d+ , loop number=\d+\ )
230220 Conflict Resolution: NOTHING
231- Conflicting Tuples: 9
232- -> Function Scan on generate_series i (Current loop: actual rows=10 , loop number=1 )"""
221+ Conflicting Tuples: \d+
222+ -> Function Scan on generate_series i \ (Current loop: actual rows=\d+ , loop number=\d+\ )"""
233223
234224 util_curs .execute (add_field_uniqueness )
235225 util_conn .commit ()
236226
237- qs = query_state (config , acon , query , num_steps )
227+ qs = query_state (config , acon , query )
238228 assert len (qs ) == 1 \
239229 and qs [0 ][0 ] == acon .get_backend_pid () and qs [0 ][1 ] == 0 \
240- and qs [0 ][2 ] == query and qs [0 ][3 ] == expected \
230+ and qs [0 ][2 ] == query and re . match ( expected , qs [0 ][3 ]) \
241231 and qs [0 ][4 ] == None
242232 assert len (notices ) == 0
243233
244234 util_curs .execute (drop_field_uniqueness )
235+ util_curs .execute ("ANALYZE foo" )
236+ util_curs .execute ("ANALYZE bar" )
245237
246238 util_conn .close ()
247239 n_close ((acon ,))
@@ -270,40 +262,27 @@ def test_trigger(config):
270262 create_trigger = """
271263 create trigger unique_foo_c1
272264 before insert or update of c1 on foo for row
273- execute procedure unique_c1_in_foo()"""
265+ execute procedure unique_c1_in_foo()"""
274266 drop_temps = 'drop function unique_c1_in_foo() cascade'
275- num_steps = 10
276- query = 'insert into foo select i, md5(random()::text) from generate_series(1, %d) as i' % (num_steps + 1 )
277- expected_upper = """Insert on foo (Current loop: actual rows=0, loop number=1)
278- -> Function Scan on generate_series i (Current loop: actual rows=2, loop number=1)"""
279- trigger_suffix = 'Trigger unique_foo_c1: calls=1'
280- expected_inner = """Result (Current loop: actual rows=0, loop number=1)
281- SubPlan 1
282- -> Materialize (Current loop: actual rows=1, loop number=1)
283- -> Seq Scan on foo (Current loop: actual rows=1, loop number=1)"""
267+ query = 'insert into foo select i, md5(random()::text) from generate_series(1, 10000) as i'
268+ expected_upper = r"""Insert on foo \(Current loop: actual rows=\d+, loop number=1\)
269+ -> Function Scan on generate_series i \(Current loop: actual rows=\d+, loop number=1\)"""
270+ trigger_suffix = r"""Trigger unique_foo_c1: calls=\d+"""
284271
285272 util_curs .execute (create_trigger_function )
286273 util_curs .execute (create_trigger )
287274 util_conn .commit ()
288275
289- qs = query_state (config , acon , query , num_steps , {'triggers' : True })
290- assert len (qs ) == 2 \
291- and qs [0 ][0 ] == acon .get_backend_pid () and qs [0 ][1 ] == 0 \
292- and qs [0 ][2 ] == query and qs [0 ][3 ] == expected_upper + '\n ' + trigger_suffix \
293- and qs [0 ][4 ] == None \
294- and qs [1 ][0 ] == acon .get_backend_pid () and qs [1 ][1 ] == 1 \
295- and qs [1 ][2 ] == 'SELECT new.c1 in (select c1 from foo)' and qs [1 ][3 ] == expected_inner \
296- and qs [1 ][4 ] == None
276+ qs = query_state (config , acon , query , {'triggers' : True })
277+ assert qs [0 ][0 ] == acon .get_backend_pid () and qs [0 ][1 ] == 0 \
278+ and qs [0 ][2 ] == query and re .match (expected_upper , qs [0 ][3 ]) \
279+ and qs [0 ][4 ] == None
297280 assert len (notices ) == 0
298281
299- qs = query_state (config , acon , query , num_steps , {'triggers' : False })
300- assert len (qs ) == 2 \
301- and qs [0 ][0 ] == acon .get_backend_pid () and qs [0 ][1 ] == 0 \
302- and qs [0 ][2 ] == query and qs [0 ][3 ] == expected_upper \
303- and qs [0 ][4 ] == None \
304- and qs [1 ][0 ] == acon .get_backend_pid () and qs [1 ][1 ] == 1 \
305- and qs [1 ][2 ] == 'SELECT new.c1 in (select c1 from foo)' and qs [1 ][3 ] == expected_inner \
306- and qs [1 ][4 ] == None
282+ qs = query_state (config , acon , query , {'triggers' : False })
283+ assert qs [0 ][0 ] == acon .get_backend_pid () and qs [0 ][1 ] == 0 \
284+ and qs [0 ][2 ] == query and re .match (expected_upper , qs [0 ][3 ]) \
285+ and qs [0 ][4 ] == None
307286 assert len (notices ) == 0
308287
309288 util_curs .execute (drop_temps )
@@ -316,16 +295,15 @@ def test_costs(config):
316295
317296 acon , = n_async_connect (config )
318297 query = 'select count(*) from foo join bar on foo.c1=bar.c1'
319- num_steps = 10
320298 expected = r"""Aggregate \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=8\) \(Current loop: actual rows=0, loop number=1\)
321299 -> Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=0, loop number=1\)
322300 Hash Cond: \(foo.c1 = bar.c1\)
323301 -> Seq Scan on foo \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=1, loop number=1\)
324302 -> Hash \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=0, loop number=1\)
325303 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
326- -> Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=9 , loop number=1\)"""
304+ -> Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=\d+ , loop number=1\)"""
327305
328- qs = query_state (config , acon , query , num_steps , {'costs' : True })
306+ qs = query_state (config , acon , query , {'costs' : True })
329307 assert len (qs ) == 1 and re .match (expected , qs [0 ][3 ])
330308 assert len (notices ) == 0
331309
@@ -336,20 +314,19 @@ def test_buffers(config):
336314
337315 acon , = n_async_connect (config )
338316 query = 'select count(*) from foo join bar on foo.c1=bar.c1'
339- num_steps = 10
340317 expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
341318 -> Hash Join \(Current loop: actual rows=0, loop number=1\)
342319 Hash Cond: \(foo.c1 = bar.c1\)
343320 -> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
344321 Buffers: [^\n]*
345322 -> Hash \(Current loop: actual rows=0, loop number=1\)
346323 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
347- -> Seq Scan on bar \(Current loop: actual rows=9 , loop number=1\)
324+ -> Seq Scan on bar \(Current loop: actual rows=\d+ , loop number=1\)
348325 Buffers: .*"""
349326
350327 set_guc (acon , 'pg_query_state.enable_buffers' , 'on' )
351328
352- qs = query_state (config , acon , query , num_steps , {'buffers' : True })
329+ qs = query_state (config , acon , query , {'buffers' : True })
353330 assert len (qs ) == 1 and re .match (expected , qs [0 ][3 ])
354331 assert len (notices ) == 0
355332
@@ -360,18 +337,17 @@ def test_timing(config):
360337
361338 acon , = n_async_connect (config )
362339 query = 'select count(*) from foo join bar on foo.c1=bar.c1'
363- num_steps = 10
364340 expected = r"""Aggregate \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
365341 -> Hash Join \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
366342 Hash Cond: \(foo.c1 = bar.c1\)
367343 -> Seq Scan on foo \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=1, loop number=1\)
368344 -> Hash \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
369345 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
370- -> Seq Scan on bar \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=9 , loop number=1\)"""
346+ -> Seq Scan on bar \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=\d+ , loop number=1\)"""
371347
372348 set_guc (acon , 'pg_query_state.enable_timing' , 'on' )
373349
374- qs = query_state (config , acon , query , num_steps , {'timing' : True })
350+ qs = query_state (config , acon , query , {'timing' : True })
375351 assert len (qs ) == 1 and re .match (expected , qs [0 ][3 ])
376352 assert len (notices ) == 0
377353
@@ -402,20 +378,19 @@ def test_formats(config):
402378
403379 acon , = n_async_connect (config )
404380 query = 'select count(*) from foo join bar on foo.c1=bar.c1'
405- num_steps = 10
406381 expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
407382 -> Hash Join \(Current loop: actual rows=0, loop number=1\)
408383 Hash Cond: \(foo.c1 = bar.c1\)
409384 -> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
410385 -> Hash \(Current loop: actual rows=0, loop number=1\)
411386 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
412- -> Seq Scan on bar \(Current loop: actual rows=9 , loop number=1\)"""
387+ -> Seq Scan on bar \(Current loop: actual rows=\d+ , loop number=1\)"""
413388
414- qs = query_state (config , acon , query , num_steps , {'format' : 'text' })
389+ qs = query_state (config , acon , query , {'format' : 'text' })
415390 assert len (qs ) == 1 and re .match (expected , qs [0 ][3 ])
416391 assert len (notices ) == 0
417392
418- qs = query_state (config , acon , query , num_steps , {'format' : 'json' })
393+ qs = query_state (config , acon , query , {'format' : 'json' })
419394 try :
420395 js_obj = json .loads (qs [0 ][3 ])
421396 except ValueError :
@@ -424,7 +399,7 @@ def test_formats(config):
424399 assert len (notices ) == 0
425400 check_plan (js_obj ['Plan' ])
426401
427- qs = query_state (config , acon , query , num_steps , {'format' : 'xml' })
402+ qs = query_state (config , acon , query , {'format' : 'xml' })
428403 assert len (qs ) == 1
429404 assert len (notices ) == 0
430405 try :
@@ -433,7 +408,7 @@ def test_formats(config):
433408 assert False , 'Invalid xml format'
434409 check_xml (xml_root )
435410
436- qs = query_state (config , acon , query , num_steps , {'format' : 'yaml' })
411+ qs = query_state (config , acon , query , {'format' : 'yaml' })
437412 try :
438413 yaml_doc = yaml .load (qs [0 ][3 ])
439414 except :
@@ -449,19 +424,18 @@ def test_timing_buffers_conflicts(config):
449424
450425 acon , = n_async_connect (config )
451426 query = 'select count(*) from foo join bar on foo.c1=bar.c1'
452- num_steps = 10
453427 timing_pattern = '(?:running time=\d+.\d+)|(?:actual time=\d+.\d+..\d+.\d+)'
454428 buffers_pattern = 'Buffers:'
455429
456- qs = query_state (config , acon , query , num_steps , {'timing' : True , 'buffers' : False })
430+ qs = query_state (config , acon , query , {'timing' : True , 'buffers' : False })
457431 assert len (qs ) == 1 and not re .search (timing_pattern , qs [0 ][3 ])
458432 assert notices == ['WARNING: timing statistics disabled\n ' ]
459433
460- qs = query_state (config , acon , query , num_steps , {'timing' : False , 'buffers' : True })
434+ qs = query_state (config , acon , query , {'timing' : False , 'buffers' : True })
461435 assert len (qs ) == 1 and not re .search (buffers_pattern , qs [0 ][3 ])
462436 assert notices == ['WARNING: buffers statistics disabled\n ' ]
463437
464- qs = query_state (config , acon , query , num_steps , {'timing' : True , 'buffers' : True })
438+ qs = query_state (config , acon , query , {'timing' : True , 'buffers' : True })
465439 assert len (qs ) == 1 and not re .search (timing_pattern , qs [0 ][3 ]) \
466440 and not re .search (buffers_pattern , qs [0 ][3 ])
467441 assert len (notices ) == 2 and 'WARNING: timing statistics disabled\n ' in notices \
0 commit comments