Skip to content

Commit b97e9e8

Browse files
committed
PGPRO-427: Put in order thread arguments
1 parent a3b9d1e commit b97e9e8

File tree

6 files changed

+108
-82
lines changed

6 files changed

+108
-82
lines changed

src/backup.c

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -372,10 +372,10 @@ remote_copy_file(PGconn *conn, pgFile* file)
372372
static void *
373373
remote_backup_files(void *arg)
374374
{
375-
int i;
376-
backup_files_args *arguments = (backup_files_args *) arg;
377-
int n_backup_files_list = parray_num(arguments->backup_files_list);
378-
PGconn *file_backup_conn = NULL;
375+
int i;
376+
backup_files_arg *arguments = (backup_files_arg *) arg;
377+
int n_backup_files_list = parray_num(arguments->files_list);
378+
PGconn *file_backup_conn = NULL;
379379

380380
for (i = 0; i < n_backup_files_list; i++)
381381
{
@@ -385,7 +385,7 @@ remote_backup_files(void *arg)
385385
pgFile *file;
386386
int row_length;
387387

388-
file = (pgFile *) parray_get(arguments->backup_files_list, i);
388+
file = (pgFile *) parray_get(arguments->files_list, i);
389389

390390
/* We have already copied all directories */
391391
if (S_ISDIR(file->mode))
@@ -465,12 +465,11 @@ do_backup_instance(void)
465465
XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr;
466466

467467
/* arrays with meta info for multi threaded backup */
468-
pthread_t *backup_threads;
469-
backup_files_args *backup_threads_args;
468+
pthread_t *threads;
469+
backup_files_arg *threads_args;
470470
bool backup_isok = true;
471471

472472
pgBackup *prev_backup = NULL;
473-
char prev_backup_filelist_path[MAXPGPATH];
474473
parray *prev_backup_filelist = NULL;
475474

476475
elog(LOG, "Database backup start");
@@ -512,6 +511,7 @@ do_backup_instance(void)
512511
current.backup_mode == BACKUP_MODE_DIFF_DELTA)
513512
{
514513
parray *backup_list;
514+
char prev_backup_filelist_path[MAXPGPATH];
515515

516516
/* get list of backups already taken */
517517
backup_list = catalog_get_backup_list(INVALID_BACKUP_ID);
@@ -524,8 +524,8 @@ do_backup_instance(void)
524524
"Create new FULL backup before an incremental one.");
525525
parray_free(backup_list);
526526

527-
pgBackupGetPath(prev_backup, prev_backup_filelist_path, lengthof(prev_backup_filelist_path),
528-
DATABASE_FILE_LIST);
527+
pgBackupGetPath(prev_backup, prev_backup_filelist_path,
528+
lengthof(prev_backup_filelist_path), DATABASE_FILE_LIST);
529529
/* Files of previous backup needed by DELTA backup */
530530
prev_backup_filelist = dir_read_file_list(NULL, prev_backup_filelist_path);
531531

@@ -701,20 +701,20 @@ do_backup_instance(void)
701701
parray_qsort(backup_files_list, pgFileCompareSize);
702702

703703
/* init thread args with own file lists */
704-
backup_threads = (pthread_t *) palloc(sizeof(pthread_t)*num_threads);
705-
backup_threads_args = (backup_files_args *) palloc(sizeof(backup_files_args)*num_threads);
704+
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
705+
threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads);
706706

707707
for (i = 0; i < num_threads; i++)
708708
{
709-
backup_files_args *arg = &(backup_threads_args[i]);
709+
backup_files_arg *arg = &(threads_args[i]);
710710

711711
arg->from_root = pgdata;
712712
arg->to_root = database_path;
713-
arg->backup_files_list = backup_files_list;
714-
arg->prev_backup_filelist = prev_backup_filelist;
715-
arg->prev_backup_start_lsn = prev_backup_start_lsn;
716-
arg->thread_backup_conn = NULL;
717-
arg->thread_cancel_conn = NULL;
713+
arg->files_list = backup_files_list;
714+
arg->prev_filelist = prev_backup_filelist;
715+
arg->prev_start_lsn = prev_backup_start_lsn;
716+
arg->backup_conn = NULL;
717+
arg->cancel_conn = NULL;
718718
/* By default there are some error */
719719
arg->ret = 1;
720720
}
@@ -723,20 +723,21 @@ do_backup_instance(void)
723723
elog(LOG, "Start transfering data files");
724724
for (i = 0; i < num_threads; i++)
725725
{
726-
backup_files_args *arg = &(backup_threads_args[i]);
726+
backup_files_arg *arg = &(threads_args[i]);
727+
727728
elog(VERBOSE, "Start thread num: %i", i);
728729

729730
if (!is_remote_backup)
730-
pthread_create(&backup_threads[i], NULL, backup_files, arg);
731+
pthread_create(&threads[i], NULL, backup_files, arg);
731732
else
732-
pthread_create(&backup_threads[i], NULL, remote_backup_files, arg);
733+
pthread_create(&threads[i], NULL, remote_backup_files, arg);
733734
}
734735

735736
/* Wait threads */
736737
for (i = 0; i < num_threads; i++)
737738
{
738-
pthread_join(backup_threads[i], NULL);
739-
if (backup_threads_args[i].ret == 1)
739+
pthread_join(threads[i], NULL);
740+
if (threads_args[i].ret == 1)
740741
backup_isok = false;
741742
}
742743
if (backup_isok)
@@ -2021,17 +2022,17 @@ backup_disconnect(bool fatal, void *userdata)
20212022
static void *
20222023
backup_files(void *arg)
20232024
{
2024-
int i;
2025-
backup_files_args *arguments = (backup_files_args *) arg;
2026-
int n_backup_files_list = parray_num(arguments->backup_files_list);
2025+
int i;
2026+
backup_files_arg *arguments = (backup_files_arg *) arg;
2027+
int n_backup_files_list = parray_num(arguments->files_list);
20272028

20282029
/* backup a file */
20292030
for (i = 0; i < n_backup_files_list; i++)
20302031
{
20312032
int ret;
20322033
struct stat buf;
2034+
pgFile *file = (pgFile *) parray_get(arguments->files_list, i);
20332035

2034-
pgFile *file = (pgFile *) parray_get(arguments->backup_files_list, i);
20352036
elog(VERBOSE, "Copying file: \"%s\" ", file->path);
20362037
if (!pg_atomic_test_set_flag(&file->lock))
20372038
continue;
@@ -2077,11 +2078,15 @@ backup_files(void *arg)
20772078
{
20782079
int p;
20792080
char *relative;
2080-
int n_prev_backup_files_list = parray_num(arguments->prev_backup_filelist);
2081+
int n_prev_files = parray_num(arguments->prev_filelist);
2082+
20812083
relative = GetRelativePath(file->path, arguments->from_root);
2082-
for (p = 0; p < n_prev_backup_files_list; p++)
2084+
for (p = 0; p < n_prev_files; p++)
20832085
{
2084-
pgFile *prev_file = (pgFile *) parray_get(arguments->prev_backup_filelist, p);
2086+
pgFile *prev_file;
2087+
2088+
prev_file = (pgFile *) parray_get(arguments->prev_filelist, p);
2089+
20852090
if (strcmp(relative, prev_file->path) == 0)
20862091
{
20872092
/* File exists in previous backup */
@@ -2098,7 +2103,7 @@ backup_files(void *arg)
20982103
if (!backup_data_file(arguments,
20992104
arguments->from_root,
21002105
arguments->to_root, file,
2101-
arguments->prev_backup_start_lsn,
2106+
arguments->prev_start_lsn,
21022107
current.backup_mode))
21032108
{
21042109
file->write_size = BYTES_INVALID;
@@ -2130,8 +2135,8 @@ backup_files(void *arg)
21302135
}
21312136

21322137
/* Close connection */
2133-
if (arguments->thread_backup_conn)
2134-
pgut_disconnect(arguments->thread_backup_conn);
2138+
if (arguments->backup_conn)
2139+
pgut_disconnect(arguments->backup_conn);
21352140

21362141
/* Data files transferring is successful */
21372142
arguments->ret = 0;
@@ -2633,7 +2638,7 @@ get_last_ptrack_lsn(void)
26332638
}
26342639

26352640
char *
2636-
pg_ptrack_get_block(backup_files_args *arguments,
2641+
pg_ptrack_get_block(backup_files_arg *arguments,
26372642
Oid dbOid,
26382643
Oid tblsOid,
26392644
Oid relOid,
@@ -2658,17 +2663,17 @@ pg_ptrack_get_block(backup_files_args *arguments,
26582663
sprintf(params[2], "%i", relOid);
26592664
sprintf(params[3], "%u", blknum);
26602665

2661-
if (arguments->thread_backup_conn == NULL)
2666+
if (arguments->backup_conn == NULL)
26622667
{
2663-
arguments->thread_backup_conn = pgut_connect(pgut_dbname);
2668+
arguments->backup_conn = pgut_connect(pgut_dbname);
26642669
}
26652670

2666-
if (arguments->thread_cancel_conn == NULL)
2667-
arguments->thread_cancel_conn = PQgetCancel(arguments->thread_backup_conn);
2671+
if (arguments->cancel_conn == NULL)
2672+
arguments->cancel_conn = PQgetCancel(arguments->backup_conn);
26682673

26692674
//elog(LOG, "db %i pg_ptrack_get_block(%i, %i, %u)",dbOid, tblsOid, relOid, blknum);
2670-
res = pgut_execute_parallel(arguments->thread_backup_conn,
2671-
arguments->thread_cancel_conn,
2675+
res = pgut_execute_parallel(arguments->backup_conn,
2676+
arguments->cancel_conn,
26722677
"SELECT pg_catalog.pg_ptrack_get_block_2($1, $2, $3, $4)",
26732678
4, (const char **)params, true);
26742679

src/data.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
221221
* to the backup file.
222222
*/
223223
static void
224-
backup_data_page(backup_files_args *arguments,
224+
backup_data_page(backup_files_arg *arguments,
225225
pgFile *file, XLogRecPtr prev_backup_start_lsn,
226226
BlockNumber blknum, BlockNumber nblocks,
227227
FILE *in, FILE *out,
@@ -409,7 +409,7 @@ backup_data_page(backup_files_args *arguments,
409409
* backup with special header.
410410
*/
411411
bool
412-
backup_data_file(backup_files_args* arguments,
412+
backup_data_file(backup_files_arg* arguments,
413413
const char *from_root, const char *to_root,
414414
pgFile *file, XLogRecPtr prev_backup_start_lsn,
415415
BackupMode backup_mode)

src/parsexlog.c

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*
66
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
77
* Portions Copyright (c) 1994, Regents of the University of California
8-
* Portions Copyright (c) 2015-2017, Postgres Professional
8+
* Portions Copyright (c) 2015-2018, Postgres Professional
99
*
1010
*-------------------------------------------------------------------------
1111
*/
@@ -101,6 +101,19 @@ typedef struct XLogPageReadPrivate
101101
TimeLineID tli;
102102
} XLogPageReadPrivate;
103103

104+
/* An argument for a thread function */
105+
typedef struct
106+
{
107+
parray *files;
108+
bool corrupted;
109+
110+
/*
111+
* Return value from the thread.
112+
* 0 means there is no error, 1 - there is an error.
113+
*/
114+
int ret;
115+
} xlog_thread_arg;
116+
104117
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
105118
XLogRecPtr targetPagePtr,
106119
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
@@ -125,6 +138,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
125138
XLogSegNo endSegNo,
126139
nextSegNo = 0;
127140

141+
pthread_t threads[num_threads];
142+
xlog_thread_arg thread_args[num_threads];
143+
128144
elog(LOG, "Compiling pagemap");
129145
if (!XRecOffIsValid(startpoint))
130146
elog(ERROR, "Invalid startpoint value %X/%X",

src/pg_probackup.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -276,18 +276,20 @@ typedef struct
276276
{
277277
const char *from_root;
278278
const char *to_root;
279-
parray *backup_files_list;
280-
parray *prev_backup_filelist;
281-
XLogRecPtr prev_backup_start_lsn;
282-
PGconn *thread_backup_conn;
283-
PGcancel *thread_cancel_conn;
279+
280+
parray *files_list;
281+
parray *prev_filelist;
282+
XLogRecPtr prev_start_lsn;
283+
284+
PGconn *backup_conn;
285+
PGcancel *cancel_conn;
284286

285287
/*
286288
* Return value from the thread.
287289
* 0 means there is no error, 1 - there is an error.
288290
*/
289291
int ret;
290-
} backup_files_args;
292+
} backup_files_arg;
291293

292294
/*
293295
* return pointer that exceeds the length of prefix from character string.
@@ -381,7 +383,7 @@ extern const char *deparse_backup_mode(BackupMode mode);
381383
extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
382384
BlockNumber blkno);
383385

384-
extern char *pg_ptrack_get_block(backup_files_args *arguments,
386+
extern char *pg_ptrack_get_block(backup_files_arg *arguments,
385387
Oid dbOid, Oid tblsOid, Oid relOid,
386388
BlockNumber blknum,
387389
size_t *result_size);
@@ -485,7 +487,7 @@ extern int pgFileCompareLinked(const void *f1, const void *f2);
485487
extern int pgFileCompareSize(const void *f1, const void *f2);
486488

487489
/* in data.c */
488-
extern bool backup_data_file(backup_files_args* arguments,
490+
extern bool backup_data_file(backup_files_arg* arguments,
489491
const char *from_root, const char *to_root,
490492
pgFile *file, XLogRecPtr prev_backup_start_lsn,
491493
BackupMode backup_mode);

src/restore.c

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ typedef struct
2828
* 0 means there is no error, 1 - there is an error.
2929
*/
3030
int ret;
31-
} restore_files_args;
31+
} restore_files_arg;
3232

3333
/* Tablespace mapping structures */
3434

@@ -363,8 +363,8 @@ restore_backup(pgBackup *backup)
363363
parray *files;
364364
int i;
365365
/* arrays with meta info for multi threaded backup */
366-
pthread_t *restore_threads;
367-
restore_files_args *restore_threads_args;
366+
pthread_t *threads;
367+
restore_files_arg *threads_args;
368368
bool restore_isok = true;
369369

370370
if (backup->status != BACKUP_STATUS_OK)
@@ -398,43 +398,44 @@ restore_backup(pgBackup *backup)
398398
pgBackupGetPath(backup, list_path, lengthof(list_path), DATABASE_FILE_LIST);
399399
files = dir_read_file_list(database_path, list_path);
400400

401-
restore_threads = (pthread_t *) palloc(sizeof(pthread_t)*num_threads);
402-
restore_threads_args = (restore_files_args *) palloc(sizeof(restore_files_args)*num_threads);
401+
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
402+
threads_args = (restore_files_arg *) palloc(sizeof(restore_files_arg)*num_threads);
403403

404404
/* setup threads */
405405
for (i = 0; i < parray_num(files); i++)
406406
{
407-
pgFile *file = (pgFile *) parray_get(files, i);
407+
pgFile *file = (pgFile *) parray_get(files, i);
408+
408409
pg_atomic_clear_flag(&file->lock);
409410
}
410411

411412
/* Restore files into target directory */
412413
for (i = 0; i < num_threads; i++)
413414
{
414-
restore_files_args *arg = &(restore_threads_args[i]);
415+
restore_files_arg *arg = &(threads_args[i]);
415416

416417
arg->files = files;
417418
arg->backup = backup;
418419
/* By default there are some error */
419-
arg->ret = 1;
420+
threads_args[i].ret = 1;
420421

421422
elog(LOG, "Start thread for num:%li", parray_num(files));
422423

423-
pthread_create(&restore_threads[i], NULL, restore_files, arg);
424+
pthread_create(&threads[i], NULL, restore_files, arg);
424425
}
425426

426427
/* Wait theads */
427428
for (i = 0; i < num_threads; i++)
428429
{
429-
pthread_join(restore_threads[i], NULL);
430-
if (restore_threads_args[i].ret == 1)
430+
pthread_join(threads[i], NULL);
431+
if (threads_args[i].ret == 1)
431432
restore_isok = false;
432433
}
433434
if (!restore_isok)
434435
elog(ERROR, "Data files restoring failed");
435436

436-
pfree(restore_threads);
437-
pfree(restore_threads_args);
437+
pfree(threads);
438+
pfree(threads_args);
438439

439440
/* cleanup */
440441
parray_walk(files, pgFileFree);
@@ -710,7 +711,7 @@ static void *
710711
restore_files(void *arg)
711712
{
712713
int i;
713-
restore_files_args *arguments = (restore_files_args *)arg;
714+
restore_files_arg *arguments = (restore_files_arg *)arg;
714715

715716
for (i = 0; i < parray_num(arguments->files); i++)
716717
{

0 commit comments

Comments
 (0)