Skip to content

Commit 086cd4f

Browse files
committed
Separated data.c => backup_data_page() into 2 functions:
prepare_page() and compress_and_backup_page(). Additionally, fixed formatting in some places. (task pgpro-1726 in jira)
1 parent f98c91b commit 086cd4f

File tree

1 file changed

+114
-89
lines changed

1 file changed

+114
-89
lines changed

src/data.c

Lines changed: 114 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@
2727

2828
#ifdef HAVE_LIBZ
2929
/* Implementation of zlib compression method */
30-
static size_t zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size)
30+
static int32 zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size)
3131
{
3232
uLongf compressed_size = dst_size;
3333
int rc = compress2(dst, &compressed_size, src, src_size, compress_level);
3434
return rc == Z_OK ? compressed_size : rc;
3535
}
3636

3737
/* Implementation of zlib compression method */
38-
static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size)
38+
static int32 zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size)
3939
{
4040
uLongf dest_len = dst_size;
4141
int rc = uncompress(dst, &dest_len, src, src_size);
@@ -47,7 +47,7 @@ static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_
4747
* Compresses source into dest using algorithm. Returns the number of bytes
4848
* written in the destination buffer, or -1 if compression fails.
4949
*/
50-
static size_t
50+
static int32
5151
do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
5252
{
5353
switch (alg)
@@ -70,7 +70,7 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, Compre
7070
* Decompresses source into dest using algorithm. Returns the number of bytes
7171
* decompressed in the destination buffer, or -1 if decompression fails.
7272
*/
73-
static size_t
73+
static int32
7474
do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
7575
{
7676
switch (alg)
@@ -101,6 +101,7 @@ typedef struct BackupPageHeader
101101

102102
/* Special value for compressed_size field */
103103
#define PageIsTruncated -2
104+
#define SkipCurrentPage -3
104105

105106
/* Verify page's header */
106107
static bool
@@ -134,8 +135,8 @@ static int
134135
read_page_from_file(pgFile *file, BlockNumber blknum,
135136
FILE *in, Page page, XLogRecPtr *page_lsn)
136137
{
137-
off_t offset = blknum*BLCKSZ;
138-
size_t read_len = 0;
138+
off_t offset = blknum * BLCKSZ;
139+
size_t read_len = 0;
139140

140141
/* read the block */
141142
if (fseek(in, offset, SEEK_SET) != 0)
@@ -216,32 +217,29 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
216217
}
217218

218219
/*
219-
* Backup the specified block from a file of a relation.
220-
* Verify page header and checksum of the page and write it
221-
* to the backup file.
220+
* Retrieves a page taking the backup mode into account
221+
* and writes it into argument "page". Argument "page"
222+
* should be a pointer to allocated BLCKSZ of bytes.
223+
*
224+
* Prints appropriate warnings/errors/etc into log.
225+
* Returns 0 if page was successfully retrieved
226+
* SkipCurrentPage(-3) if we need to skip this page
227+
* PageIsTruncated(-2) if the page was truncated
222228
*/
223-
static void
224-
backup_data_page(backup_files_args *arguments,
229+
static int32
230+
prepare_page(backup_files_args *arguments,
225231
pgFile *file, XLogRecPtr prev_backup_start_lsn,
226232
BlockNumber blknum, BlockNumber nblocks,
227-
FILE *in, FILE *out,
228-
pg_crc32 *crc, int *n_skipped,
229-
BackupMode backup_mode)
233+
FILE *in, int *n_skipped,
234+
BackupMode backup_mode,
235+
Page page)
230236
{
231-
BackupPageHeader header;
232-
Page page = malloc(BLCKSZ);
233-
Page compressed_page = NULL;
234-
XLogRecPtr page_lsn = 0;
235-
size_t write_buffer_size;
236-
char write_buffer[BLCKSZ+sizeof(header)];
237-
238-
int try_again = 100;
239-
bool page_is_valid = false;
237+
XLogRecPtr page_lsn = 0;
238+
int try_again = 100;
239+
bool page_is_valid = false;
240+
bool page_is_truncated = false;
240241
BlockNumber absolute_blknum = file->segno * RELSEG_SIZE + blknum;
241242

242-
header.block = blknum;
243-
header.compressed_size = 0;
244-
245243
/* check for interrupt */
246244
if (interrupted)
247245
elog(ERROR, "Interrupted during backup");
@@ -262,7 +260,7 @@ backup_data_page(backup_files_args *arguments,
262260
if (result == 0)
263261
{
264262
/* This block was truncated.*/
265-
header.compressed_size = PageIsTruncated;
263+
page_is_truncated = true;
266264
/* Page is not actually valid, but it is absent
267265
* and we're not going to reread it or validate */
268266
page_is_valid = true;
@@ -295,35 +293,38 @@ backup_data_page(backup_files_args *arguments,
295293
if (backup_mode == BACKUP_MODE_DIFF_PTRACK || (!page_is_valid && is_ptrack_support))
296294
{
297295
size_t page_size = 0;
298-
299-
free(page);
300-
page = NULL;
301-
page = (Page) pg_ptrack_get_block(arguments, file->dbOid, file->tblspcOid,
296+
Page ptrack_page = NULL;
297+
ptrack_page = (Page) pg_ptrack_get_block(arguments, file->dbOid, file->tblspcOid,
302298
file->relOid, absolute_blknum, &page_size);
303299

304-
if (page == NULL)
300+
if (ptrack_page == NULL)
305301
{
306302
/* This block was truncated.*/
307-
header.compressed_size = PageIsTruncated;
303+
page_is_truncated = true;
308304
}
309305
else if (page_size != BLCKSZ)
310306
{
307+
free(ptrack_page);
311308
elog(ERROR, "File: %s, block %u, expected block size %d, but read %lu",
312309
file->path, absolute_blknum, BLCKSZ, page_size);
313310
}
314311
else
315312
{
316313
/*
314+
* We need to copy the page that was successfully
315+
* retreieved from ptrack into our output "page" parameter.
317316
* We must set checksum here, because it is outdated
318317
* in the block recieved from shared buffers.
319318
*/
319+
memcpy(page, ptrack_page, BLCKSZ);
320+
free(ptrack_page);
320321
if (is_checksum_enabled)
321322
((PageHeader) page)->pd_checksum = pg_checksum_page(page, absolute_blknum);
322323
}
323324
/* get lsn from page, provided by pg_ptrack_get_block() */
324325
if (backup_mode == BACKUP_MODE_DIFF_DELTA &&
325326
file->exists_in_prev &&
326-
header.compressed_size != PageIsTruncated &&
327+
!page_is_truncated &&
327328
!parse_page(page, &page_lsn))
328329
elog(ERROR, "Cannot parse page after pg_ptrack_get_block. "
329330
"Possible risk of a memory corruption");
@@ -332,52 +333,70 @@ backup_data_page(backup_files_args *arguments,
332333

333334
if (backup_mode == BACKUP_MODE_DIFF_DELTA &&
334335
file->exists_in_prev &&
335-
header.compressed_size != PageIsTruncated &&
336+
!page_is_truncated &&
336337
page_lsn < prev_backup_start_lsn)
337338
{
338339
elog(VERBOSE, "Skipping blknum: %u in file: %s", blknum, file->path);
339340
(*n_skipped)++;
340-
free(page);
341-
return;
341+
return SkipCurrentPage;
342342
}
343343

344-
if (header.compressed_size != PageIsTruncated)
345-
{
346-
file->read_size += BLCKSZ;
344+
if (page_is_truncated)
345+
return PageIsTruncated;
347346

348-
compressed_page = malloc(BLCKSZ);
349-
header.compressed_size = do_compress(compressed_page, BLCKSZ,
350-
page, BLCKSZ, compress_alg);
347+
return 0;
348+
}
351349

352-
file->compress_alg = compress_alg;
350+
static void
351+
compress_and_backup_page(pgFile *file, BlockNumber blknum,
352+
FILE *in, FILE *out, pg_crc32 *crc,
353+
int page_state, Page page)
354+
{
355+
BackupPageHeader header;
356+
size_t write_buffer_size = sizeof(header);
357+
char write_buffer[BLCKSZ+sizeof(header)];
358+
char compressed_page[BLCKSZ];
353359

354-
Assert (header.compressed_size <= BLCKSZ);
355-
}
360+
if(page_state == SkipCurrentPage)
361+
return;
356362

357-
write_buffer_size = sizeof(header);
363+
header.block = blknum;
364+
header.compressed_size = page_state;
358365

359-
/*
360-
* The page was truncated. Write only header
361-
* to know that we must truncate restored file
362-
*/
363-
if (header.compressed_size == PageIsTruncated)
364-
{
365-
memcpy(write_buffer, &header, sizeof(header));
366-
}
367-
/* The page compression failed. Write it as is. */
368-
else if (header.compressed_size == -1)
366+
if(page_state == PageIsTruncated)
369367
{
370-
header.compressed_size = BLCKSZ;
368+
/*
369+
* The page was truncated. Write only header
370+
* to know that we must truncate restored file
371+
*/
371372
memcpy(write_buffer, &header, sizeof(header));
372-
memcpy(write_buffer + sizeof(header), page, BLCKSZ);
373-
write_buffer_size += header.compressed_size;
374373
}
375-
/* The page was successfully compressed */
376-
else if (header.compressed_size > 0)
374+
else
377375
{
378-
memcpy(write_buffer, &header, sizeof(header));
379-
memcpy(write_buffer + sizeof(header), compressed_page, header.compressed_size);
380-
write_buffer_size += MAXALIGN(header.compressed_size);
376+
/* The page was not truncated, so we need to compress it */
377+
header.compressed_size = do_compress(compressed_page, BLCKSZ,
378+
page, BLCKSZ, compress_alg);
379+
380+
file->compress_alg = compress_alg;
381+
file->read_size += BLCKSZ;
382+
Assert (header.compressed_size <= BLCKSZ);
383+
384+
/* The page was successfully compressed. */
385+
if (header.compressed_size > 0)
386+
{
387+
memcpy(write_buffer, &header, sizeof(header));
388+
memcpy(write_buffer + sizeof(header),
389+
compressed_page, header.compressed_size);
390+
write_buffer_size += MAXALIGN(header.compressed_size);
391+
}
392+
/* Nonpositive value means that compression failed. Write it as is. */
393+
else
394+
{
395+
header.compressed_size = BLCKSZ;
396+
memcpy(write_buffer, &header, sizeof(header));
397+
memcpy(write_buffer + sizeof(header), page, BLCKSZ);
398+
write_buffer_size += header.compressed_size;
399+
}
381400
}
382401

383402
/* elog(VERBOSE, "backup blkno %u, compressed_size %d write_buffer_size %ld",
@@ -389,19 +408,14 @@ backup_data_page(backup_files_args *arguments,
389408
/* write data page */
390409
if(fwrite(write_buffer, 1, write_buffer_size, out) != write_buffer_size)
391410
{
392-
int errno_tmp = errno;
411+
int errno_tmp = errno;
393412
fclose(in);
394413
fclose(out);
395414
elog(ERROR, "File: %s, cannot write backup at block %u : %s",
396415
file->path, blknum, strerror(errno_tmp));
397416
}
398417

399418
file->write_size += write_buffer_size;
400-
401-
if (page != NULL)
402-
free(page);
403-
if (compressed_page != NULL)
404-
free(compressed_page);
405419
}
406420

407421
/*
@@ -418,13 +432,15 @@ backup_data_file(backup_files_args* arguments,
418432
pgFile *file, XLogRecPtr prev_backup_start_lsn,
419433
BackupMode backup_mode)
420434
{
421-
char to_path[MAXPGPATH];
422-
FILE *in;
423-
FILE *out;
424-
BlockNumber blknum = 0;
425-
BlockNumber nblocks = 0;
426-
int n_blocks_skipped = 0;
427-
int n_blocks_read = 0;
435+
char to_path[MAXPGPATH];
436+
FILE *in;
437+
FILE *out;
438+
BlockNumber blknum = 0;
439+
BlockNumber nblocks = 0;
440+
int n_blocks_skipped = 0;
441+
int n_blocks_read = 0;
442+
int page_state;
443+
char curr_page[BLCKSZ];
428444

429445
/*
430446
* Skip unchanged file only if it exists in previous backup.
@@ -503,9 +519,11 @@ backup_data_file(backup_files_args* arguments,
503519
{
504520
for (blknum = 0; blknum < nblocks; blknum++)
505521
{
506-
backup_data_page(arguments, file, prev_backup_start_lsn, blknum,
507-
nblocks, in, out, &(file->crc),
508-
&n_blocks_skipped, backup_mode);
522+
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
523+
blknum, nblocks, in, &n_blocks_skipped,
524+
backup_mode, curr_page);
525+
compress_and_backup_page(file, blknum, in, out, &(file->crc),
526+
page_state, curr_page);
509527
n_blocks_read++;
510528
}
511529
if (backup_mode == BACKUP_MODE_DIFF_DELTA)
@@ -518,9 +536,11 @@ backup_data_file(backup_files_args* arguments,
518536
iter = datapagemap_iterate(&file->pagemap);
519537
while (datapagemap_next(iter, &blknum))
520538
{
521-
backup_data_page(arguments, file, prev_backup_start_lsn, blknum,
522-
nblocks, in, out, &(file->crc),
523-
&n_blocks_skipped, backup_mode);
539+
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
540+
blknum, nblocks, in, &n_blocks_skipped,
541+
backup_mode, curr_page);
542+
compress_and_backup_page(file, blknum, in, out, &(file->crc),
543+
page_state, curr_page);
524544
n_blocks_read++;
525545
}
526546

@@ -635,7 +655,8 @@ restore_data_file(const char *from_root,
635655
}
636656

637657
if (header.block < blknum)
638-
elog(ERROR, "backup is broken at file->path %s block %u",file->path, blknum);
658+
elog(ERROR, "backup is broken at file->path %s block %u",
659+
file->path, blknum);
639660

640661
if (header.compressed_size == PageIsTruncated)
641662
{
@@ -646,7 +667,8 @@ restore_data_file(const char *from_root,
646667
if (ftruncate(fileno(out), header.block * BLCKSZ) != 0)
647668
elog(ERROR, "cannot truncate \"%s\": %s",
648669
file->path, strerror(errno));
649-
elog(VERBOSE, "truncate file %s to block %u", file->path, header.block);
670+
elog(VERBOSE, "truncate file %s to block %u",
671+
file->path, header.block);
650672
break;
651673
}
652674

@@ -664,10 +686,12 @@ restore_data_file(const char *from_root,
664686

665687
uncompressed_size = do_decompress(page.data, BLCKSZ,
666688
compressed_page.data,
667-
header.compressed_size, file->compress_alg);
689+
header.compressed_size,
690+
file->compress_alg);
668691

669692
if (uncompressed_size != BLCKSZ)
670-
elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ", uncompressed_size);
693+
elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ",
694+
uncompressed_size);
671695
}
672696

673697
/*
@@ -714,7 +738,8 @@ restore_data_file(const char *from_root,
714738
if (ftruncate(fileno(out), file->n_blocks * BLCKSZ) != 0)
715739
elog(ERROR, "cannot truncate \"%s\": %s",
716740
file->path, strerror(errno));
717-
elog(INFO, "Delta truncate file %s to block %u", file->path, file->n_blocks);
741+
elog(INFO, "Delta truncate file %s to block %u",
742+
file->path, file->n_blocks);
718743
}
719744
}
720745

0 commit comments

Comments
 (0)