@@ -414,8 +414,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
414414{
415415 FILE * in = NULL ;
416416 int out = -1 ;
417- char buf [STDIO_BUFSIZE ];
418- // char buf[XLOG_BLCKSZ];
417+ char * buf = pgut_malloc (OUT_BUF_SIZE ); /* 1MB buffer */
419418 char from_fullpath [MAXPGPATH ];
420419 char to_fullpath [MAXPGPATH ];
421420 /* partial handling */
@@ -433,11 +432,14 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
433432 canonicalize_path (to_fullpath );
434433
435434 /* Open source file for read */
436- in = fio_fopen (from_fullpath , PG_BINARY_R , FIO_DB_HOST );
435+ in = fopen (from_fullpath , PG_BINARY_R );
437436 if (in == NULL )
438437 elog (ERROR , "Thread [%d]: Cannot open source file \"%s\": %s" ,
439438 thread_num , from_fullpath , strerror (errno ));
440439
440+ /* disable stdio buffering for input file */
441+ setvbuf (in , NULL , _IONBF , BUFSIZ );
442+
441443 /* open destination partial file for write */
442444 snprintf (to_fullpath_part , sizeof (to_fullpath_part ), "%s.part" , to_fullpath );
443445
@@ -542,14 +544,14 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
542544 pg_crc32 crc32_dst ;
543545
544546 crc32_src = fio_get_crc32 (from_fullpath , FIO_DB_HOST , false);
545- crc32_dst = fio_get_crc32 (to_fullpath , FIO_DB_HOST , false);
547+ crc32_dst = fio_get_crc32 (to_fullpath , FIO_BACKUP_HOST , false);
546548
547549 if (crc32_src == crc32_dst )
548550 {
549551 elog (LOG , "Thread [%d]: WAL file already exists in archive with the same "
550552 "checksum, skip pushing: \"%s\"" , thread_num , from_fullpath );
551553 /* cleanup */
552- fio_fclose (in );
554+ fclose (in );
553555 fio_close (out );
554556 fio_unlink (to_fullpath_part , FIO_BACKUP_HOST );
555557 return 1 ;
@@ -574,11 +576,11 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
574576 /* copy content */
575577 for (;;)
576578 {
577- ssize_t read_len = 0 ;
579+ size_t read_len = 0 ;
578580
579- read_len = fio_fread ( in , buf , sizeof ( buf ) );
581+ read_len = fread ( buf , 1 , OUT_BUF_SIZE , in );
580582
581- if (read_len < 0 )
583+ if (ferror ( in ) )
582584 {
583585 fio_unlink (to_fullpath_part , FIO_BACKUP_HOST );
584586 elog (ERROR , "Thread [%d]: Cannot read source file \"%s\": %s" ,
@@ -595,17 +597,12 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
595597 }
596598 }
597599
598- if (read_len == 0 )
600+ if (feof ( in ) )
599601 break ;
600602 }
601603
602604 /* close source file */
603- if (fio_fclose (in ))
604- {
605- fio_unlink (to_fullpath_part , FIO_BACKUP_HOST );
606- elog (ERROR , "Thread [%d]: Cannot close source WAL file \"%s\": %s" ,
607- thread_num , from_fullpath , strerror (errno ));
608- }
605+ fclose (in );
609606
610607 /* close temp file */
611608 if (fio_close (out ) != 0 )
@@ -636,6 +633,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
636633 thread_num , to_fullpath_part , to_fullpath , strerror (errno ));
637634 }
638635
636+ pg_free (buf );
639637 return 0 ;
640638}
641639
@@ -654,8 +652,7 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
654652{
655653 FILE * in = NULL ;
656654 gzFile out = NULL ;
657- int errno_temp ;
658- char buf [STDIO_BUFSIZE ];
655+ char * buf = pgut_malloc (OUT_BUF_SIZE );
659656 char from_fullpath [MAXPGPATH ];
660657 char to_fullpath [MAXPGPATH ];
661658 char to_fullpath_gz [MAXPGPATH ];
@@ -681,11 +678,14 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
681678 snprintf (to_fullpath_gz_part , sizeof (to_fullpath_gz_part ), "%s.part" , to_fullpath_gz );
682679
683680 /* Open source file for read */
684- in = fio_fopen (from_fullpath , PG_BINARY_R , FIO_DB_HOST );
681+ in = fopen (from_fullpath , PG_BINARY_R );
685682 if (in == NULL )
686683 elog (ERROR , "Thread [%d]: Cannot open source WAL file \"%s\": %s" ,
687684 thread_num , from_fullpath , strerror (errno ));
688685
686+ /* disable stdio buffering for input file */
687+ setvbuf (in , NULL , _IONBF , BUFSIZ );
688+
689689 /* Grab lock by creating temp file in exclusive mode */
690690 out = fio_gzopen (to_fullpath_gz_part , PG_BINARY_W , compress_level , FIO_BACKUP_HOST );
691691 if (out == NULL )
@@ -787,16 +787,16 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
787787 pg_crc32 crc32_src ;
788788 pg_crc32 crc32_dst ;
789789
790- /* what if one of them goes missing */
790+ /* TODO: what if one of them goes missing? */
791791 crc32_src = fio_get_crc32 (from_fullpath , FIO_DB_HOST , false);
792- crc32_dst = fio_get_crc32 (to_fullpath_gz , FIO_DB_HOST , true);
792+ crc32_dst = fio_get_crc32 (to_fullpath_gz , FIO_BACKUP_HOST , true);
793793
794794 if (crc32_src == crc32_dst )
795795 {
796796 elog (LOG , "Thread [%d]: WAL file already exists in archive with the same "
797797 "checksum, skip pushing: \"%s\"" , thread_num , from_fullpath );
798798 /* cleanup */
799- fio_fclose (in );
799+ fclose (in );
800800 fio_gzclose (out );
801801 fio_unlink (to_fullpath_gz_part , FIO_BACKUP_HOST );
802802 return 1 ;
@@ -811,8 +811,6 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
811811 /* Overwriting is forbidden,
812812 * so we must unlink partial file and exit with error.
813813 */
814- fio_fclose (in );
815- fio_gzclose (out );
816814 fio_unlink (to_fullpath_gz_part , FIO_BACKUP_HOST );
817815 elog (ERROR , "Thread [%d]: WAL file already exists in archive with "
818816 "different checksum: \"%s\"" , thread_num , to_fullpath_gz );
@@ -823,11 +821,11 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
823821 /* copy content */
824822 for (;;)
825823 {
826- ssize_t read_len = 0 ;
824+ size_t read_len = 0 ;
827825
828- read_len = fio_fread ( in , buf , sizeof ( buf ) );
826+ read_len = fread ( buf , 1 , OUT_BUF_SIZE , in );
829827
830- if (read_len < 0 )
828+ if (ferror ( in ) )
831829 {
832830 fio_unlink (to_fullpath_gz_part , FIO_BACKUP_HOST );
833831 elog (ERROR , "Thread [%d]: Cannot read from source file \"%s\": %s" ,
@@ -838,32 +836,25 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
838836 {
839837 if (fio_gzwrite (out , buf , read_len ) != read_len )
840838 {
841- errno_temp = errno ;
842839 fio_unlink (to_fullpath_gz_part , FIO_BACKUP_HOST );
843840 elog (ERROR , "Thread [%d]: Cannot write to compressed temp WAL file \"%s\": %s" ,
844- thread_num , to_fullpath_gz_part , get_gz_error (out , errno_temp ));
841+ thread_num , to_fullpath_gz_part , get_gz_error (out , errno ));
845842 }
846843 }
847844
848- if (read_len == 0 )
845+ if (feof ( in ) )
849846 break ;
850847 }
851848
852849 /* close source file */
853- if (fio_fclose (in ))
854- {
855- fio_unlink (to_fullpath_gz_part , FIO_BACKUP_HOST );
856- elog (ERROR , "Thread [%d]: Cannot close source WAL file \"%s\": %s" ,
857- thread_num , from_fullpath , strerror (errno ));
858- }
850+ fclose (in );
859851
860852 /* close temp file */
861853 if (fio_gzclose (out ) != 0 )
862854 {
863- errno_temp = errno ;
864855 fio_unlink (to_fullpath_gz_part , FIO_BACKUP_HOST );
865856 elog (ERROR , "Thread [%d]: Cannot close compressed temp WAL file \"%s\": %s" ,
866- thread_num , to_fullpath_gz_part , strerror (errno_temp ));
857+ thread_num , to_fullpath_gz_part , strerror (errno ));
867858 }
868859
869860 /* sync temp file to disk */
@@ -887,6 +878,8 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
887878 thread_num , to_fullpath_gz_part , to_fullpath_gz , strerror (errno ));
888879 }
889880
881+ pg_free (buf );
882+
890883 return 0 ;
891884}
892885#endif
@@ -1518,7 +1511,7 @@ get_wal_file(const char *filename, const char *from_fullpath,
15181511}
15191512
15201513/*
1521- * Copy local WAL segment with possible decompression.
1514+ * Copy WAL segment with possible decompression from local archive .
15221515 * Return codes:
15231516 * FILE_MISSING (-1)
15241517 * OPEN_FAILED (-2)
@@ -1608,14 +1601,15 @@ get_wal_file_internal(const char *from_path, const char *to_path, FILE *out,
16081601 {
16091602 read_len = fread (buf , 1 , OUT_BUF_SIZE , in );
16101603
1611- if (read_len < 0 || ferror (in ))
1604+ if (ferror (in ))
16121605 {
16131606 elog (WARNING , "Thread [%d]: Cannot read source WAL file \"%s\": %s" ,
16141607 thread_num , from_path , strerror (errno ));
16151608 exit_code = READ_FAILED ;
16161609 break ;
16171610 }
1618- else if (read_len == 0 && feof (in ))
1611+
1612+ if (read_len == 0 && feof (in ))
16191613 break ;
16201614 }
16211615
0 commit comments