Skip to content

Commit c2e8593

Browse files
committed
Merge branch 'master' into issue_310
2 parents be07ee7 + 1647d11 commit c2e8593

File tree

7 files changed

+123
-35
lines changed

7 files changed

+123
-35
lines changed

src/backup.c

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ static void pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn, PGNode
5656

5757
static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, TimeLineID tli,
5858
bool in_prev_segment, bool segment_only,
59-
int timeout_elevel, bool in_stream_dir);
59+
int timeout_elevel, bool in_stream_dir, pgBackup *backup);
6060

6161
static void check_external_for_tablespaces(parray *external_list,
6262
PGconn *backup_conn);
@@ -270,7 +270,7 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
270270
* Because WAL streaming will start after pg_start_backup() in stream
271271
* mode.
272272
*/
273-
wait_wal_lsn(current.start_lsn, true, current.tli, false, true, ERROR, false);
273+
wait_wal_lsn(current.start_lsn, true, current.tli, false, true, ERROR, false, &current);
274274
}
275275

276276
/* start stream replication */
@@ -281,6 +281,12 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
281281

282282
start_WAL_streaming(backup_conn, dst_backup_path, &instance_config.conn_opt,
283283
current.start_lsn, current.tli);
284+
285+
/* Make sure that WAL streaming is working
286+
* PAGE backup in stream mode is waited twice, first for
287+
* segment in WAL archive and then for streamed segment
288+
*/
289+
wait_wal_lsn(current.start_lsn, true, current.tli, false, true, ERROR, true, &current);
284290
}
285291

286292
/* initialize backup's file list */
@@ -1264,7 +1270,7 @@ pg_is_superuser(PGconn *conn)
12641270
static XLogRecPtr
12651271
wait_wal_lsn(XLogRecPtr target_lsn, bool is_start_lsn, TimeLineID tli,
12661272
bool in_prev_segment, bool segment_only,
1267-
int timeout_elevel, bool in_stream_dir)
1273+
int timeout_elevel, bool in_stream_dir, pgBackup *backup)
12681274
{
12691275
XLogSegNo targetSegNo;
12701276
char pg_wal_dir[MAXPGPATH];
@@ -1296,15 +1302,14 @@ wait_wal_lsn(XLogRecPtr target_lsn, bool is_start_lsn, TimeLineID tli,
12961302
*/
12971303
if (in_stream_dir)
12981304
{
1299-
pgBackupGetPath2(&current, pg_wal_dir, lengthof(pg_wal_dir),
1300-
DATABASE_DIR, PG_XLOG_DIR);
1305+
join_path_components(pg_wal_dir, backup->database_dir, PG_XLOG_DIR);
13011306
join_path_components(wal_segment_path, pg_wal_dir, wal_segment);
13021307
wal_segment_dir = pg_wal_dir;
13031308
}
13041309
else
13051310
{
13061311
join_path_components(wal_segment_path, arclog_path, wal_segment);
1307-
wal_segment_dir = arclog_path;
1312+
wal_segment_dir = arclog_path; /* global var */
13081313
}
13091314

13101315
/* TODO: remove this in 3.0 (it is a cludge against some old bug with archive_timeout) */
@@ -1396,7 +1401,7 @@ wait_wal_lsn(XLogRecPtr target_lsn, bool is_start_lsn, TimeLineID tli,
13961401

13971402
sleep(1);
13981403
if (interrupted)
1399-
elog(ERROR, "Interrupted during waiting for WAL archiving");
1404+
elog(ERROR, "Interrupted during waiting for WAL %s", in_stream_dir ? "streaming" : "archiving");
14001405
try_count++;
14011406

14021407
/* Inform user if WAL segment is absent in first attempt */
@@ -1420,9 +1425,10 @@ wait_wal_lsn(XLogRecPtr target_lsn, bool is_start_lsn, TimeLineID tli,
14201425
{
14211426
if (file_exists)
14221427
elog(timeout_elevel, "WAL segment %s was %s, "
1423-
"but target LSN %X/%X could not be archived in %d seconds",
1428+
"but target LSN %X/%X could not be %s in %d seconds",
14241429
wal_segment, wal_delivery_str,
1425-
(uint32) (target_lsn >> 32), (uint32) target_lsn, timeout);
1430+
(uint32) (target_lsn >> 32), (uint32) target_lsn,
1431+
wal_delivery_str, timeout);
14261432
/* If WAL segment doesn't exist or we wait for previous segment */
14271433
else
14281434
elog(timeout_elevel,
@@ -1707,7 +1713,7 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn,
17071713
{
17081714
/* Wait for segment with current stop_lsn, it is ok for it to never arrive */
17091715
wait_wal_lsn(stop_backup_lsn_tmp, false, backup->tli,
1710-
false, true, WARNING, stream_wal);
1716+
false, true, WARNING, stream_wal, backup);
17111717

17121718
/* Get the first record in segment with current stop_lsn */
17131719
lsn_tmp = get_first_record_lsn(xlog_path, segno, backup->tli,
@@ -1735,7 +1741,7 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn,
17351741
* because previous record can be the contrecord.
17361742
*/
17371743
lsn_tmp = wait_wal_lsn(stop_backup_lsn_tmp, false, backup->tli,
1738-
true, false, ERROR, stream_wal);
1744+
true, false, ERROR, stream_wal, backup);
17391745

17401746
/* sanity */
17411747
if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp))
@@ -1749,7 +1755,7 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn,
17491755
{
17501756
/* Wait for segment with current stop_lsn */
17511757
wait_wal_lsn(stop_backup_lsn_tmp, false, backup->tli,
1752-
false, true, ERROR, stream_wal);
1758+
false, true, ERROR, stream_wal, backup);
17531759

17541760
/* Get the next closest record in segment with current stop_lsn */
17551761
lsn_tmp = get_next_record_lsn(xlog_path, segno, backup->tli,
@@ -1878,7 +1884,7 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn,
18781884
*/
18791885
if (!stop_lsn_exists)
18801886
stop_backup_lsn = wait_wal_lsn(stop_backup_lsn_tmp, false, backup->tli,
1881-
false, false, ERROR, stream_wal);
1887+
false, false, ERROR, stream_wal, backup);
18821888

18831889
if (stream_wal)
18841890
{

src/catalog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2457,7 +2457,7 @@ write_backup_filelist(pgBackup *backup, parray *files, const char *root,
24572457
{
24582458
len += sprintf(line+len, ",\"n_headers\":\"%i\"", file->n_headers);
24592459
len += sprintf(line+len, ",\"hdr_crc\":\"%u\"", file->hdr_crc);
2460-
len += sprintf(line+len, ",\"hdr_off\":\"%li\"", file->hdr_off);
2460+
len += sprintf(line+len, ",\"hdr_off\":\"%llu\"", file->hdr_off);
24612461
len += sprintf(line+len, ",\"hdr_size\":\"%i\"", file->hdr_size);
24622462
}
24632463

src/data.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2160,7 +2160,7 @@ get_data_file_headers(HeaderMap *hdr_map, pgFile *file, uint32 backup_version, b
21602160

21612161
if (fseek(in, file->hdr_off, SEEK_SET))
21622162
{
2163-
elog(strict ? ERROR : WARNING, "Cannot seek to position %lu in page header map \"%s\": %s",
2163+
elog(strict ? ERROR : WARNING, "Cannot seek to position %llu in page header map \"%s\": %s",
21642164
file->hdr_off, hdr_map->path, strerror(errno));
21652165
goto cleanup;
21662166
}
@@ -2177,7 +2177,7 @@ get_data_file_headers(HeaderMap *hdr_map, pgFile *file, uint32 backup_version, b
21772177

21782178
if (fread(zheaders, 1, file->hdr_size, in) != file->hdr_size)
21792179
{
2180-
elog(strict ? ERROR : WARNING, "Cannot read header file at offset: %li len: %i \"%s\": %s",
2180+
elog(strict ? ERROR : WARNING, "Cannot read header file at offset: %llu len: %i \"%s\": %s",
21812181
file->hdr_off, file->hdr_size, hdr_map->path, strerror(errno));
21822182
goto cleanup;
21832183
}
@@ -2208,7 +2208,7 @@ get_data_file_headers(HeaderMap *hdr_map, pgFile *file, uint32 backup_version, b
22082208
if (hdr_crc != file->hdr_crc)
22092209
{
22102210
elog(strict ? ERROR : WARNING, "Header map for file \"%s\" crc mismatch \"%s\" "
2211-
"offset: %lu, len: %lu, current: %u, expected: %u",
2211+
"offset: %llu, len: %lu, current: %u, expected: %u",
22122212
file->rel_path, hdr_map->path, file->hdr_off, read_len, hdr_crc, file->hdr_crc);
22132213
goto cleanup;
22142214
}
@@ -2268,7 +2268,7 @@ write_page_headers(BackupPageHeader2 *headers, pgFile *file, HeaderMap *hdr_map,
22682268
{
22692269
elog(LOG, "Creating page header map \"%s\"", map_path);
22702270

2271-
hdr_map->fp = fopen(map_path, PG_BINARY_W);
2271+
hdr_map->fp = fopen(map_path, "a");
22722272
if (hdr_map->fp == NULL)
22732273
elog(ERROR, "Cannot open header file \"%s\": %s",
22742274
map_path, strerror(errno));
@@ -2297,7 +2297,7 @@ write_page_headers(BackupPageHeader2 *headers, pgFile *file, HeaderMap *hdr_map,
22972297
file->rel_path, z_len);
22982298
}
22992299

2300-
elog(VERBOSE, "Writing headers for file \"%s\" offset: %li, len: %i, crc: %u",
2300+
elog(VERBOSE, "Writing headers for file \"%s\" offset: %llu, len: %i, crc: %u",
23012301
file->rel_path, file->hdr_off, z_len, file->hdr_crc);
23022302

23032303
if (fwrite(zheaders, 1, z_len, hdr_map->fp) != z_len)

src/pg_probackup.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ do { \
208208
FIN_TRADITIONAL_CRC32(crc); \
209209
} while (0)
210210

211+
#define pg_off_t unsigned long long
212+
211213

212214
/* Information about single file (or dir) in backup */
213215
typedef struct pgFile
@@ -249,8 +251,8 @@ typedef struct pgFile
249251
/* Coordinates in header map */
250252
int n_headers; /* number of blocks in the data file in backup */
251253
pg_crc32 hdr_crc; /* CRC value of header file: name_hdr */
252-
off_t hdr_off; /* offset in header map */
253-
int hdr_size; /* offset in header map */
254+
pg_off_t hdr_off; /* offset in header map */
255+
int hdr_size; /* length of headers */
254256
} pgFile;
255257

256258
typedef struct page_map_entry
@@ -406,11 +408,11 @@ typedef struct PGNodeInfo
406408
/* structure used for access to block header map */
407409
typedef struct HeaderMap
408410
{
409-
char path[MAXPGPATH];
410-
char path_tmp[MAXPGPATH]; /* used only in merge */
411-
FILE *fp; /* used only for writing */
412-
char *buf; /* buffer */
413-
off_t offset; /* current position in fp */
411+
char path[MAXPGPATH];
412+
char path_tmp[MAXPGPATH]; /* used only in merge */
413+
FILE *fp; /* used only for writing */
414+
char *buf; /* buffer */
415+
pg_off_t offset; /* current position in fp */
414416
pthread_mutex_t mutex;
415417

416418
} HeaderMap;

src/utils/logger.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ exit_if_necessary(int elevel)
169169
{
170170
/* Interrupt other possible routines */
171171
thread_interrupted = true;
172+
interrupted = true;
172173
#ifdef WIN32
173174
ExitThread(elevel);
174175
#else

tests/backup.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2365,7 +2365,7 @@ def test_parent_choosing_2(self):
23652365
# Clean after yourself
23662366
self.del_test_dir(module_name, fname)
23672367

2368-
@unittest.skip("skip")
2368+
# @unittest.skip("skip")
23692369
def test_backup_with_less_privileges_role(self):
23702370
"""
23712371
check permissions correctness from documentation:
@@ -3079,3 +3079,80 @@ def test_incr_backup_filenode_map(self):
30793079

30803080
# Clean after yourself
30813081
self.del_test_dir(module_name, fname)
3082+
3083+
3084+
3085+
# @unittest.skip("skip")
3086+
def test_missing_wal_segment(self):
3087+
""""""
3088+
fname = self.id().split('.')[3]
3089+
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
3090+
node = self.make_simple_node(
3091+
base_dir=os.path.join(module_name, fname, 'node'),
3092+
set_replication=True,
3093+
ptrack_enable=self.ptrack,
3094+
initdb_params=['--data-checksums'],
3095+
pg_options={'archive_timeout': '30s'})
3096+
3097+
self.init_pb(backup_dir)
3098+
self.add_instance(backup_dir, 'node', node)
3099+
self.set_archiving(backup_dir, 'node', node)
3100+
node.slow_start()
3101+
3102+
node.pgbench_init(scale=10)
3103+
3104+
node.safe_psql(
3105+
'postgres',
3106+
'CREATE DATABASE backupdb')
3107+
3108+
# get segments in pg_wal, sort then and remove all but the latest
3109+
pg_wal_dir = os.path.join(node.data_dir, 'pg_wal')
3110+
3111+
if node.major_version >= 10:
3112+
pg_wal_dir = os.path.join(node.data_dir, 'pg_wal')
3113+
else:
3114+
pg_wal_dir = os.path.join(node.data_dir, 'pg_xlog')
3115+
3116+
# Full backup in streaming mode
3117+
gdb = self.backup_node(
3118+
backup_dir, 'node', node, datname='backupdb',
3119+
options=['--stream', '--log-level-file=INFO'], gdb=True)
3120+
3121+
# break at streaming start
3122+
gdb.set_breakpoint('start_WAL_streaming')
3123+
gdb.run_until_break()
3124+
3125+
# generate some more data
3126+
node.pgbench_init(scale=3)
3127+
3128+
# remove redundant WAL segments in pg_wal
3129+
files = os.listdir(pg_wal_dir)
3130+
files.sort(reverse=True)
3131+
3132+
# leave first two files in list
3133+
del files[:2]
3134+
for filename in files:
3135+
os.remove(os.path.join(pg_wal_dir, filename))
3136+
3137+
gdb.continue_execution_until_exit()
3138+
3139+
self.assertIn(
3140+
'unexpected termination of replication stream: ERROR: requested WAL segment',
3141+
gdb.output)
3142+
3143+
self.assertIn(
3144+
'has already been removed',
3145+
gdb.output)
3146+
3147+
self.assertIn(
3148+
'ERROR: Interrupted during waiting for WAL streaming',
3149+
gdb.output)
3150+
3151+
self.assertIn(
3152+
'WARNING: backup in progress, stop backup',
3153+
gdb.output)
3154+
3155+
# TODO: check the same for PAGE backup
3156+
3157+
# Clean after yourself
3158+
self.del_test_dir(module_name, fname)

tests/helpers/ptrack_helpers.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,6 +1757,7 @@ def __str__(self):
17571757
class GDBobj(ProbackupTest):
17581758
def __init__(self, cmd, verbose, attach=False):
17591759
self.verbose = verbose
1760+
self.output = ''
17601761

17611762
# Check gdb presense
17621763
try:
@@ -1798,10 +1799,8 @@ def __init__(self, cmd, verbose, attach=False):
17981799
)
17991800
self.gdb_pid = self.proc.pid
18001801

1801-
# discard data from pipe,
1802-
# is there a way to do it a less derpy way?
18031802
while True:
1804-
line = self.proc.stdout.readline()
1803+
line = self.get_line()
18051804

18061805
if 'No such process' in line:
18071806
raise GdbException(line)
@@ -1811,6 +1810,11 @@ def __init__(self, cmd, verbose, attach=False):
18111810
else:
18121811
break
18131812

1813+
def get_line(self):
1814+
line = self.proc.stdout.readline()
1815+
self.output += line
1816+
return line
1817+
18141818
def kill(self):
18151819
self.proc.kill()
18161820
self.proc.wait()
@@ -1932,10 +1936,8 @@ def continue_execution_until_break(self, ignore_count=0):
19321936
'Failed to continue execution until break.\n')
19331937

19341938
def stopped_in_breakpoint(self):
1935-
output = []
19361939
while True:
1937-
line = self.proc.stdout.readline()
1938-
output += [line]
1940+
line = self.get_line()
19391941
if self.verbose:
19401942
print(line)
19411943
if line.startswith('*stopped,reason="breakpoint-hit"'):
@@ -1952,7 +1954,7 @@ def _execute(self, cmd, running=True):
19521954

19531955
# look for command we just send
19541956
while True:
1955-
line = self.proc.stdout.readline()
1957+
line = self.get_line()
19561958
if self.verbose:
19571959
print(repr(line))
19581960

@@ -1962,7 +1964,7 @@ def _execute(self, cmd, running=True):
19621964
break
19631965

19641966
while True:
1965-
line = self.proc.stdout.readline()
1967+
line = self.get_line()
19661968
output += [line]
19671969
if self.verbose:
19681970
print(repr(line))

0 commit comments

Comments
 (0)