@@ -149,12 +149,16 @@ def test_subscriber_lag(
149
149
check_pgbench_still_running (pub_workload , "pub" )
150
150
check_pgbench_still_running (sub_workload , "sub" )
151
151
152
- with (
153
- psycopg2 .connect (pub_connstr ) as pub_conn ,
154
- psycopg2 .connect (sub_connstr ) as sub_conn ,
155
- ):
156
- with pub_conn .cursor () as pub_cur , sub_conn .cursor () as sub_cur :
157
- lag = measure_logical_replication_lag (sub_cur , pub_cur )
152
+ pub_conn = psycopg2 .connect (pub_connstr )
153
+ sub_conn = psycopg2 .connect (sub_connstr )
154
+ pub_conn .autocommit = True
155
+ sub_conn .autocommit = True
156
+
157
+ with pub_conn .cursor () as pub_cur , sub_conn .cursor () as sub_cur :
158
+ lag = measure_logical_replication_lag (sub_cur , pub_cur )
159
+
160
+ pub_conn .close ()
161
+ sub_conn .close ()
158
162
159
163
log .info (f"Replica lagged behind master by { lag } seconds" )
160
164
zenbenchmark .record ("replica_lag" , lag , "s" , MetricReport .LOWER_IS_BETTER )
@@ -206,6 +210,7 @@ def test_publisher_restart(
206
210
sub_conn = psycopg2 .connect (sub_connstr )
207
211
pub_conn .autocommit = True
208
212
sub_conn .autocommit = True
213
+
209
214
with pub_conn .cursor () as pub_cur , sub_conn .cursor () as sub_cur :
210
215
pub_cur .execute ("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = 'pub1'" )
211
216
pub_exists = len (pub_cur .fetchall ()) != 0
@@ -222,6 +227,7 @@ def test_publisher_restart(
222
227
sub_cur .execute (f"create subscription sub1 connection '{ pub_connstr } ' publication pub1" )
223
228
224
229
initial_sync_lag = measure_logical_replication_lag (sub_cur , pub_cur )
230
+
225
231
pub_conn .close ()
226
232
sub_conn .close ()
227
233
@@ -248,12 +254,17 @@ def test_publisher_restart(
248
254
["pgbench" , "-c10" , pgbench_duration , "-Mprepared" ],
249
255
env = pub_env ,
250
256
)
251
- with (
252
- psycopg2 .connect (pub_connstr ) as pub_conn ,
253
- psycopg2 .connect (sub_connstr ) as sub_conn ,
254
- ):
255
- with pub_conn .cursor () as pub_cur , sub_conn .cursor () as sub_cur :
256
- lag = measure_logical_replication_lag (sub_cur , pub_cur )
257
+
258
+ pub_conn = psycopg2 .connect (pub_connstr )
259
+ sub_conn = psycopg2 .connect (sub_connstr )
260
+ pub_conn .autocommit = True
261
+ sub_conn .autocommit = True
262
+
263
+ with pub_conn .cursor () as pub_cur , sub_conn .cursor () as sub_cur :
264
+ lag = measure_logical_replication_lag (sub_cur , pub_cur )
265
+
266
+ pub_conn .close ()
267
+ sub_conn .close ()
257
268
258
269
log .info (f"Replica lagged behind master by { lag } seconds" )
259
270
zenbenchmark .record ("replica_lag" , lag , "s" , MetricReport .LOWER_IS_BETTER )
@@ -288,58 +299,56 @@ def test_snap_files(
288
299
env = benchmark_project_pub .pgbench_env
289
300
connstr = benchmark_project_pub .connstr
290
301
291
- with psycopg2 .connect (connstr ) as conn :
292
- conn .autocommit = True
293
- with conn .cursor () as cur :
294
- cur .execute ("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'" )
295
- is_super = cast ("bool" , cur .fetchall ()[0 ][0 ])
296
- assert is_super , "This benchmark won't work if we don't have superuser"
302
+ conn = psycopg2 .connect (connstr )
303
+ conn .autocommit = True
304
+
305
+ with conn .cursor () as cur :
306
+ cur .execute ("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'" )
307
+ is_super = cast ("bool" , cur .fetchall ()[0 ][0 ])
308
+ assert is_super , "This benchmark won't work if we don't have superuser"
309
+
310
+ conn .close ()
297
311
298
312
pg_bin .run_capture (["pgbench" , "-i" , "-I" , "dtGvp" , "-s100" ], env = env )
299
313
300
314
conn = psycopg2 .connect (connstr )
301
315
conn .autocommit = True
302
- cur = conn .cursor ()
303
- cur .execute ("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1" )
304
-
305
- with psycopg2 .connect (connstr ) as conn :
306
- conn .autocommit = True
307
- with conn .cursor () as cur :
308
- cur .execute ("SELECT pg_reload_conf()" )
309
-
310
- with psycopg2 .connect (connstr ) as conn :
311
- conn .autocommit = True
312
- with conn .cursor () as cur :
313
- cur .execute (
314
- """
315
- DO $$
316
- BEGIN
317
- IF EXISTS (
318
- SELECT 1
319
- FROM pg_replication_slots
320
- WHERE slot_name = 'slotter'
321
- ) THEN
322
- PERFORM pg_drop_replication_slot('slotter');
323
- END IF;
324
- END $$;
316
+
317
+ with conn .cursor () as cur :
318
+ cur .execute (
325
319
"""
326
- )
327
- cur .execute ("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')" )
320
+ DO $$
321
+ BEGIN
322
+ IF EXISTS (
323
+ SELECT 1
324
+ FROM pg_replication_slots
325
+ WHERE slot_name = 'slotter'
326
+ ) THEN
327
+ PERFORM pg_drop_replication_slot('slotter');
328
+ END IF;
329
+ END $$;
330
+ """
331
+ )
332
+ cur .execute ("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')" )
333
+
334
+ conn .close ()
328
335
329
336
workload = pg_bin .run_nonblocking (["pgbench" , "-c10" , pgbench_duration , "-Mprepared" ], env = env )
330
337
try :
331
338
start = time .time ()
332
339
prev_measurement = time .time ()
333
340
while time .time () - start < test_duration_min * 60 :
334
- with psycopg2 .connect (connstr ) as conn :
335
- with conn .cursor () as cur :
336
- cur .execute (
337
- "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
338
- )
339
- check_pgbench_still_running (workload )
340
- cur .execute (
341
- "SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
342
- )
341
+ conn = psycopg2 .connect (connstr )
342
+ conn .autocommit = True
343
+
344
+ with conn .cursor () as cur :
345
+ cur .execute (
346
+ "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
347
+ )
348
+ check_pgbench_still_running (workload )
349
+ cur .execute ("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())" )
350
+
351
+ conn .close ()
343
352
344
353
# Measure storage
345
354
if time .time () - prev_measurement > test_interval_min * 60 :
0 commit comments