Skip to content

Commit 1d3c333

Browse files
committed
Refactor code related to WAL streaming. Optimize filelist gathering for streamed xlog files.
Now streaming thread calculates CRC and adds file info to the filelist after each finished segment.
1 parent dad2747 commit 1d3c333

File tree

3 files changed

+49
-56
lines changed

3 files changed

+49
-56
lines changed

src/backup.c

Lines changed: 11 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,9 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
270270
pgBackupGetPath(&current, external_prefix, lengthof(external_prefix),
271271
EXTERNAL_DIR);
272272

273+
/* initialize backup's file list */
274+
backup_files_list = parray_new();
275+
273276
/* start stream replication */
274277
if (stream_wal)
275278
{
@@ -280,9 +283,6 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
280283
current.start_lsn, current.tli);
281284
}
282285

283-
/* initialize backup list */
284-
backup_files_list = parray_new();
285-
286286
/* list files with the logical path. omit $PGDATA */
287287
if (fio_is_remote(FIO_DB_HOST))
288288
fio_list_dir(backup_files_list, instance_config.pgdata,
@@ -567,52 +567,11 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
567567
/* close ssh session in main thread */
568568
fio_disconnect();
569569

570-
/* Add archived xlog files into the list of files of this backup */
571-
if (stream_wal)
572-
{
573-
parray *xlog_files_list;
574-
char pg_xlog_path[MAXPGPATH];
575-
char wal_full_path[MAXPGPATH];
576-
577-
/* Scan backup PG_XLOG_DIR */
578-
xlog_files_list = parray_new();
579-
join_path_components(pg_xlog_path, database_path, PG_XLOG_DIR);
580-
dir_list_file(xlog_files_list, pg_xlog_path, false, true, false, false, true, 0,
581-
FIO_BACKUP_HOST);
582-
583-
/* TODO: Drop streamed WAL segments greater than stop_lsn */
584-
for (i = 0; i < parray_num(xlog_files_list); i++)
585-
{
586-
pgFile *file = (pgFile *) parray_get(xlog_files_list, i);
587-
588-
join_path_components(wal_full_path, pg_xlog_path, file->rel_path);
589-
590-
if (!S_ISREG(file->mode))
591-
continue;
592-
593-
file->crc = pgFileGetCRC(wal_full_path, true, false);
594-
file->write_size = file->size;
595-
596-
/* overwrite rel_path, because now it is relative to
597-
* /backup_dir/backups/instance_name/backup_id/database/pg_xlog/
598-
*/
599-
pg_free(file->rel_path);
600-
601-
/* Now it is relative to /backup_dir/backups/instance_name/backup_id/database/ */
602-
file->rel_path = pgut_strdup(GetRelativePath(wal_full_path, database_path));
603-
604-
file->name = last_dir_separator(file->rel_path);
605-
606-
if (file->name == NULL) // TODO: do it in pgFileInit
607-
file->name = file->rel_path;
608-
else
609-
file->name++;
610-
}
570+
/*
571+
* Add archived xlog files into the list of files of this backup
572+
* NOTHING TO DO HERE
573+
*/
611574

612-
/* Add xlog files into the list of backed up files */
613-
parray_concat(backup_files_list, xlog_files_list);
614-
parray_free(xlog_files_list);
615-
}
616575

617576
/* write database map to file and add it to control file */
618577
if (database_map)
@@ -1920,7 +1879,10 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn,
19201879

19211880
if (stream_wal)
19221881
{
1923-
wait_WAL_streaming_end();
1882+
/* This function will also add list of xlog files
1883+
* to the passed filelist */
1884+
if(wait_WAL_streaming_end(backup_files_list))
1885+
elog(ERROR, "WAL streaming failed");
19241886

19251887
pgBackupGetPath2(backup, stream_xlog_path,
19261888
lengthof(stream_xlog_path),

src/pg_probackup.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,5 @@ extern XLogRecPtr stop_backup_lsn;
11771177
extern void start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path,
11781178
ConnectionOptions *conn_opt,
11791179
XLogRecPtr startpos, TimeLineID starttli);
1180-
extern void wait_WAL_streaming_end(void);
1181-
1180+
extern int wait_WAL_streaming_end(parray *backup_files_list);
11821181
#endif /* PG_PROBACKUP_H */

src/stream.c

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ typedef struct
5757
static pthread_t stream_thread;
5858
static StreamThreadArg stream_thread_arg = {"", NULL, 1};
5959

60+
static parray *xlog_files_list = NULL;
61+
6062
static void IdentifySystem(StreamThreadArg *stream_thread_arg);
6163
static int checkpoint_timeout(PGconn *backup_conn);
6264
static void *StreamLog(void *arg);
@@ -273,10 +275,38 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
273275
/* we assume that we get called once at the end of each segment */
274276
if (segment_finished)
275277
{
278+
XLogSegNo xlog_segno;
279+
char wal_segment_name[MAXPGPATH];
280+
char wal_segment_fullpath[MAXPGPATH];
281+
pgFile *file;
282+
276283
elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
277284
(uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
278285

279-
/* TODO Add streamed file to file list */
286+
/* Add streamed xlog file into the backup's list of files */
287+
if (!xlog_files_list)
288+
xlog_files_list = parray_new();
289+
290+
GetXLogSegNo(xlogpos, xlog_segno, instance_config.xlog_seg_size);
291+
GetXLogFileName(wal_segment_name, timeline, xlog_segno,
292+
instance_config.xlog_seg_size);
293+
294+
join_path_components(wal_segment_fullpath,
295+
stream_thread_arg.basedir, wal_segment_name);
296+
297+
/*
298+
* NOTE We pass wal_segment_name as a relpath, since now we don't have
299+
* any subdirs in wal directory structure
300+
*/
301+
file = pgFileNew(wal_segment_fullpath, wal_segment_name, false, 0,
302+
FIO_BACKUP_HOST);
303+
file->name = file->rel_path;
304+
file->crc = pgFileGetCRC(wal_segment_fullpath, true, false);
305+
306+
/* Should we recheck it using stat? */
307+
file->write_size = instance_config.xlog_seg_size;
308+
file->uncompressed_size = instance_config.xlog_seg_size;
309+
parray_append(xlog_files_list, file);
280310
}
281311

282312
/*
@@ -359,10 +389,12 @@ start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOption
359389
}
360390

361391
/* Wait for the completion of stream */
362-
void
363-
wait_WAL_streaming_end(void)
392+
int
393+
wait_WAL_streaming_end(parray *backup_files_list)
364394
{
395+
parray_concat(backup_files_list, xlog_files_list);
396+
parray_free(xlog_files_list);
397+
365398
pthread_join(stream_thread, NULL);
366-
if (stream_thread_arg.ret == 1)
367-
elog(ERROR, "WAL streaming failed");
399+
return stream_thread_arg.ret;
368400
}

0 commit comments

Comments
 (0)