3131
3232static int standby_message_timeout = 10 * 1000 ; /* 10 sec = default */
3333static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr ;
34+ static XLogRecPtr stop_stream_lsn = InvalidXLogRecPtr ;
3435
3536/*
3637 * How long we should wait for streaming end in seconds.
@@ -45,11 +46,23 @@ const char *progname = "pg_probackup";
4546/* list of files contained in backup */
4647static parray * backup_files_list = NULL ;
4748
48- static pthread_mutex_t start_stream_mut = PTHREAD_MUTEX_INITIALIZER ;
4949/*
5050 * We need to wait end of WAL streaming before execute pg_stop_backup().
5151 */
52+ typedef struct
53+ {
54+ const char * basedir ;
55+ PGconn * conn ;
56+
57+ /*
58+ * Return value from the thread.
59+ * 0 means there is no error, 1 - there is an error.
60+ */
61+ int ret ;
62+ } StreamThreadArg ;
63+
5264static pthread_t stream_thread ;
65+ static StreamThreadArg stream_thread_arg = {"" , NULL , 1 };
5366
5467static int is_ptrack_enable = false;
5568bool is_ptrack_support = false;
@@ -423,6 +436,9 @@ remote_backup_files(void *arg)
423436 file -> path , (unsigned long ) file -> write_size );
424437 PQfinish (file_backup_conn );
425438 }
439+
440+ /* Data files transferring is successful */
441+ arguments -> ret = 0 ;
426442}
427443
428444/*
@@ -440,6 +456,7 @@ do_backup_instance(void)
440456
441457 pthread_t backup_threads [num_threads ];
442458 backup_files_args * backup_threads_args [num_threads ];
459+ bool backup_isok = true;
443460
444461 pgBackup * prev_backup = NULL ;
445462 char prev_backup_filelist_path [MAXPGPATH ];
@@ -540,13 +557,40 @@ do_backup_instance(void)
540557 join_path_components (dst_backup_path , database_path , PG_XLOG_DIR );
541558 dir_create_dir (dst_backup_path , DIR_PERMISSION );
542559
543- pthread_mutex_lock (& start_stream_mut );
544- pthread_create (& stream_thread , NULL , (void * (* )(void * )) StreamLog , dst_backup_path );
545- pthread_mutex_lock (& start_stream_mut );
546- if (conn == NULL )
560+ stream_thread_arg .basedir = dst_backup_path ;
561+
562+ /*
563+ * Connect in replication mode to the server.
564+ */
565+ stream_thread_arg .conn = pgut_connect_replication (pgut_dbname );
566+
567+ if (!CheckServerVersionForStreaming (stream_thread_arg .conn ))
568+ {
569+ PQfinish (stream_thread_arg .conn );
570+ /*
571+ * Error message already written in CheckServerVersionForStreaming().
572+ * There's no hope of recovering from a version mismatch, so don't
573+ * retry.
574+ */
575+ elog (ERROR , "Cannot continue backup because stream connect has failed." );
576+ }
577+
578+ /*
579+ * Identify server, obtaining start LSN position and current timeline ID
580+ * at the same time, necessary if not valid data can be found in the
581+ * existing output directory.
582+ */
583+ if (!RunIdentifySystem (stream_thread_arg .conn , NULL , NULL , NULL , NULL ))
584+ {
585+ PQfinish (stream_thread_arg .conn );
547586 elog (ERROR , "Cannot continue backup because stream connect has failed." );
587+ }
588+
589+ /* Assume an error by default; cleared to 0 on success */
590+ stream_thread_arg .ret = 1 ;
548591
549- pthread_mutex_unlock (& start_stream_mut );
592+ pthread_create (& stream_thread , NULL , (void * (* )(void * )) StreamLog ,
593+ & stream_thread_arg );
550594 }
551595
552596 /* initialize backup list */
@@ -652,6 +696,8 @@ do_backup_instance(void)
652696 arg -> prev_backup_start_lsn = prev_backup_start_lsn ;
653697 arg -> thread_backup_conn = NULL ;
654698 arg -> thread_cancel_conn = NULL ;
699+ /* Assume an error by default; cleared to 0 on success */
700+ arg -> ret = 1 ;
655701 backup_threads_args [i ] = arg ;
656702 }
657703
@@ -675,9 +721,15 @@ do_backup_instance(void)
675721 for (i = 0 ; i < num_threads ; i ++ )
676722 {
677723 pthread_join (backup_threads [i ], NULL );
724+ if (backup_threads_args [i ]-> ret == 1 )
725+ backup_isok = false;
726+
678727 pg_free (backup_threads_args [i ]);
679728 }
680- elog (LOG , "Data files are transfered" );
729+ if (backup_isok )
730+ elog (LOG , "Data files are transfered" );
731+ else
732+ elog (ERROR , "Data files transferring failed" );
681733
682734 /* clean previous backup file list */
683735 if (prev_backup_filelist )
@@ -776,10 +828,10 @@ do_backup(time_t start_time)
776828 is_checksum_enabled = pg_checksum_enable ();
777829
778830 if (is_checksum_enabled )
779- elog (LOG , "This PostgreSQL instance initialized with data block checksums. "
831+ elog (LOG , "This PostgreSQL instance was initialized with data block checksums. "
780832 "Data block corruption will be detected" );
781833 else
782- elog (WARNING , "This PostgreSQL instance initialized without data block checksums. "
834+ elog (WARNING , "This PostgreSQL instance was initialized without data block checksums. "
783835 "pg_probackup have no way to detect data block corruption without them. "
784836 "Reinitialize PGDATA with option '--data-checksums'." );
785837
@@ -1544,7 +1596,8 @@ pg_stop_backup(pgBackup *backup)
15441596 FILE * fp ;
15451597 pgFile * file ;
15461598 size_t len ;
1547- char * val = NULL ;
1599+ char * val = NULL ;
1600+ char * stop_backup_query = NULL ;
15481601
15491602 /*
15501603 * We will use this values if there are no transactions between start_lsn
@@ -1601,26 +1654,25 @@ pg_stop_backup(pgBackup *backup)
16011654 * pg_stop_backup(false) copy of the backup label and tablespace map
16021655 * so they can be written to disk by the caller.
16031656 */
1604- sent = pgut_send (conn ,
1605- "SELECT"
1657+ stop_backup_query = "SELECT"
16061658 " pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
16071659 " current_timestamp(0)::timestamptz,"
16081660 " lsn,"
16091661 " labelfile,"
16101662 " spcmapfile"
1611- " FROM pg_catalog.pg_stop_backup(false)" ,
1612- 0 , NULL , WARNING );
1663+ " FROM pg_catalog.pg_stop_backup(false)" ;
1664+
16131665 }
16141666 else
16151667 {
16161668
1617- sent = pgut_send (conn ,
1618- "SELECT"
1669+ stop_backup_query = "SELECT"
16191670 " pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
16201671 " current_timestamp(0)::timestamptz,"
1621- " pg_catalog.pg_stop_backup() as lsn" ,
1622- 0 , NULL , WARNING );
1672+ " pg_catalog.pg_stop_backup() as lsn" ;
16231673 }
1674+
1675+ sent = pgut_send (conn , stop_backup_query , 0 , NULL , WARNING );
16241676 pg_stop_backup_is_sent = true;
16251677 if (!sent )
16261678 elog (ERROR , "Failed to send pg_stop_backup query" );
@@ -1665,10 +1717,23 @@ pg_stop_backup(pgBackup *backup)
16651717 break ;
16661718 }
16671719 }
1720+
1721+ /* Check successful execution of pg_stop_backup() */
16681722 if (!res )
16691723 elog (ERROR , "pg_stop backup() failed" );
16701724 else
1725+ {
1726+ switch (PQresultStatus (res ))
1727+ {
1728+ case PGRES_TUPLES_OK :
1729+ case PGRES_COMMAND_OK :
1730+ break ;
1731+ default :
1732+ elog (ERROR , "query failed: %s query was: %s" ,
1733+ PQerrorMessage (conn ), stop_backup_query );
1734+ }
16711735 elog (INFO , "pg_stop backup() successfully executed" );
1736+ }
16721737
16731738 backup_in_progress = false;
16741739
@@ -1771,8 +1836,12 @@ pg_stop_backup(pgBackup *backup)
17711836 PQclear (res );
17721837
17731838 if (stream_wal )
1839+ {
17741840 /* Wait for the completion of stream */
17751841 pthread_join (stream_thread , NULL );
1842+ if (stream_thread_arg .ret == 1 )
1843+ elog (ERROR , "WAL streaming failed" );
1844+ }
17761845 }
17771846
17781847 /* Fill in fields if that is the correct end of backup. */
@@ -1858,7 +1927,7 @@ backup_cleanup(bool fatal, void *userdata)
18581927 */
18591928 if (current .status == BACKUP_STATUS_RUNNING && current .end_time == 0 )
18601929 {
1861- elog (INFO , "Backup %s is running, setting its status to ERROR" ,
1930+ elog (WARNING , "Backup %s is running, setting its status to ERROR" ,
18621931 base36enc (current .start_time ));
18631932 current .end_time = time (NULL );
18641933 current .status = BACKUP_STATUS_ERROR ;
@@ -1870,7 +1939,7 @@ backup_cleanup(bool fatal, void *userdata)
18701939 */
18711940 if (backup_in_progress )
18721941 {
1873- elog (LOG , "backup in progress, stop backup" );
1942+ elog (WARNING , "backup in progress, stop backup" );
18741943 pg_stop_backup (NULL ); /* don't care stop_lsn on error case */
18751944 }
18761945}
@@ -2012,6 +2081,8 @@ backup_files(void *arg)
20122081 if (arguments -> thread_backup_conn )
20132082 pgut_disconnect (arguments -> thread_backup_conn );
20142083
2084+ /* Data files transferring is successful */
2085+ arguments -> ret = 0 ;
20152086}
20162087
20172088/*
@@ -2548,7 +2619,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
25482619
25492620 /* we assume that we get called once at the end of each segment */
25502621 if (segment_finished )
2551- elog (LOG , _ ("finished segment at %X/%X (timeline %u)\n " ),
2622+ elog (VERBOSE , _ ("finished segment at %X/%X (timeline %u)" ),
25522623 (uint32 ) (xlogpos >> 32 ), (uint32 ) xlogpos , timeline );
25532624
25542625 /*
@@ -2566,7 +2637,10 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
25662637 if (!XLogRecPtrIsInvalid (stop_backup_lsn ))
25672638 {
25682639 if (xlogpos > stop_backup_lsn )
2640+ {
2641+ stop_stream_lsn = xlogpos ;
25692642 return true;
2643+ }
25702644
25712645 /* pg_stop_backup() was executed, wait for the completion of stream */
25722646 if (stream_stop_timeout == 0 )
@@ -2600,45 +2674,13 @@ StreamLog(void *arg)
26002674{
26012675 XLogRecPtr startpos ;
26022676 TimeLineID starttli ;
2603- char * basedir = (char * )arg ;
2604-
2605- /*
2606- * Connect in replication mode to the server
2607- */
2608- if (conn == NULL )
2609- conn = pgut_connect_replication (pgut_dbname );
2610- if (!conn )
2611- {
2612- pthread_mutex_unlock (& start_stream_mut );
2613- /* Error message already written in GetConnection() */
2614- return ;
2615- }
2616-
2617- if (!CheckServerVersionForStreaming (conn ))
2618- {
2619- /*
2620- * Error message already written in CheckServerVersionForStreaming().
2621- * There's no hope of recovering from a version mismatch, so don't
2622- * retry.
2623- */
2624- disconnect_and_exit (1 );
2625- }
2626-
2627- /*
2628- * Identify server, obtaining start LSN position and current timeline ID
2629- * at the same time, necessary if not valid data can be found in the
2630- * existing output directory.
2631- */
2632- if (!RunIdentifySystem (conn , NULL , & starttli , & startpos , NULL ))
2633- disconnect_and_exit (1 );
2634-
2635- /* Ok we have normal stream connect and main process can work again */
2636- pthread_mutex_unlock (& start_stream_mut );
2677+ StreamThreadArg * stream_arg = (StreamThreadArg * ) arg ;
26372678
26382679 /*
26392680 * We must use startpos as start_lsn from start_backup
26402681 */
26412682 startpos = current .start_lsn ;
2683+ starttli = current .tli ;
26422684
26432685 /*
26442686 * Always start streaming at the beginning of a segment
@@ -2652,7 +2694,7 @@ StreamLog(void *arg)
26522694 /*
26532695 * Start the replication
26542696 */
2655- elog (LOG , _ ("starting log streaming at %X/%X (timeline %u)\n " ),
2697+ elog (LOG , _ ("started streaming WAL at %X/%X (timeline %u)" ),
26562698 (uint32 ) (startpos >> 32 ), (uint32 ) startpos , starttli );
26572699
26582700#if PG_VERSION_NUM >= 90600
@@ -2666,11 +2708,11 @@ StreamLog(void *arg)
26662708 ctl .sysidentifier = NULL ;
26672709
26682710#if PG_VERSION_NUM >= 100000
2669- ctl .walmethod = CreateWalDirectoryMethod (basedir , 0 , true);
2711+ ctl .walmethod = CreateWalDirectoryMethod (stream_arg -> basedir , 0 , true);
26702712 ctl .replication_slot = replication_slot ;
26712713 ctl .stop_socket = PGINVALID_SOCKET ;
26722714#else
2673- ctl .basedir = basedir ;
2715+ ctl .basedir = ( char * ) stream_arg -> basedir ;
26742716#endif
26752717
26762718 ctl .stream_stop = stop_streaming ;
@@ -2679,7 +2721,7 @@ StreamLog(void *arg)
26792721 ctl .synchronous = false;
26802722 ctl .mark_done = false;
26812723
2682- if (ReceiveXlogStream (conn , & ctl ) == false)
2724+ if (ReceiveXlogStream (stream_arg -> conn , & ctl ) == false)
26832725 elog (ERROR , "Problem in receivexlog" );
26842726
26852727#if PG_VERSION_NUM >= 100000
@@ -2689,14 +2731,18 @@ StreamLog(void *arg)
26892731#endif
26902732 }
26912733#else
2692- if (ReceiveXlogStream (conn , startpos , starttli , NULL , basedir ,
2693- stop_streaming , standby_message_timeout , NULL ,
2694- false, false) == false)
2734+ if (ReceiveXlogStream (stream_arg -> conn , startpos , starttli , NULL , basedir ,
2735+ stop_streaming , standby_message_timeout , NULL ,
2736+ false, false) == false)
26952737 elog (ERROR , "Problem in receivexlog" );
26962738#endif
26972739
2698- PQfinish (conn );
2699- conn = NULL ;
2740+ elog (LOG , _ ("finished streaming WAL at %X/%X (timeline %u)" ),
2741+ (uint32 ) (stop_stream_lsn >> 32 ), (uint32 ) stop_stream_lsn , starttli );
2742+ stream_arg -> ret = 0 ;
2743+
2744+ PQfinish (stream_arg -> conn );
2745+ stream_arg -> conn = NULL ;
27002746}
27012747
27022748/*
0 commit comments