|
1 | 1 | '''
|
2 | 2 | test_cases.py
|
3 |
| -Copyright (c) 2016-2020, Postgres Professional |
| 3 | +Copyright (c) 2016-2021, Postgres Professional |
4 | 4 | '''
|
5 | 5 |
|
6 | 6 | import json
|
@@ -42,21 +42,28 @@ def test_deadlock(config):
|
42 | 42 | def test_simple_query(config):
|
43 | 43 | """test statistics of simple query"""
|
44 | 44 |
|
45 |
| - acon, = common.n_async_connect(config) |
46 |
| - query = 'select count(*) from foo join bar on foo.c1=bar.c1' |
| 45 | + acon1, acon2 = common.n_async_connect(config, 2) |
| 46 | + query = 'select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1' |
47 | 47 | expected = r"""Aggregate \(Current loop: actual rows=\d+, loop number=1\)
|
48 |
| - -> Hash Join \(Current loop: actual rows=\d+, loop number=1\) |
| 48 | + -> Hash Join \(Current loop: actual rows=62473, loop number=1\) |
49 | 49 | Hash Cond: \(foo.c1 = bar.c1\)
|
| 50 | + Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\) |
50 | 51 | -> Seq Scan on foo \(Current loop: actual rows=\d+, loop number=1\)
|
51 | 52 | -> Hash \(Current loop: actual rows=\d+, loop number=1\)
|
52 | 53 | Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
|
53 | 54 | -> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
|
54 | 55 |
|
55 |
| - qs, _ = common.onetime_query_state(config, acon, query) |
56 |
| - assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \ |
57 |
| - and qs[0][2] == query and re.match(expected, qs[0][3]) and qs[0][4] == None |
| 56 | + qs, _ = common.onetime_query_state_locks(config, acon1, acon2, query) |
58 | 57 |
|
59 |
| - common.n_close((acon,)) |
| 58 | + assert qs[0][0] == acon1.get_backend_pid() |
| 59 | + assert qs[0][1] == 0 |
| 60 | + assert qs[0][2] == query |
| 61 | + assert re.match(expected, qs[0][3]) |
| 62 | + assert qs[0][4] == None |
| 63 | + # assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \ |
| 64 | + # and qs[0][2] == query and re.match(expected, qs[0][3]) and qs[0][4] == None |
| 65 | + |
| 66 | + common.n_close((acon1, acon2)) |
60 | 67 |
|
61 | 68 | def test_concurrent_access(config):
|
62 | 69 | """test when two backends compete with each other to extract state from third running backend"""
|
@@ -87,50 +94,56 @@ def test_concurrent_access(config):
|
87 | 94 | def test_nested_call(config):
|
88 | 95 | """test statistics under calling function"""
|
89 | 96 |
|
90 |
| - acon, = common.n_async_connect(config) |
| 97 | + acon1, acon2 = common.n_async_connect(config, 2) |
91 | 98 | util_conn = psycopg2.connect(**config)
|
92 | 99 | util_curs = util_conn.cursor()
|
93 | 100 | create_function = """
|
94 | 101 | create or replace function n_join_foo_bar() returns integer as $$
|
95 | 102 | begin
|
96 |
| - return (select count(*) from foo join bar on foo.c1=bar.c1); |
| 103 | + return (select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1); |
97 | 104 | end;
|
98 | 105 | $$ language plpgsql"""
|
99 | 106 | drop_function = 'drop function n_join_foo_bar()'
|
100 | 107 | call_function = 'select * from n_join_foo_bar()'
|
101 |
| - nested_query = 'SELECT (select count(*) from foo join bar on foo.c1=bar.c1)' |
| 108 | + nested_query1 = '(select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1)' |
| 109 | + nested_query2 = 'SELECT (select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1)' |
102 | 110 | expected = 'Function Scan on n_join_foo_bar (Current loop: actual rows=0, loop number=1)'
|
103 | 111 | expected_nested = r"""Result \(Current loop: actual rows=0, loop number=1\)
|
104 | 112 | InitPlan 1 \(returns \$0\)
|
105 | 113 | -> Aggregate \(Current loop: actual rows=0, loop number=1\)
|
106 |
| - -> Hash Join \(Current loop: actual rows=0, loop number=1\) |
| 114 | + -> Hash Join \(Current loop: actual rows=62473, loop number=1\) |
107 | 115 | Hash Cond: \(foo.c1 = bar.c1\)
|
108 |
| - -> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\) |
109 |
| - -> Hash \(Current loop: actual rows=0, loop number=1\) |
| 116 | + Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\) |
| 117 | + -> Seq Scan on foo \(Current loop: actual rows=1000000, loop number=1\) |
| 118 | + -> Hash \(Current loop: actual rows=500000, loop number=1\) |
110 | 119 | Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
|
111 | 120 | -> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
|
112 | 121 |
|
| 122 | + |
113 | 123 | util_curs.execute(create_function)
|
114 | 124 | util_conn.commit()
|
115 | 125 |
|
116 |
| - qs, notices = common.onetime_query_state(config, acon, call_function) |
| 126 | + qs, notices = common.onetime_query_state_locks(config, acon1, acon2, call_function) |
117 | 127 |
|
118 | 128 | # Print some debug output before assertion
|
119 | 129 | if len(qs) < 2:
|
120 | 130 | print(qs)
|
121 | 131 |
|
122 |
| - assert len(qs) == 2 \ |
123 |
| - and qs[0][0] == qs[1][0] == acon.get_backend_pid() \ |
124 |
| - and qs[0][1] == 0 and qs[1][1] == 1 \ |
125 |
| - and qs[0][2] == call_function and qs[0][3] == expected \ |
126 |
| - and qs[1][2] == nested_query and re.match(expected_nested, qs[1][3]) \ |
127 |
| - and qs[0][4] == qs[1][4] == None |
| 132 | + assert len(qs) == 3 |
| 133 | + assert qs[0][0] == qs[1][0] == acon1.get_backend_pid() |
| 134 | + assert qs[0][1] == 0 |
| 135 | + assert qs[1][1] == 1 |
| 136 | + assert qs[0][2] == call_function |
| 137 | + assert qs[0][3] == expected |
| 138 | + assert qs[1][2] == nested_query1 or qs[1][2] == nested_query2 |
| 139 | + assert re.match(expected_nested, qs[1][3]) |
| 140 | + assert qs[0][4] == qs[1][4] == None |
128 | 141 | assert len(notices) == 0
|
129 | 142 |
|
130 | 143 | util_curs.execute(drop_function)
|
131 | 144 |
|
132 | 145 | util_conn.close()
|
133 |
| - common.n_close((acon,)) |
| 146 | + common.n_close((acon1, acon2)) |
134 | 147 |
|
135 | 148 | def test_insert_on_conflict(config):
|
136 | 149 | """test statistics on conflicting tuples under INSERT ON CONFLICT query"""
|
@@ -212,65 +225,77 @@ def test_trigger(config):
|
212 | 225 | def test_costs(config):
|
213 | 226 | """test plan costs"""
|
214 | 227 |
|
215 |
| - acon, = common.n_async_connect(config) |
216 |
| - query = 'select count(*) from foo join bar on foo.c1=bar.c1' |
| 228 | + acon1, acon2 = common.n_async_connect(config, 2) |
| 229 | + query = 'select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1;' |
| 230 | + |
217 | 231 | expected = r"""Aggregate \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=8\) \(Current loop: actual rows=0, loop number=1\)
|
218 |
| - -> Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=0, loop number=1\) |
| 232 | + -> Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=\d+, loop number=1\) |
219 | 233 | Hash Cond: \(foo.c1 = bar.c1\)
|
220 |
| - -> Seq Scan on foo \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=1, loop number=1\) |
221 |
| - -> Hash \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=0, loop number=1\) |
| 234 | + Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\) |
| 235 | + -> Seq Scan on foo \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=1000000, loop number=1\) |
| 236 | + -> Hash \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=500000, loop number=1\) |
222 | 237 | Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
|
223 | 238 | -> Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=\d+, loop number=1\)"""
|
224 | 239 |
|
225 |
| - qs, notices = common.onetime_query_state(config, acon, query, {'costs': True}) |
226 |
| - assert len(qs) == 1 and re.match(expected, qs[0][3]) |
| 240 | + qs, notices = common.onetime_query_state_locks(config, acon1, acon2, query, {'costs': True}) |
| 241 | + |
| 242 | + assert len(qs) == 2 and re.match(expected, qs[0][3]) |
227 | 243 | assert len(notices) == 0
|
228 | 244 |
|
229 |
| - common.n_close((acon,)) |
| 245 | + common.n_close((acon1, acon2)) |
230 | 246 |
|
231 | 247 | def test_buffers(config):
|
232 | 248 | """test buffer statistics"""
|
233 | 249 |
|
234 |
| - acon, = common.n_async_connect(config) |
235 |
| - query = 'select count(*) from foo join bar on foo.c1=bar.c1' |
| 250 | + acon1, acon2 = common.n_async_connect(config, 2) |
| 251 | + query = 'select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1' |
236 | 252 | expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
|
237 |
| - -> Hash Join \(Current loop: actual rows=0, loop number=1\) |
| 253 | + -> Hash Join \(Current loop: actual rows=\d+, loop number=1\) |
238 | 254 | Hash Cond: \(foo.c1 = bar.c1\)
|
239 |
| - -> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\) |
| 255 | + Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\) |
| 256 | + Buffers: shared hit=\d+, temp read=\d+ written=\d+ |
| 257 | + -> Seq Scan on foo \(Current loop: actual rows=1000000, loop number=1\) |
240 | 258 | Buffers: [^\n]*
|
241 |
| - -> Hash \(Current loop: actual rows=0, loop number=1\) |
| 259 | + -> Hash \(Current loop: actual rows=500000, loop number=1\) |
242 | 260 | Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
|
| 261 | + Buffers: shared hit=\d+, temp written=\d+ |
243 | 262 | -> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)
|
244 | 263 | Buffers: .*"""
|
245 | 264 |
|
246 |
| - common.set_guc(acon, 'pg_query_state.enable_buffers', 'on') |
| 265 | + common.set_guc(acon1, 'pg_query_state.enable_buffers', 'on') |
247 | 266 |
|
248 |
| - qs, notices = common.onetime_query_state(config, acon, query, {'buffers': True}) |
249 |
| - assert len(qs) == 1 and re.match(expected, qs[0][3]) |
| 267 | + qs, notices = common.onetime_query_state_locks(config, acon1, acon2, query, {'buffers': True}) |
| 268 | + |
| 269 | + assert len(qs) == 2 |
| 270 | + assert re.match(expected, qs[0][3]) |
250 | 271 | assert len(notices) == 0
|
251 | 272 |
|
252 |
| - common.n_close((acon,)) |
| 273 | + common.n_close((acon1, acon2)) |
253 | 274 |
|
254 | 275 | def test_timing(config):
|
255 | 276 | """test timing statistics"""
|
256 | 277 |
|
257 |
| - acon, = common.n_async_connect(config) |
258 |
| - query = 'select count(*) from foo join bar on foo.c1=bar.c1' |
| 278 | + acon1, acon2 = common.n_async_connect(config, 2) |
| 279 | + query = 'select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1' |
| 280 | + |
259 | 281 | expected = r"""Aggregate \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
|
260 |
| - -> Hash Join \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\) |
| 282 | + -> Hash Join \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=\d+, loop number=1\) |
261 | 283 | Hash Cond: \(foo.c1 = bar.c1\)
|
262 |
| - -> Seq Scan on foo \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=1, loop number=1\) |
263 |
| - -> Hash \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\) |
| 284 | + Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\) |
| 285 | + -> Seq Scan on foo \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=1000000, loop number=1\) |
| 286 | + -> Hash \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=500000, loop number=1\) |
264 | 287 | Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
|
265 | 288 | -> Seq Scan on bar \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=\d+, loop number=1\)"""
|
266 | 289 |
|
267 |
| - common.set_guc(acon, 'pg_query_state.enable_timing', 'on') |
| 290 | + common.set_guc(acon1, 'pg_query_state.enable_timing', 'on') |
268 | 291 |
|
269 |
| - qs, notices = common.onetime_query_state(config, acon, query, {'timing': True}) |
270 |
| - assert len(qs) == 1 and re.match(expected, qs[0][3]) |
| 292 | + qs, notices = common.onetime_query_state_locks(config, acon1, acon2, query, {'timing': True}) |
| 293 | + |
| 294 | + assert len(qs) == 2 |
| 295 | + assert re.match(expected, qs[0][3]) |
271 | 296 | assert len(notices) == 0
|
272 | 297 |
|
273 |
| - common.n_close((acon,)) |
| 298 | + common.n_close((acon1, acon2)) |
274 | 299 |
|
275 | 300 | def check_plan(plan):
|
276 | 301 | assert 'Current loop' in plan
|
|
0 commit comments