Skip to content

Commit 6d91234

Browse files
committed
PGPRO-427: Fix bug in SimpleXLogPageRead() with parallel mode
1 parent e8037dd commit 6d91234

File tree

3 files changed

+78
-30
lines changed

3 files changed

+78
-30
lines changed

src/parsexlog.c

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,13 @@ static void *
141141
doExtractPageMap(void *arg)
142142
{
143143
xlog_thread_arg *extract_arg = (xlog_thread_arg *) arg;
144+
XLogPageReadPrivate *private_data;
144145
XLogReaderState *xlogreader;
145146
XLogSegNo nextSegNo = 0;
146147
char *errormsg;
147148

148-
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead,
149-
&extract_arg->private_data);
149+
private_data = &extract_arg->private_data;
150+
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, private_data);
150151
if (xlogreader == NULL)
151152
elog(ERROR, "out of memory");
152153

@@ -158,6 +159,9 @@ doExtractPageMap(void *arg)
158159
(uint32) (extract_arg->startpoint >> 32),
159160
(uint32) (extract_arg->startpoint));
160161

162+
/* Switch WAL segment manually below without using SimpleXLogPageRead() */
163+
private_data->manual_switch = true;
164+
161165
do
162166
{
163167
XLogRecord *record;
@@ -171,23 +175,28 @@ doExtractPageMap(void *arg)
171175
{
172176
XLogRecPtr errptr;
173177

174-
/* Try to switch to the next WAL segment */
175-
if (extract_arg->private_data.need_switch)
178+
/*
179+
* Try to switch to the next WAL segment. Usually
180+
* SimpleXLogPageRead() does it by itself. But here we need to do it
181+
* manually to support threads.
182+
*/
183+
if (private_data->need_switch)
176184
{
177-
extract_arg->private_data.need_switch = false;
185+
private_data->need_switch = false;
178186

187+
/* Critical section */
179188
pthread_lock(&wal_segment_mutex);
180189
Assert(nextSegNoToRead);
181-
extract_arg->private_data.xlogsegno = nextSegNoToRead;
190+
private_data->xlogsegno = nextSegNoToRead;
182191
nextSegNoToRead++;
183192
pthread_mutex_unlock(&wal_segment_mutex);
184193

185194
/* We reach the end */
186-
if (extract_arg->private_data.xlogsegno > extract_arg->endSegNo)
195+
if (private_data->xlogsegno > extract_arg->endSegNo)
187196
break;
188197

189198
/* Adjust next record position */
190-
XLogSegNoOffsetToRecPtr(extract_arg->private_data.xlogsegno, 0,
199+
XLogSegNoOffsetToRecPtr(private_data->xlogsegno, 0,
191200
extract_arg->startpoint);
192201
/* Skip over the page header */
193202
extract_arg->startpoint = XLogFindNextRecord(xlogreader,
@@ -217,7 +226,7 @@ doExtractPageMap(void *arg)
217226
* start_lsn, we won't be able to build page map and PAGE backup will
218227
* be incorrect. Stop it and throw an error.
219228
*/
220-
PrintXLogCorruptionMsg(&extract_arg->private_data, ERROR);
229+
PrintXLogCorruptionMsg(private_data, ERROR);
221230
}
222231

223232
extractPageInfo(xlogreader);
@@ -255,8 +264,8 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
255264
int threads_need = 0;
256265
XLogSegNo endSegNo;
257266
bool extract_isok = true;
258-
pthread_t threads[num_threads];
259-
xlog_thread_arg thread_args[num_threads];
267+
pthread_t *threads;
268+
xlog_thread_arg *thread_args;
260269
time_t start_time,
261270
end_time;
262271

@@ -276,6 +285,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
276285
nextSegNoToRead = 0;
277286
time(&start_time);
278287

288+
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
289+
thread_args = (xlog_thread_arg *) palloc(sizeof(xlog_thread_arg)*num_threads);
290+
279291
/*
280292
* Initialize thread args.
281293
*
@@ -286,7 +298,6 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
286298
{
287299
InitXLogPageRead(&thread_args[i].private_data, archivedir, tli, false);
288300
thread_args[i].thread_num = i;
289-
thread_args[i].private_data.manual_switch = true;
290301

291302
thread_args[i].startpoint = startpoint;
292303
thread_args[i].endpoint = endpoint;
@@ -327,6 +338,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
327338
extract_isok = false;
328339
}
329340

341+
pfree(threads);
342+
pfree(thread_args);
343+
330344
time(&end_time);
331345
if (extract_isok)
332346
elog(LOG, "Pagemap compiled, time elapsed %.0f sec",
@@ -700,6 +714,10 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
700714
if (!XLByteInSeg(targetPagePtr, private_data->xlogsegno))
701715
{
702716
CleanupXLogPageRead(xlogreader);
717+
/*
718+
* Do not switch to next WAL segment in this function. Currently it is
719+
* manually switched only in doExtractPageMap().
720+
*/
703721
if (private_data->manual_switch)
704722
{
705723
private_data->need_switch = true;
@@ -709,6 +727,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
709727

710728
XLByteToSeg(targetPagePtr, private_data->xlogsegno);
711729

730+
/* Try to switch to the next WAL segment */
712731
if (!private_data->xlogexists)
713732
{
714733
char xlogfname[MAXFNAMELEN];

tests/helpers/ptrack_helpers.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -907,13 +907,26 @@ def version_to_num(self, version):
907907
return num
908908

909909
def switch_wal_segment(self, node):
910-
""" Execute pg_switch_wal/xlog() in given node"""
911-
if self.version_to_num(
912-
node.safe_psql("postgres", "show server_version")
913-
) >= self.version_to_num('10.0'):
914-
node.safe_psql("postgres", "select pg_switch_wal()")
910+
"""
911+
Execute pg_switch_wal/xlog() in given node
912+
913+
Args:
914+
node: an instance of PostgresNode or NodeConnection class
915+
"""
916+
if isinstance(node, testgres.PostgresNode):
917+
if self.version_to_num(
918+
node.safe_psql("postgres", "show server_version")
919+
) >= self.version_to_num('10.0'):
920+
node.safe_psql("postgres", "select pg_switch_wal()")
921+
else:
922+
node.safe_psql("postgres", "select pg_switch_xlog()")
915923
else:
916-
node.safe_psql("postgres", "select pg_switch_xlog()")
924+
if self.version_to_num(
925+
node.execute("show server_version")[0][0]
926+
) >= self.version_to_num('10.0'):
927+
node.execute("select pg_switch_wal()")
928+
else:
929+
node.execute("select pg_switch_xlog()")
917930
sleep(1)
918931

919932
def get_version(self, node):

tests/page.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -528,9 +528,13 @@ def test_parallel_pagemap(self):
528528
"hot_standby": "on"
529529
}
530530
)
531+
node_restored = self.make_simple_node(
532+
base_dir="{0}/{1}/node_restored".format(module_name, fname),
533+
)
531534

532535
self.init_pb(backup_dir)
533536
self.add_instance(backup_dir, 'node', node)
537+
node_restored.cleanup()
534538
self.set_archiving(backup_dir, 'node', node)
535539
node.start()
536540

@@ -542,13 +546,14 @@ def test_parallel_pagemap(self):
542546
self.assertEqual(show_backup['backup-mode'], "FULL")
543547

544548
# Fill instance with data and make several WAL segments ...
545-
node.safe_psql("postgres", "create table test (id int)")
546-
for x in range(0, 8):
547-
node.safe_psql(
548-
"postgres",
549-
"insert into test select i from generate_series(1,100) s(i)")
550-
self.switch_wal_segment(node)
551-
count1 = node.safe_psql("postgres", "select count(*) from test")
549+
with node.connect() as conn:
550+
conn.execute("create table test (id int)")
551+
for x in range(0, 8):
552+
conn.execute(
553+
"insert into test select i from generate_series(1,100) s(i)")
554+
conn.commit()
555+
self.switch_wal_segment(conn)
556+
count1 = conn.execute("select count(*) from test")
552557

553558
# ... and do page backup with parallel pagemap
554559
self.backup_node(
@@ -558,18 +563,29 @@ def test_parallel_pagemap(self):
558563
self.assertEqual(show_backup['status'], "OK")
559564
self.assertEqual(show_backup['backup-mode'], "PAGE")
560565

561-
# Drop node and restore it
562-
node.cleanup()
563-
self.restore_node(backup_dir, 'node', node)
564-
node.start()
566+
if self.paranoia:
567+
pgdata = self.pgdata_content(node.data_dir)
568+
569+
# Restore it
570+
self.restore_node(backup_dir, 'node', node_restored)
571+
572+
# Physical comparison
573+
if self.paranoia:
574+
pgdata_restored = self.pgdata_content(node_restored.data_dir)
575+
self.compare_pgdata(pgdata, pgdata_restored)
576+
577+
node_restored.append_conf(
578+
"postgresql.auto.conf", "port = {0}".format(node_restored.port))
579+
node_restored.start()
565580

566581
# Check restored node
567-
count2 = node.safe_psql("postgres", "select count(*) from test")
582+
count2 = node_restored.execute("postgres", "select count(*) from test")
568583

569584
self.assertEqual(count1, count2)
570585

571586
# Clean after yourself
572587
node.cleanup()
588+
node_restored.cleanup()
573589
self.del_test_dir(module_name, fname)
574590

575591
def test_parallel_pagemap_1(self):

0 commit comments

Comments
 (0)