Skip to content

Commit 45988aa

Browse files
committed
Make replication connection outside StreamLog()
1 parent ad083f8 commit 45988aa

File tree

1 file changed

+38
-48
lines changed

1 file changed

+38
-48
lines changed

src/backup.c

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ const char *progname = "pg_probackup";
4646
/* list of files contained in backup */
4747
static parray *backup_files_list = NULL;
4848

49-
static pthread_mutex_t start_stream_mut = PTHREAD_MUTEX_INITIALIZER;
50-
5149
/*
5250
* We need to wait end of WAL streaming before execute pg_stop_backup().
5351
*/
5452
typedef struct
5553
{
5654
const char *basedir;
55+
PGconn *conn;
56+
5757
/*
5858
* Return value from the thread.
5959
* 0 means there is no error, 1 - there is an error.
@@ -62,7 +62,7 @@ typedef struct
6262
} StreamThreadArg;
6363

6464
static pthread_t stream_thread;
65-
static StreamThreadArg stream_thread_arg = {"", 1};
65+
static StreamThreadArg stream_thread_arg = {"", NULL, 1};
6666

6767
static int is_ptrack_enable = false;
6868
bool is_ptrack_support = false;
@@ -558,17 +558,39 @@ do_backup_instance(void)
558558
dir_create_dir(dst_backup_path, DIR_PERMISSION);
559559

560560
stream_thread_arg.basedir = dst_backup_path;
561+
562+
/*
563+
* Connect in replication mode to the server.
564+
*/
565+
stream_thread_arg.conn = pgut_connect_replication(pgut_dbname);
566+
567+
if (!CheckServerVersionForStreaming(stream_thread_arg.conn))
568+
{
569+
PQfinish(stream_thread_arg.conn);
570+
/*
571+
* Error message already written in CheckServerVersionForStreaming().
572+
* There's no hope of recovering from a version mismatch, so don't
573+
* retry.
574+
*/
575+
elog(ERROR, "Cannot continue backup because stream connect has failed.");
576+
}
577+
578+
/*
579+
* Identify server, obtaining start LSN position and current timeline ID
580+
* at the same time, necessary if not valid data can be found in the
581+
* existing output directory.
582+
*/
583+
if (!RunIdentifySystem(stream_thread_arg.conn, NULL, NULL, NULL, NULL))
584+
{
585+
PQfinish(stream_thread_arg.conn);
586+
elog(ERROR, "Cannot continue backup because stream connect has failed.");
587+
}
588+
561589
/* By default there are some error */
562590
stream_thread_arg.ret = 1;
563591

564-
pthread_mutex_lock(&start_stream_mut);
565592
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog,
566593
&stream_thread_arg);
567-
pthread_mutex_lock(&start_stream_mut);
568-
if (conn == NULL)
569-
elog(ERROR, "Cannot continue backup because stream connect has failed.");
570-
571-
pthread_mutex_unlock(&start_stream_mut);
572594
}
573595

574596
/* initialize backup list */
@@ -2653,43 +2675,11 @@ StreamLog(void *arg)
26532675
TimeLineID starttli;
26542676
StreamThreadArg *stream_arg = (StreamThreadArg *) arg;
26552677

2656-
/*
2657-
* Connect in replication mode to the server
2658-
*/
2659-
if (conn == NULL)
2660-
conn = pgut_connect_replication(pgut_dbname);
2661-
if (!conn)
2662-
{
2663-
pthread_mutex_unlock(&start_stream_mut);
2664-
/* Error message already written in GetConnection() */
2665-
return;
2666-
}
2667-
2668-
if (!CheckServerVersionForStreaming(conn))
2669-
{
2670-
/*
2671-
* Error message already written in CheckServerVersionForStreaming().
2672-
* There's no hope of recovering from a version mismatch, so don't
2673-
* retry.
2674-
*/
2675-
disconnect_and_exit(1);
2676-
}
2677-
2678-
/*
2679-
* Identify server, obtaining start LSN position and current timeline ID
2680-
* at the same time, necessary if not valid data can be found in the
2681-
* existing output directory.
2682-
*/
2683-
if (!RunIdentifySystem(conn, NULL, &starttli, &startpos, NULL))
2684-
disconnect_and_exit(1);
2685-
2686-
/* Ok we have normal stream connect and main process can work again */
2687-
pthread_mutex_unlock(&start_stream_mut);
2688-
26892678
/*
26902679
* We must use startpos as start_lsn from start_backup
26912680
*/
26922681
startpos = current.start_lsn;
2682+
starttli = current.tli;
26932683

26942684
/*
26952685
* Always start streaming at the beginning of a segment
@@ -2730,7 +2720,7 @@ StreamLog(void *arg)
27302720
ctl.synchronous = false;
27312721
ctl.mark_done = false;
27322722

2733-
if(ReceiveXlogStream(conn, &ctl) == false)
2723+
if(ReceiveXlogStream(stream_arg->conn, &ctl) == false)
27342724
elog(ERROR, "Problem in receivexlog");
27352725

27362726
#if PG_VERSION_NUM >= 100000
@@ -2740,18 +2730,18 @@ StreamLog(void *arg)
27402730
#endif
27412731
}
27422732
#else
2743-
if(ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
2744-
stop_streaming, standby_message_timeout, NULL,
2745-
false, false) == false)
2733+
if(ReceiveXlogStream(stream_arg->conn, startpos, starttli, NULL, basedir,
2734+
stop_streaming, standby_message_timeout, NULL,
2735+
false, false) == false)
27462736
elog(ERROR, "Problem in receivexlog");
27472737
#endif
27482738

27492739
elog(LOG, _("finished streaming WAL at %X/%X (timeline %u)"),
27502740
(uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, starttli);
27512741
stream_arg->ret = 0;
27522742

2753-
PQfinish(conn);
2754-
conn = NULL;
2743+
PQfinish(stream_arg->conn);
2744+
stream_arg->conn = NULL;
27552745
}
27562746

27572747
/*

0 commit comments

Comments
 (0)