Skip to content

Commit beb8587

Browse files
author
Kovalenko Anastasia
committed
Python tests fixed
1 parent 3d6ca52 commit beb8587

File tree

3 files changed

+154
-50
lines changed

3 files changed

+154
-50
lines changed

tests/common.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,21 @@ def n_close(conns):
4343
for conn in conns:
4444
conn.close()
4545

46+
def pg_query_state_locks(config, pid, conn, verbose=False, costs=False, timing=False, \
47+
buffers=False, triggers=False, format='text'):
48+
"""
49+
Get query state from backend with specified pid and optional parameters.
50+
Save any warning, info, notice and log data in global variable 'notices'
51+
"""
52+
53+
curs = conn.cursor()
54+
curs.callproc('pg_query_state', (pid, verbose, costs, timing, buffers, triggers, format))
55+
wait(conn)
56+
result = curs.fetchall()
57+
notices = conn.notices[:]
58+
59+
return result, notices
60+
4661
def pg_query_state(config, pid, verbose=False, costs=False, timing=False, \
4762
buffers=False, triggers=False, format='text'):
4863
"""
@@ -52,14 +67,63 @@ def pg_query_state(config, pid, verbose=False, costs=False, timing=False, \
5267

5368
conn = psycopg2.connect(**config)
5469
curs = conn.cursor()
55-
5670
curs.callproc('pg_query_state', (pid, verbose, costs, timing, buffers, triggers, format))
5771
result = curs.fetchall()
5872
notices = conn.notices[:]
5973
conn.close()
6074

6175
return result, notices
6276

77+
def onetime_query_state_locks(config, acon_query, acon_pg, query, args={}, num_workers=0):
78+
"""
79+
Get intermediate state of 'query' on connection 'acon_query' after number of 'steps'
80+
of node executions from start of query
81+
"""
82+
83+
curs_query = acon_query.cursor()
84+
curs_pg = acon_pg.cursor()
85+
curs_query.execute("select pg_advisory_lock(1);")
86+
curs_pg.execute("select pg_advisory_lock(2);")
87+
wait(acon_query)
88+
wait(acon_pg)
89+
curs_pg.execute("select pg_advisory_lock(1);")
90+
set_guc(acon_query, 'enable_mergejoin', 'off')
91+
set_guc(acon_query, 'max_parallel_workers_per_gather', num_workers)
92+
curs_query.execute(query)
93+
# extract current state of query progress
94+
MAX_PG_QS_RETRIES = 10
95+
DELAY_BETWEEN_RETRIES = 0.1
96+
pg_qs_args = {
97+
'config': config,
98+
'pid': acon_query.get_backend_pid(),
99+
'conn': acon_pg
100+
}
101+
for k, v in args.items():
102+
pg_qs_args[k] = v
103+
n_retries = 0
104+
105+
wait(acon_pg)
106+
107+
while True:
108+
result, notices = pg_query_state_locks(**pg_qs_args)
109+
n_retries += 1
110+
if len(result) > 0:
111+
break
112+
if n_retries >= MAX_PG_QS_RETRIES:
113+
# pg_query_state callings don't return any result, more likely run
114+
# query has completed
115+
break
116+
time.sleep(DELAY_BETWEEN_RETRIES)
117+
118+
curs_pg.execute("select pg_advisory_unlock(2);")
119+
wait(acon_pg)
120+
wait(acon_query)
121+
122+
set_guc(acon_query, 'enable_mergejoin', 'on')
123+
curs_query.execute("select pg_advisory_unlock(2);")
124+
curs_pg.execute("select pg_advisory_unlock(1);")
125+
return result, notices
126+
63127
def onetime_query_state(config, async_conn, query, args={}, num_workers=0):
64128
"""
65129
Get intermediate state of 'query' on connection 'async_conn' after number of 'steps'

tests/pg_qs_test_runner.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'''
22
pg_qs_test_runner.py
3-
Copyright (c) 2016-2020, Postgres Professional
3+
Copyright (c) 2016-2021, Postgres Professional
44
'''
55

66
import argparse
@@ -22,6 +22,20 @@ def __call__(self, parser, args, values, option_string=None):
2222
class SetupException(Exception): pass
2323
class TeardownException(Exception): pass
2424

25+
unlock_if_eq_1 = """
26+
CREATE OR REPLACE FUNCTION unlock_if_eq_1(x integer) RETURNS integer AS $$
27+
BEGIN
28+
IF x = 1 THEN
29+
perform pg_advisory_unlock(1);
30+
perform pg_advisory_lock(2);
31+
return 1;
32+
ELSE
33+
return x;
34+
END IF;
35+
END;
36+
$$ LANGUAGE plpgsql
37+
"""
38+
2539
setup_cmd = [
2640
'drop extension if exists pg_query_state cascade',
2741
'drop table if exists foo cascade',
@@ -33,6 +47,7 @@ class TeardownException(Exception): pass
3347
'insert into bar select i, i%2=1 from generate_series(1, 500000) as i',
3448
'analyze foo',
3549
'analyze bar',
50+
unlock_if_eq_1,
3651
]
3752

3853
teardown_cmd = [

tests/test_cases.py

Lines changed: 73 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'''
22
test_cases.py
3-
Copyright (c) 2016-2020, Postgres Professional
3+
Copyright (c) 2016-2021, Postgres Professional
44
'''
55

66
import json
@@ -42,21 +42,28 @@ def test_deadlock(config):
4242
def test_simple_query(config):
4343
"""test statistics of simple query"""
4444

45-
acon, = common.n_async_connect(config)
46-
query = 'select count(*) from foo join bar on foo.c1=bar.c1'
45+
acon1, acon2 = common.n_async_connect(config, 2)
46+
query = 'select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1'
4747
expected = r"""Aggregate \(Current loop: actual rows=\d+, loop number=1\)
48-
-> Hash Join \(Current loop: actual rows=\d+, loop number=1\)
48+
-> Hash Join \(Current loop: actual rows=62473, loop number=1\)
4949
Hash Cond: \(foo.c1 = bar.c1\)
50+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
5051
-> Seq Scan on foo \(Current loop: actual rows=\d+, loop number=1\)
5152
-> Hash \(Current loop: actual rows=\d+, loop number=1\)
5253
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
5354
-> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
5455

55-
qs, _ = common.onetime_query_state(config, acon, query)
56-
assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
57-
and qs[0][2] == query and re.match(expected, qs[0][3]) and qs[0][4] == None
56+
qs, _ = common.onetime_query_state_locks(config, acon1, acon2, query)
5857

59-
common.n_close((acon,))
58+
assert qs[0][0] == acon1.get_backend_pid()
59+
assert qs[0][1] == 0
60+
assert qs[0][2] == query
61+
assert re.match(expected, qs[0][3])
62+
assert qs[0][4] == None
63+
# assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
64+
# and qs[0][2] == query and re.match(expected, qs[0][3]) and qs[0][4] == None
65+
66+
common.n_close((acon1, acon2))
6067

6168
def test_concurrent_access(config):
6269
"""test when two backends compete with each other to extract state from third running backend"""
@@ -87,50 +94,56 @@ def test_concurrent_access(config):
8794
def test_nested_call(config):
8895
"""test statistics under calling function"""
8996

90-
acon, = common.n_async_connect(config)
97+
acon1, acon2 = common.n_async_connect(config, 2)
9198
util_conn = psycopg2.connect(**config)
9299
util_curs = util_conn.cursor()
93100
create_function = """
94101
create or replace function n_join_foo_bar() returns integer as $$
95102
begin
96-
return (select count(*) from foo join bar on foo.c1=bar.c1);
103+
return (select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1);
97104
end;
98105
$$ language plpgsql"""
99106
drop_function = 'drop function n_join_foo_bar()'
100107
call_function = 'select * from n_join_foo_bar()'
101-
nested_query = 'SELECT (select count(*) from foo join bar on foo.c1=bar.c1)'
108+
nested_query1 = '(select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1)'
109+
nested_query2 = 'SELECT (select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1)'
102110
expected = 'Function Scan on n_join_foo_bar (Current loop: actual rows=0, loop number=1)'
103111
expected_nested = r"""Result \(Current loop: actual rows=0, loop number=1\)
104112
InitPlan 1 \(returns \$0\)
105113
-> Aggregate \(Current loop: actual rows=0, loop number=1\)
106-
-> Hash Join \(Current loop: actual rows=0, loop number=1\)
114+
-> Hash Join \(Current loop: actual rows=62473, loop number=1\)
107115
Hash Cond: \(foo.c1 = bar.c1\)
108-
-> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
109-
-> Hash \(Current loop: actual rows=0, loop number=1\)
116+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
117+
-> Seq Scan on foo \(Current loop: actual rows=1000000, loop number=1\)
118+
-> Hash \(Current loop: actual rows=500000, loop number=1\)
110119
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
111120
-> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
112121

122+
113123
util_curs.execute(create_function)
114124
util_conn.commit()
115125

116-
qs, notices = common.onetime_query_state(config, acon, call_function)
126+
qs, notices = common.onetime_query_state_locks(config, acon1, acon2, call_function)
117127

118128
# Print some debug output before assertion
119129
if len(qs) < 2:
120130
print(qs)
121131

122-
assert len(qs) == 2 \
123-
and qs[0][0] == qs[1][0] == acon.get_backend_pid() \
124-
and qs[0][1] == 0 and qs[1][1] == 1 \
125-
and qs[0][2] == call_function and qs[0][3] == expected \
126-
and qs[1][2] == nested_query and re.match(expected_nested, qs[1][3]) \
127-
and qs[0][4] == qs[1][4] == None
132+
assert len(qs) == 3
133+
assert qs[0][0] == qs[1][0] == acon1.get_backend_pid()
134+
assert qs[0][1] == 0
135+
assert qs[1][1] == 1
136+
assert qs[0][2] == call_function
137+
assert qs[0][3] == expected
138+
assert qs[1][2] == nested_query1 or qs[1][2] == nested_query2
139+
assert re.match(expected_nested, qs[1][3])
140+
assert qs[0][4] == qs[1][4] == None
128141
assert len(notices) == 0
129142

130143
util_curs.execute(drop_function)
131144

132145
util_conn.close()
133-
common.n_close((acon,))
146+
common.n_close((acon1, acon2))
134147

135148
def test_insert_on_conflict(config):
136149
"""test statistics on conflicting tuples under INSERT ON CONFLICT query"""
@@ -212,65 +225,77 @@ def test_trigger(config):
212225
def test_costs(config):
213226
"""test plan costs"""
214227

215-
acon, = common.n_async_connect(config)
216-
query = 'select count(*) from foo join bar on foo.c1=bar.c1'
228+
acon1, acon2 = common.n_async_connect(config, 2)
229+
query = 'select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1;'
230+
217231
expected = r"""Aggregate \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=8\) \(Current loop: actual rows=0, loop number=1\)
218-
-> Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=0, loop number=1\)
232+
-> Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=\d+, loop number=1\)
219233
Hash Cond: \(foo.c1 = bar.c1\)
220-
-> Seq Scan on foo \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=1, loop number=1\)
221-
-> Hash \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=0, loop number=1\)
234+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
235+
-> Seq Scan on foo \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=1000000, loop number=1\)
236+
-> Hash \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=500000, loop number=1\)
222237
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
223238
-> Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=\d+, loop number=1\)"""
224239

225-
qs, notices = common.onetime_query_state(config, acon, query, {'costs': True})
226-
assert len(qs) == 1 and re.match(expected, qs[0][3])
240+
qs, notices = common.onetime_query_state_locks(config, acon1, acon2, query, {'costs': True})
241+
242+
assert len(qs) == 2 and re.match(expected, qs[0][3])
227243
assert len(notices) == 0
228244

229-
common.n_close((acon,))
245+
common.n_close((acon1, acon2))
230246

231247
def test_buffers(config):
232248
"""test buffer statistics"""
233249

234-
acon, = common.n_async_connect(config)
235-
query = 'select count(*) from foo join bar on foo.c1=bar.c1'
250+
acon1, acon2 = common.n_async_connect(config, 2)
251+
query = 'select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1'
236252
expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
237-
-> Hash Join \(Current loop: actual rows=0, loop number=1\)
253+
-> Hash Join \(Current loop: actual rows=\d+, loop number=1\)
238254
Hash Cond: \(foo.c1 = bar.c1\)
239-
-> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
255+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
256+
Buffers: shared hit=\d+, temp read=\d+ written=\d+
257+
-> Seq Scan on foo \(Current loop: actual rows=1000000, loop number=1\)
240258
Buffers: [^\n]*
241-
-> Hash \(Current loop: actual rows=0, loop number=1\)
259+
-> Hash \(Current loop: actual rows=500000, loop number=1\)
242260
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
261+
Buffers: shared hit=\d+, temp written=\d+
243262
-> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)
244263
Buffers: .*"""
245264

246-
common.set_guc(acon, 'pg_query_state.enable_buffers', 'on')
265+
common.set_guc(acon1, 'pg_query_state.enable_buffers', 'on')
247266

248-
qs, notices = common.onetime_query_state(config, acon, query, {'buffers': True})
249-
assert len(qs) == 1 and re.match(expected, qs[0][3])
267+
qs, notices = common.onetime_query_state_locks(config, acon1, acon2, query, {'buffers': True})
268+
269+
assert len(qs) == 2
270+
assert re.match(expected, qs[0][3])
250271
assert len(notices) == 0
251272

252-
common.n_close((acon,))
273+
common.n_close((acon1, acon2))
253274

254275
def test_timing(config):
255276
"""test timing statistics"""
256277

257-
acon, = common.n_async_connect(config)
258-
query = 'select count(*) from foo join bar on foo.c1=bar.c1'
278+
acon1, acon2 = common.n_async_connect(config, 2)
279+
query = 'select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1'
280+
259281
expected = r"""Aggregate \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
260-
-> Hash Join \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
282+
-> Hash Join \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=\d+, loop number=1\)
261283
Hash Cond: \(foo.c1 = bar.c1\)
262-
-> Seq Scan on foo \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=1, loop number=1\)
263-
-> Hash \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
284+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
285+
-> Seq Scan on foo \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=1000000, loop number=1\)
286+
-> Hash \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=500000, loop number=1\)
264287
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
265288
-> Seq Scan on bar \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=\d+, loop number=1\)"""
266289

267-
common.set_guc(acon, 'pg_query_state.enable_timing', 'on')
290+
common.set_guc(acon1, 'pg_query_state.enable_timing', 'on')
268291

269-
qs, notices = common.onetime_query_state(config, acon, query, {'timing': True})
270-
assert len(qs) == 1 and re.match(expected, qs[0][3])
292+
qs, notices = common.onetime_query_state_locks(config, acon1, acon2, query, {'timing': True})
293+
294+
assert len(qs) == 2
295+
assert re.match(expected, qs[0][3])
271296
assert len(notices) == 0
272297

273-
common.n_close((acon,))
298+
common.n_close((acon1, acon2))
274299

275300
def check_plan(plan):
276301
assert 'Current loop' in plan

0 commit comments

Comments
 (0)