Skip to content

Commit dad2747

Browse files
committed
Refactor code related to WAL streaming. Move it to stream.c
1 parent 8c0badc commit dad2747

File tree

4 files changed

+380
-313
lines changed

4 files changed

+380
-313
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ OBJS = src/utils/configuration.o src/utils/json.o src/utils/logger.o \
66

77
OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \
88
src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \
9-
src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/util.o \
10-
src/validate.o src/datapagemap.o
9+
src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/stream.o \
10+
src/util.o src/validate.o src/datapagemap.o
1111

1212
# borrowed files
1313
OBJS += src/pg_crc.o src/receivelog.o src/streamutil.o \

src/backup.c

Lines changed: 3 additions & 311 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
#endif
1616
#include "catalog/pg_tablespace.h"
1717
#include "pgtar.h"
18-
#include "receivelog.h"
1918
#include "streamutil.h"
2019

2120
#include <sys/stat.h>
@@ -25,18 +24,6 @@
2524
#include "utils/thread.h"
2625
#include "utils/file.h"
2726

28-
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
29-
static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr;
30-
static XLogRecPtr stop_stream_lsn = InvalidXLogRecPtr;
31-
32-
/*
33-
* How long we should wait for streaming end in seconds.
34-
* Retrieved as checkpoint_timeout + checkpoint_timeout * 0.1
35-
*/
36-
static uint32 stream_stop_timeout = 0;
37-
/* Time in which we started to wait for streaming end */
38-
static time_t stream_stop_begin = 0;
39-
4027
const char *progname = "pg_probackup";
4128

4229
/* list of files contained in backup */
@@ -45,26 +32,6 @@ static parray *backup_files_list = NULL;
4532
/* We need critical section for datapagemap_add() in case of using threads */
4633
static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER;
4734

48-
/*
49-
* We need to wait end of WAL streaming before execute pg_stop_backup().
50-
*/
51-
typedef struct
52-
{
53-
const char *basedir;
54-
PGconn *conn;
55-
56-
/*
57-
* Return value from the thread.
58-
* 0 means there is no error, 1 - there is an error.
59-
*/
60-
int ret;
61-
62-
XLogRecPtr startpos;
63-
TimeLineID starttli;
64-
} StreamThreadArg;
65-
66-
static pthread_t stream_thread;
67-
static StreamThreadArg stream_thread_arg = {"", NULL, 1};
6835

6936
bool exclusive_backup = false;
7037

@@ -86,15 +53,11 @@ static void pg_start_backup(const char *label, bool smooth, pgBackup *backup,
8653
PGNodeInfo *nodeInfo, PGconn *conn);
8754
static void pg_switch_wal(PGconn *conn);
8855
static void pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn, PGNodeInfo *nodeInfo);
89-
static int checkpoint_timeout(PGconn *backup_conn);
9056

9157
static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, TimeLineID tli,
9258
bool in_prev_segment, bool segment_only,
9359
int timeout_elevel, bool in_stream_dir);
9460

95-
static void *StreamLog(void *arg);
96-
static void IdentifySystem(StreamThreadArg *stream_thread_arg);
97-
9861
static void check_external_for_tablespaces(parray *external_list,
9962
PGconn *backup_conn);
10063
static parray *get_database_map(PGconn *pg_startbackup_conn);
@@ -310,33 +273,11 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
310273
/* start stream replication */
311274
if (stream_wal)
312275
{
313-
/* How long we should wait for streaming end after pg_stop_backup */
314-
stream_stop_timeout = checkpoint_timeout(backup_conn);
315-
stream_stop_timeout = stream_stop_timeout + stream_stop_timeout * 0.1;
316-
317276
join_path_components(dst_backup_path, database_path, PG_XLOG_DIR);
318277
fio_mkdir(dst_backup_path, DIR_PERMISSION, FIO_BACKUP_HOST);
319278

320-
stream_thread_arg.basedir = dst_backup_path;
321-
322-
/*
323-
* Connect in replication mode to the server.
324-
*/
325-
stream_thread_arg.conn = pgut_connect_replication(instance_config.conn_opt.pghost,
326-
instance_config.conn_opt.pgport,
327-
instance_config.conn_opt.pgdatabase,
328-
instance_config.conn_opt.pguser);
329-
/* sanity */
330-
IdentifySystem(&stream_thread_arg);
331-
332-
/* By default there are some error */
333-
stream_thread_arg.ret = 1;
334-
/* we must use startpos as start_lsn from start_backup */
335-
stream_thread_arg.startpos = current.start_lsn;
336-
stream_thread_arg.starttli = current.tli;
337-
338-
thread_interrupted = false;
339-
pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg);
279+
start_WAL_streaming(backup_conn, dst_backup_path, &instance_config.conn_opt,
280+
current.start_lsn, current.tli);
340281
}
341282

342283
/* initialize backup list */
@@ -1979,10 +1920,7 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn,
19791920

19801921
if (stream_wal)
19811922
{
1982-
/* Wait for the completion of stream */
1983-
pthread_join(stream_thread, NULL);
1984-
if (stream_thread_arg.ret == 1)
1985-
elog(ERROR, "WAL streaming failed");
1923+
wait_WAL_streaming_end();
19861924

19871925
pgBackupGetPath2(backup, stream_xlog_path,
19881926
lengthof(stream_xlog_path),
@@ -2009,35 +1947,6 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn,
20091947
}
20101948
}
20111949

2012-
/*
2013-
* Retrieve checkpoint_timeout GUC value in seconds.
2014-
*/
2015-
static int
2016-
checkpoint_timeout(PGconn *backup_conn)
2017-
{
2018-
PGresult *res;
2019-
const char *val;
2020-
const char *hintmsg;
2021-
int val_int;
2022-
2023-
res = pgut_execute(backup_conn, "show checkpoint_timeout", 0, NULL);
2024-
val = PQgetvalue(res, 0, 0);
2025-
2026-
if (!parse_int(val, &val_int, OPTION_UNIT_S, &hintmsg))
2027-
{
2028-
PQclear(res);
2029-
if (hintmsg)
2030-
elog(ERROR, "Invalid value of checkout_timeout %s: %s", val,
2031-
hintmsg);
2032-
else
2033-
elog(ERROR, "Invalid value of checkout_timeout %s", val);
2034-
}
2035-
2036-
PQclear(res);
2037-
2038-
return val_int;
2039-
}
2040-
20411950
/*
20421951
* Notify end of backup to server when "backup_label" is in the root directory
20431952
* of the DB cluster.
@@ -2391,164 +2300,6 @@ process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
23912300

23922301
}
23932302

2394-
/*
2395-
* Stop WAL streaming if current 'xlogpos' exceeds 'stop_backup_lsn', which is
2396-
* set by pg_stop_backup().
2397-
*
2398-
* TODO: Add streamed file to file list when segment is finished
2399-
*/
2400-
static bool
2401-
stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
2402-
{
2403-
static uint32 prevtimeline = 0;
2404-
static XLogRecPtr prevpos = InvalidXLogRecPtr;
2405-
2406-
/* check for interrupt */
2407-
if (interrupted || thread_interrupted)
2408-
elog(ERROR, "Interrupted during WAL streaming");
2409-
2410-
/* we assume that we get called once at the end of each segment */
2411-
if (segment_finished)
2412-
elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
2413-
(uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
2414-
2415-
/*
2416-
* Note that we report the previous, not current, position here. After a
2417-
* timeline switch, xlogpos points to the beginning of the segment because
2418-
* that's where we always begin streaming. Reporting the end of previous
2419-
* timeline isn't totally accurate, because the next timeline can begin
2420-
* slightly before the end of the WAL that we received on the previous
2421-
* timeline, but it's close enough for reporting purposes.
2422-
*/
2423-
if (prevtimeline != 0 && prevtimeline != timeline)
2424-
elog(LOG, _("switched to timeline %u at %X/%X\n"),
2425-
timeline, (uint32) (prevpos >> 32), (uint32) prevpos);
2426-
2427-
if (!XLogRecPtrIsInvalid(stop_backup_lsn))
2428-
{
2429-
if (xlogpos >= stop_backup_lsn)
2430-
{
2431-
stop_stream_lsn = xlogpos;
2432-
return true;
2433-
}
2434-
2435-
/* pg_stop_backup() was executed, wait for the completion of stream */
2436-
if (stream_stop_begin == 0)
2437-
{
2438-
elog(INFO, "Wait for LSN %X/%X to be streamed",
2439-
(uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn);
2440-
2441-
stream_stop_begin = time(NULL);
2442-
}
2443-
2444-
if (time(NULL) - stream_stop_begin > stream_stop_timeout)
2445-
elog(ERROR, "Target LSN %X/%X could not be streamed in %d seconds",
2446-
(uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn,
2447-
stream_stop_timeout);
2448-
}
2449-
2450-
prevtimeline = timeline;
2451-
prevpos = xlogpos;
2452-
2453-
return false;
2454-
}
2455-
2456-
/*
2457-
* Start the log streaming
2458-
*/
2459-
static void *
2460-
StreamLog(void *arg)
2461-
{
2462-
StreamThreadArg *stream_arg = (StreamThreadArg *) arg;
2463-
2464-
/*
2465-
* Always start streaming at the beginning of a segment
2466-
*/
2467-
stream_arg->startpos -= stream_arg->startpos % instance_config.xlog_seg_size;
2468-
2469-
/* Initialize timeout */
2470-
stream_stop_begin = 0;
2471-
2472-
#if PG_VERSION_NUM >= 100000
2473-
/* if slot name was not provided for temp slot, use default slot name */
2474-
if (!replication_slot && temp_slot)
2475-
replication_slot = "pg_probackup_slot";
2476-
#endif
2477-
2478-
2479-
#if PG_VERSION_NUM >= 110000
2480-
/* Create temp repslot */
2481-
if (temp_slot)
2482-
CreateReplicationSlot(stream_arg->conn, replication_slot,
2483-
NULL, temp_slot, true, true, false);
2484-
#endif
2485-
2486-
/*
2487-
* Start the replication
2488-
*/
2489-
elog(LOG, "started streaming WAL at %X/%X (timeline %u)",
2490-
(uint32) (stream_arg->startpos >> 32), (uint32) stream_arg->startpos,
2491-
stream_arg->starttli);
2492-
2493-
#if PG_VERSION_NUM >= 90600
2494-
{
2495-
StreamCtl ctl;
2496-
2497-
MemSet(&ctl, 0, sizeof(ctl));
2498-
2499-
ctl.startpos = stream_arg->startpos;
2500-
ctl.timeline = stream_arg->starttli;
2501-
ctl.sysidentifier = NULL;
2502-
2503-
#if PG_VERSION_NUM >= 100000
2504-
ctl.walmethod = CreateWalDirectoryMethod(
2505-
stream_arg->basedir,
2506-
// (instance_config.compress_alg == NONE_COMPRESS) ? 0 : instance_config.compress_level,
2507-
0,
2508-
true);
2509-
ctl.replication_slot = replication_slot;
2510-
ctl.stop_socket = PGINVALID_SOCKET;
2511-
ctl.do_sync = false; /* We sync all files at the end of backup */
2512-
// ctl.mark_done /* for future use in s3 */
2513-
#if PG_VERSION_NUM >= 100000 && PG_VERSION_NUM < 110000
2514-
ctl.temp_slot = temp_slot;
2515-
#endif
2516-
#else
2517-
ctl.basedir = (char *) stream_arg->basedir;
2518-
#endif
2519-
2520-
ctl.stream_stop = stop_streaming;
2521-
ctl.standby_message_timeout = standby_message_timeout;
2522-
ctl.partial_suffix = NULL;
2523-
ctl.synchronous = false;
2524-
ctl.mark_done = false;
2525-
2526-
if(ReceiveXlogStream(stream_arg->conn, &ctl) == false)
2527-
elog(ERROR, "Problem in receivexlog");
2528-
2529-
#if PG_VERSION_NUM >= 100000
2530-
if (!ctl.walmethod->finish())
2531-
elog(ERROR, "Could not finish writing WAL files: %s",
2532-
strerror(errno));
2533-
#endif
2534-
}
2535-
#else
2536-
if(ReceiveXlogStream(stream_arg->conn, stream_arg->startpos, stream_arg->starttli,
2537-
NULL, (char *) stream_arg->basedir, stop_streaming,
2538-
standby_message_timeout, NULL, false, false) == false)
2539-
elog(ERROR, "Problem in receivexlog");
2540-
#endif
2541-
2542-
elog(LOG, "finished streaming WAL at %X/%X (timeline %u)",
2543-
(uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, stream_arg->starttli);
2544-
stream_arg->ret = 0;
2545-
2546-
PQfinish(stream_arg->conn);
2547-
stream_arg->conn = NULL;
2548-
2549-
return NULL;
2550-
}
2551-
25522303
static void
25532304
check_external_for_tablespaces(parray *external_list, PGconn *backup_conn)
25542305
{
@@ -2613,62 +2364,3 @@ check_external_for_tablespaces(parray *external_list, PGconn *backup_conn)
26132364
}
26142365
}
26152366
}
2616-
2617-
/*
2618-
* Run IDENTIFY_SYSTEM through a given connection and
2619-
* check system identifier and timeline are matching
2620-
*/
2621-
void
2622-
IdentifySystem(StreamThreadArg *stream_thread_arg)
2623-
{
2624-
PGresult *res;
2625-
2626-
uint64 stream_conn_sysidentifier = 0;
2627-
char *stream_conn_sysidentifier_str;
2628-
TimeLineID stream_conn_tli = 0;
2629-
2630-
if (!CheckServerVersionForStreaming(stream_thread_arg->conn))
2631-
{
2632-
PQfinish(stream_thread_arg->conn);
2633-
/*
2634-
* Error message already written in CheckServerVersionForStreaming().
2635-
* There's no hope of recovering from a version mismatch, so don't
2636-
* retry.
2637-
*/
2638-
elog(ERROR, "Cannot continue backup because stream connect has failed.");
2639-
}
2640-
2641-
/*
2642-
* Identify server, obtain server system identifier and timeline
2643-
*/
2644-
res = pgut_execute(stream_thread_arg->conn, "IDENTIFY_SYSTEM", 0, NULL);
2645-
2646-
if (PQresultStatus(res) != PGRES_TUPLES_OK)
2647-
{
2648-
elog(WARNING,"Could not send replication command \"%s\": %s",
2649-
"IDENTIFY_SYSTEM", PQerrorMessage(stream_thread_arg->conn));
2650-
PQfinish(stream_thread_arg->conn);
2651-
elog(ERROR, "Cannot continue backup because stream connect has failed.");
2652-
}
2653-
2654-
stream_conn_sysidentifier_str = PQgetvalue(res, 0, 0);
2655-
stream_conn_tli = atoll(PQgetvalue(res, 0, 1));
2656-
2657-
/* Additional sanity, primary for PG 9.5,
2658-
* where system id can be obtained only via "IDENTIFY SYSTEM"
2659-
*/
2660-
if (!parse_uint64(stream_conn_sysidentifier_str, &stream_conn_sysidentifier, 0))
2661-
elog(ERROR, "%s is not system_identifier", stream_conn_sysidentifier_str);
2662-
2663-
if (stream_conn_sysidentifier != instance_config.system_identifier)
2664-
elog(ERROR, "System identifier mismatch. Connected PostgreSQL instance has system id: "
2665-
"" UINT64_FORMAT ". Expected: " UINT64_FORMAT ".",
2666-
stream_conn_sysidentifier, instance_config.system_identifier);
2667-
2668-
if (stream_conn_tli != current.tli)
2669-
elog(ERROR, "Timeline identifier mismatch. "
2670-
"Connected PostgreSQL instance has timeline id: %X. Expected: %X.",
2671-
stream_conn_tli, current.tli);
2672-
2673-
PQclear(res);
2674-
}

0 commit comments

Comments
 (0)