Skip to content

Commit d3babee

Browse files
committed
[PBCKP-287] simplify and fix cfs chaining
- properly chain only the main fork and cfs-related forks - properly chain a datafile with its cfm even when sort order places other segments between them - don't raise an error for system tables which have no companion cfm. + fix a couple of tests
1 parent 2f27142 commit d3babee

File tree

3 files changed

+90
-89
lines changed

3 files changed

+90
-89
lines changed

src/backup.c

Lines changed: 74 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ static bool pg_is_in_recovery(PGconn *conn);
6565
static bool pg_is_superuser(PGconn *conn);
6666
static void check_server_version(PGconn *conn, PGNodeInfo *nodeInfo);
6767
static void confirm_block_size(PGconn *conn, const char *name, int blcksz);
68-
static size_t rewind_and_mark_cfs_datafiles(parray *files, const char *root, char *relative, size_t i);
69-
static void group_cfs_segments(parray *files, size_t first, size_t last);
68+
static void rewind_and_mark_cfs_datafiles(parray *files, const char *root, char *relative, size_t i);
7069
static bool remove_excluded_files_criterion(void *value, void *exclude_args);
7170
static void backup_cfs_segment(int i, pgFile *file, backup_files_arg *arguments);
7271
static void process_file(int i, pgFile *file, backup_files_arg *arguments);
@@ -2228,7 +2227,11 @@ backup_cfs_segment(int i, pgFile *file, backup_files_arg *arguments) {
22282227
cfm_bck_file = data_file;
22292228
}
22302229
data_file = file;
2231-
Assert(cfm_file); /* ensure we always have cfm exist */
2230+
if (data_file->relOid >= FirstNormalObjectId && cfm_file == NULL)
2231+
{
2232+
elog(ERROR, "'CFS' file '%s' have to have '%s.cfm' companion file",
2233+
data_file->rel_path, data_file->name);
2234+
}
22322235

22332236
elog(LOG, "backup CFS segment %s, data_file=%s, cfm_file=%s, data_bck_file=%s, cfm_bck_file=%s",
22342237
data_file->name, data_file->name, cfm_file->name, data_bck_file == NULL? "NULL": data_bck_file->name, cfm_bck_file == NULL? "NULL": cfm_bck_file->name);
@@ -2251,7 +2254,10 @@ backup_cfs_segment(int i, pgFile *file, backup_files_arg *arguments) {
22512254
process_file(i, cfm_bck_file, arguments);
22522255

22532256
/* storing cfs segment in order cfm_file -> datafile to guarantee their consistency */
2254-
process_file(i, cfm_file, arguments);
2257+
/* cfm_file could be NULL for system tables. But we don't clear is_cfs flag
2258+
* for compatibility with older pg_probackup. */
2259+
if (cfm_file)
2260+
process_file(i, cfm_file, arguments);
22552261
process_file(i, data_file, arguments);
22562262
elog(LOG, "Backup CFS segment %s done", data_file->name);
22572263
}
@@ -2299,9 +2305,7 @@ parse_filelist_filenames(parray *files, const char *root)
22992305
strncmp(tmp_rel_path, TABLESPACE_VERSION_DIRECTORY,
23002306
strlen(TABLESPACE_VERSION_DIRECTORY)) == 0) {
23012307
/* rewind index to the beginning of cfs tablespace */
2302-
size_t start = rewind_and_mark_cfs_datafiles(files, root, file->rel_path, i);
2303-
/* group every to cfs segments chains */
2304-
group_cfs_segments(files, start, i);
2308+
rewind_and_mark_cfs_datafiles(files, root, file->rel_path, i);
23052309
}
23062310
}
23072311
}
@@ -2349,57 +2353,11 @@ remove_excluded_files_criterion(void *value, void *exclude_args) {
23492353
return file->remove_from_list;
23502354
}
23512355

2352-
/*
2353-
* For every cfs segment do group its files to linked list, datafile on the head.
2354-
* All non data files of segment moved to linked list and marked to skip in backup processing threads.
2355-
* @param first - first index of cfs tablespace files
2356-
* @param last - last index of cfs tablespace files
2357-
*/
2358-
void group_cfs_segments(parray *files, size_t first, size_t last) {/* grouping cfs files by relOid.segno, removing leafs of group */
2359-
2360-
for (;first <= last; first++)
2361-
{
2362-
pgFile *file = parray_get(files, first);
2363-
2364-
if (file->is_cfs)
2365-
{
2366-
pgFile *cfs_file = file;
2367-
size_t counter = first + 1;
2368-
pgFile *chain_file = parray_get(files, counter);
2369-
2370-
bool has_cfm = false; /* flag for later assertion the cfm file also exist */
2371-
2372-
elog(LOG, "Preprocessing cfs file %s, %u.%d", cfs_file->name, cfs_file->relOid, cfs_file->segno);
2373-
2374-
elog(LOG, "Checking file %s, %u.%d as cfs chain", chain_file->name, chain_file->relOid, chain_file->segno);
2375-
2376-
/* scanning cfs segment files */
2377-
while (cfs_file->relOid == chain_file->relOid &&
2378-
cfs_file->segno == chain_file->segno)
2379-
{
2380-
elog(LOG, "Grouping cfs chain file %s, %d.%d", chain_file->name, chain_file->relOid, chain_file->segno);
2381-
chain_file->skip_cfs_nested = true;
2382-
cfs_file->cfs_chain = chain_file; /* adding to cfs group */
2383-
cfs_file = chain_file;
2384-
2385-
/* next file */
2386-
counter++;
2387-
chain_file = parray_get(files, counter);
2388-
elog(LOG, "Checking file %s, %u.%d as cfs chain", chain_file->name, chain_file->relOid, chain_file->segno);
2389-
}
2390-
2391-
/* assertion - we always have cfs data + cfs map files */
2392-
cfs_file = file;
2393-
for (; cfs_file; cfs_file = cfs_file->cfs_chain) {
2394-
elog(LOG, "searching cfm in %s, chain is %s", cfs_file->name, cfs_file->cfs_chain == NULL? "NULL": cfs_file->cfs_chain->name);
2395-
has_cfm = cfs_file->forkName == cfm;
2396-
}
2397-
Assert(has_cfm);
2398-
2399-
/* shifting to last cfs segment file */
2400-
first = counter-1;
2401-
}
2402-
}
2356+
static uint32_t
2357+
hash_rel_seg(pgFile* file)
2358+
{
2359+
uint32 hash = hash_mix32_2(file->relOid, file->segno);
2360+
return hash_mix32_2(hash, 0xcf5);
24032361
}
24042362

24052363
/* If file is equal to pg_compression, then we consider this tablespace as
@@ -2416,13 +2374,24 @@ void group_cfs_segments(parray *files, size_t first, size_t last) {/* grouping c
24162374
*
24172375
* @returns index of first tablespace entry, i.e tblspcOid/TABLESPACE_VERSION_DIRECTORY
24182376
*/
2419-
static size_t
2377+
static void
24202378
rewind_and_mark_cfs_datafiles(parray *files, const char *root, char *relative, size_t i)
24212379
{
24222380
int len;
24232381
int p;
2382+
int j;
24242383
pgFile *prev_file;
2384+
pgFile *tmp_file;
24252385
char *cfs_tblspc_path;
2386+
uint32_t h;
2387+
2388+
/* hash table for cfm files */
2389+
#define HASHN 128
2390+
parray *hashtab[HASHN] = {NULL};
2391+
parray *bucket;
2392+
for (p = 0; p < HASHN; p++)
2393+
hashtab[p] = parray_new();
2394+
24262395

24272396
cfs_tblspc_path = strdup(relative);
24282397
if(!cfs_tblspc_path)
@@ -2437,23 +2406,60 @@ rewind_and_mark_cfs_datafiles(parray *files, const char *root, char *relative, s
24372406

24382407
elog(LOG, "Checking file in cfs tablespace %s", prev_file->rel_path);
24392408

2440-
if (strstr(prev_file->rel_path, cfs_tblspc_path) != NULL)
2409+
if (strstr(prev_file->rel_path, cfs_tblspc_path) == NULL)
2410+
{
2411+
elog(LOG, "Breaking on %s", prev_file->rel_path);
2412+
break;
2413+
}
2414+
2415+
if (!S_ISREG(prev_file->mode))
2416+
continue;
2417+
2418+
h = hash_rel_seg(prev_file);
2419+
bucket = hashtab[h % HASHN];
2420+
2421+
if (prev_file->forkName == cfm || prev_file->forkName == cfm_bck ||
2422+
prev_file->forkName == cfs_bck)
24412423
{
2442-
if (S_ISREG(prev_file->mode) && prev_file->is_datafile)
2424+
parray_append(bucket, prev_file);
2425+
}
2426+
else if (prev_file->is_datafile && prev_file->forkName == none)
2427+
{
2428+
elog(LOG, "Processing 'cfs' file %s", prev_file->rel_path);
2429+
/* have to mark as is_cfs even for system-tables for compatibility
2430+
* with older pg_probackup */
2431+
prev_file->is_cfs = true;
2432+
prev_file->cfs_chain = NULL;
2433+
for (j = 0; j < parray_num(bucket); j++)
24432434
{
2444-
elog(LOG, "Setting 'is_cfs' on file %s, name %s",
2445-
prev_file->rel_path, prev_file->name);
2446-
prev_file->is_cfs = true;
2435+
tmp_file = parray_get(bucket, j);
2436+
elog(LOG, "Linking 'cfs' file '%s' to '%s'",
2437+
tmp_file->rel_path, prev_file->rel_path);
2438+
if (tmp_file->relOid == prev_file->relOid &&
2439+
tmp_file->segno == prev_file->segno)
2440+
{
2441+
tmp_file->cfs_chain = prev_file->cfs_chain;
2442+
prev_file->cfs_chain = tmp_file;
2443+
parray_remove(bucket, j);
2444+
j--;
2445+
}
24472446
}
24482447
}
2449-
else
2448+
}
2449+
2450+
for (p = 0; p < HASHN; p++)
2451+
{
2452+
bucket = hashtab[p];
2453+
for (j = 0; j < parray_num(bucket); j++)
24502454
{
2451-
elog(LOG, "Breaking on %s", prev_file->rel_path);
2452-
break;
2455+
tmp_file = parray_get(bucket, j);
2456+
elog(WARNING, "Orphaned cfs related file '%s'", tmp_file->rel_path);
24532457
}
2458+
parray_free(bucket);
2459+
hashtab[p] = NULL;
24542460
}
2461+
#undef HASHN
24552462
free(cfs_tblspc_path);
2456-
return p+1;
24572463
}
24582464

24592465
/*

src/utils/pgut.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,14 @@ extern int usleep(unsigned int usec);
115115
#define ARG_SIZE_HINT static
116116
#endif
117117

118+
static inline uint32_t hash_mix32_2(uint32_t a, uint32_t b)
119+
{
120+
b ^= (a<<7)|(a>>25);
121+
a *= 0xdeadbeef;
122+
b *= 0xcafeabed;
123+
a ^= a >> 16;
124+
b ^= b >> 15;
125+
return a^b;
126+
}
127+
118128
#endif /* PGUT_H */

tests/cfs_backup_test.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -431,16 +431,10 @@ def test_page_doesnt_store_unchanged_cfm(self):
431431
"FROM generate_series(0,256) i".format('t1', tblspace_name)
432432
)
433433

434-
try:
435-
backup_id_full = self.backup_node(
436-
self.backup_dir, 'node', self.node, backup_type='full')
437-
except ProbackupException as e:
438-
self.fail(
439-
"ERROR: Full backup failed.\n {0} \n {1}".format(
440-
repr(self.cmd),
441-
repr(e.message)
442-
)
443-
)
434+
self.node.safe_psql("postgres", "checkpoint")
435+
436+
backup_id_full = self.backup_node(
437+
self.backup_dir, 'node', self.node, backup_type='full')
444438

445439
self.assertTrue(
446440
find_by_extensions(
@@ -449,16 +443,8 @@ def test_page_doesnt_store_unchanged_cfm(self):
449443
"ERROR: .cfm files not found in backup dir"
450444
)
451445

452-
try:
453-
backup_id = self.backup_node(
454-
self.backup_dir, 'node', self.node, backup_type='page')
455-
except ProbackupException as e:
456-
self.fail(
457-
"ERROR: Incremental backup failed.\n {0} \n {1}".format(
458-
repr(self.cmd),
459-
repr(e.message)
460-
)
461-
)
446+
backup_id = self.backup_node(
447+
self.backup_dir, 'node', self.node, backup_type='page')
462448

463449
show_backup = self.show_pb(self.backup_dir, 'node', backup_id)
464450
self.assertEqual(
@@ -1046,7 +1032,6 @@ def test_fullbackup_after_create_table_page_after_create_table_stream(self):
10461032
)
10471033

10481034
# --- Make backup with not valid data(broken .cfm) --- #
1049-
@unittest.expectedFailure
10501035
# @unittest.skip("skip")
10511036
@unittest.skipUnless(ProbackupTest.enterprise, 'skip')
10521037
def test_delete_random_cfm_file_from_tablespace_dir(self):

0 commit comments

Comments
 (0)