@@ -86,6 +86,7 @@ static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime
8686
8787typedef struct XLogPageReadPrivate
8888{
89+ int thread_num ;
8990 const char * archivedir ;
9091 TimeLineID tli ;
9192
@@ -106,7 +107,6 @@ typedef struct XLogPageReadPrivate
106107/* An argument for a thread function */
107108typedef struct
108109{
109- int thread_num ;
110110 XLogPageReadPrivate private_data ;
111111
112112 XLogRecPtr startpoint ;
@@ -134,6 +134,55 @@ static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data,
134134static XLogSegNo nextSegNoToRead = 0 ;
135135static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER ;
136136
137+ /*
138+ * Do manual switch to the next WAL segment.
139+ *
140+ * Returns false if the reader reaches the end of a WAL segment list.
141+ */
142+ static bool
143+ switchToNextWal (XLogReaderState * xlogreader , xlog_thread_arg * arg )
144+ {
145+ XLogPageReadPrivate * private_data ;
146+ XLogRecPtr found ;
147+
148+ private_data = (XLogPageReadPrivate * ) xlogreader -> private_data ;
149+ private_data -> need_switch = false;
150+
151+ /* Critical section */
152+ pthread_lock (& wal_segment_mutex );
153+ Assert (nextSegNoToRead );
154+ private_data -> xlogsegno = nextSegNoToRead ;
155+ nextSegNoToRead ++ ;
156+ pthread_mutex_unlock (& wal_segment_mutex );
157+
158+ /* We've reached the end */
159+ if (private_data -> xlogsegno > arg -> endSegNo )
160+ return false;
161+
162+ /* Adjust next record position */
163+ XLogSegNoOffsetToRecPtr (private_data -> xlogsegno , 0 , arg -> startpoint );
164+ /* Skip over the page header and contrecord if any */
165+ found = XLogFindNextRecord (xlogreader , arg -> startpoint );
166+
167+ /*
168+ * We get invalid WAL record pointer usually when WAL segment is
169+ * absent or is corrupted.
170+ */
171+ if (XLogRecPtrIsInvalid (found ))
172+ {
173+ elog (WARNING , "could not read WAL record at %X/%X" ,
174+ (uint32 ) (arg -> startpoint >> 32 ), (uint32 ) (arg -> startpoint ));
175+ PrintXLogCorruptionMsg (private_data , ERROR );
176+ }
177+ arg -> startpoint = found ;
178+
179+ elog (VERBOSE , "Thread %d switched to LSN %X/%X" ,
180+ arg -> thread_num ,
181+ (uint32 ) (arg -> startpoint >> 32 ), (uint32 ) (arg -> startpoint ));
182+
183+ return true;
184+ }
185+
137186/*
138187 * extractPageMap() worker.
139188 */
@@ -150,7 +199,7 @@ doExtractPageMap(void *arg)
150199 private_data = & extract_arg -> private_data ;
151200 xlogreader = XLogReaderAllocate (& SimpleXLogPageRead , private_data );
152201 if (xlogreader == NULL )
153- elog (ERROR , "out of memory" );
202+ elog (ERROR , "Thread [%d]: out of memory" , private_data -> thread_num );
154203 xlogreader -> system_identifier = system_identifier ;
155204
156205 found = XLogFindNextRecord (xlogreader , extract_arg -> startpoint );
@@ -161,15 +210,16 @@ doExtractPageMap(void *arg)
161210 */
162211 if (XLogRecPtrIsInvalid (found ))
163212 {
164- elog (WARNING , "could not read WAL record at %X/%X" ,
213+ elog (WARNING , "Thread [%d]: could not read WAL record at %X/%X" ,
214+ private_data -> thread_num ,
165215 (uint32 ) (extract_arg -> startpoint >> 32 ),
166216 (uint32 ) (extract_arg -> startpoint ));
167217 PrintXLogCorruptionMsg (private_data , ERROR );
168218 }
169219 extract_arg -> startpoint = found ;
170220
171- elog (VERBOSE , "Start LSN of thread %d : %X/%X" ,
172- extract_arg -> thread_num ,
221+ elog (VERBOSE , "Thread [%d]: Starting LSN : %X/%X" ,
222+ private_data -> thread_num ,
173223 (uint32 ) (extract_arg -> startpoint >> 32 ),
174224 (uint32 ) (extract_arg -> startpoint ));
175225
@@ -181,7 +231,18 @@ doExtractPageMap(void *arg)
181231 XLogRecord * record ;
182232
183233 if (interrupted )
184- elog (ERROR , "Interrupted during WAL reading" );
234+ elog (ERROR , "Thread [%d]: Interrupted during WAL reading" ,
235+ private_data -> thread_num );
236+
237+ /*
238+ * We need to switch to the next WAL segment after reading previous
239+ * record. It may happen if we read contrecord.
240+ */
241+ if (private_data -> need_switch )
242+ {
243+ if (!switchToNextWal (xlogreader , extract_arg ))
244+ break ;
245+ }
185246
186247 record = XLogReadRecord (xlogreader , extract_arg -> startpoint , & errormsg );
187248
@@ -190,23 +251,15 @@ doExtractPageMap(void *arg)
190251 XLogRecPtr errptr ;
191252
192253 /*
193- * Try to switch to the next WAL segment. Usually
194- * SimpleXLogPageRead() does it by itself. But here we need to do it
195- * manually to support threads.
254+ * There is no record, try to switch to the next WAL segment.
255+ * Usually SimpleXLogPageRead() does it by itself. But here we need
256+ * to do it manually to support threads.
196257 */
197- if (private_data -> need_switch )
258+ if (private_data -> need_switch && errormsg == NULL )
198259 {
199- private_data -> need_switch = false;
200-
201- /* Critical section */
202- pthread_lock (& wal_segment_mutex );
203- Assert (nextSegNoToRead );
204- private_data -> xlogsegno = nextSegNoToRead ;
205- nextSegNoToRead ++ ;
206- pthread_mutex_unlock (& wal_segment_mutex );
207-
208- /* We reach the end */
209- if (private_data -> xlogsegno > extract_arg -> endSegNo )
260+ if (switchToNextWal (xlogreader , extract_arg ))
261+ continue ;
262+ else
210263 break ;
211264
212265 /* Adjust next record position */
@@ -220,15 +273,16 @@ doExtractPageMap(void *arg)
220273 */
221274 if (XLogRecPtrIsInvalid (found ))
222275 {
223- elog (WARNING , "could not read WAL record at %X/%X" ,
276+ elog (WARNING , "Thread [%d]: could not read WAL record at %X/%X" ,
277+ private_data -> thread_num ,
224278 (uint32 ) (extract_arg -> startpoint >> 32 ),
225279 (uint32 ) (extract_arg -> startpoint ));
226280 PrintXLogCorruptionMsg (private_data , ERROR );
227281 }
228282 extract_arg -> startpoint = found ;
229283
230- elog (VERBOSE , "Thread %d switched to LSN %X/%X" ,
231- extract_arg -> thread_num ,
284+ elog (VERBOSE , "Thread [%d]: switched to LSN %X/%X" ,
285+ private_data -> thread_num ,
232286 (uint32 ) (extract_arg -> startpoint >> 32 ),
233287 (uint32 ) (extract_arg -> startpoint ));
234288
@@ -239,11 +293,13 @@ doExtractPageMap(void *arg)
239293 extract_arg -> startpoint : xlogreader -> EndRecPtr ;
240294
241295 if (errormsg )
242- elog (WARNING , "could not read WAL record at %X/%X: %s" ,
296+ elog (WARNING , "Thread [%d]: could not read WAL record at %X/%X: %s" ,
297+ private_data -> thread_num ,
243298 (uint32 ) (errptr >> 32 ), (uint32 ) (errptr ),
244299 errormsg );
245300 else
246- elog (WARNING , "could not read WAL record at %X/%X" ,
301+ elog (WARNING , "Thread [%d]: could not read WAL record at %X/%X" ,
302+ private_data -> thread_num ,
247303 (uint32 ) (errptr >> 32 ), (uint32 ) (errptr ));
248304
249305 /*
@@ -317,7 +373,7 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
317373 for (i = 0 ; i < num_threads ; i ++ )
318374 {
319375 InitXLogPageRead (& thread_args [i ].private_data , archivedir , tli , false);
320- thread_args [i ].thread_num = i ;
376+ thread_args [i ].private_data . thread_num = i + 1 ;
321377
322378 thread_args [i ].startpoint = startpoint ;
323379 thread_args [i ].endpoint = endpoint ;
@@ -344,7 +400,7 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
344400 /* Run threads */
345401 for (i = 0 ; i < threads_need ; i ++ )
346402 {
347- elog (VERBOSE , "Start WAL reader thread: %d" , i );
403+ elog (VERBOSE , "Start WAL reader thread: %d" , i + 1 );
348404 pthread_create (& threads [i ], NULL , doExtractPageMap , & thread_args [i ]);
349405 }
350406
@@ -736,15 +792,38 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
736792 */
737793 if (!XLByteInSeg (targetPagePtr , private_data -> xlogsegno ))
738794 {
739- CleanupXLogPageRead (xlogreader );
795+ elog (VERBOSE , "Need to switch to segno next to %X/%X, current LSN %X/%X" ,
796+ (uint32 ) (targetPagePtr >> 32 ), (uint32 ) (targetPagePtr ),
797+ (uint32 ) (xlogreader -> currRecPtr >> 32 ),
798+ (uint32 ) (xlogreader -> currRecPtr ));
799+
740800 /*
741- * Do not switch to next WAL segment in this function. Currently it is
742- * manually switched only in doExtractPageMap().
801+ * if the last record on the page is not complete,
802+ * we must continue reading pages in the same thread
743803 */
744- if (private_data -> manual_switch )
804+ if (!XLogRecPtrIsInvalid (xlogreader -> currRecPtr ) &&
805+ xlogreader -> currRecPtr < targetPagePtr )
745806 {
746- private_data -> need_switch = true;
747- return -1 ;
807+ CleanupXLogPageRead (xlogreader );
808+
809+ /*
810+ * Switch to the next WAL segment after reading contrecord.
811+ */
812+ if (private_data -> manual_switch )
813+ private_data -> need_switch = true;
814+ }
815+ else
816+ {
817+ CleanupXLogPageRead (xlogreader );
818+ /*
819+ * Do not switch to next WAL segment in this function. Currently it is
820+ * manually switched only in doExtractPageMap().
821+ */
822+ if (private_data -> manual_switch )
823+ {
824+ private_data -> need_switch = true;
825+ return -1 ;
826+ }
748827 }
749828 }
750829
@@ -761,16 +840,20 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
761840
762841 if (fileExists (private_data -> xlogpath ))
763842 {
764- elog (LOG , "Opening WAL segment \"%s\"" , private_data -> xlogpath );
843+ elog (LOG , "Thread [%d]: Opening WAL segment \"%s\"" ,
844+ private_data -> thread_num ,
845+ private_data -> xlogpath );
765846
766847 private_data -> xlogexists = true;
767848 private_data -> xlogfile = open (private_data -> xlogpath ,
768849 O_RDONLY | PG_BINARY , 0 );
769850
770851 if (private_data -> xlogfile < 0 )
771852 {
772- elog (WARNING , "Could not open WAL segment \"%s\": %s" ,
773- private_data -> xlogpath , strerror (errno ));
853+ elog (WARNING , "Thread [%d]: Could not open WAL segment \"%s\": %s" ,
854+ private_data -> thread_num ,
855+ private_data -> xlogpath ,
856+ strerror (errno ));
774857 return -1 ;
775858 }
776859 }
@@ -783,16 +866,16 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
783866 private_data -> xlogpath );
784867 if (fileExists (private_data -> gz_xlogpath ))
785868 {
786- elog (LOG , "Opening compressed WAL segment \"%s\"" ,
787- private_data -> gz_xlogpath );
869+ elog (LOG , "Thread [%d]: Opening compressed WAL segment \"%s\"" ,
870+ private_data -> thread_num , private_data -> gz_xlogpath );
788871
789872 private_data -> xlogexists = true;
790873 private_data -> gz_xlogfile = gzopen (private_data -> gz_xlogpath ,
791874 "rb" );
792875 if (private_data -> gz_xlogfile == NULL )
793876 {
794- elog (WARNING , "Could not open compressed WAL segment \"%s\": %s" ,
795- private_data -> gz_xlogpath , strerror (errno ));
877+ elog (WARNING , "Thread [%d]: Could not open compressed WAL segment \"%s\": %s" ,
878+ private_data -> thread_num , private_data -> gz_xlogpath , strerror (errno ));
796879 return -1 ;
797880 }
798881 }
@@ -814,15 +897,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
814897 {
815898 if (lseek (private_data -> xlogfile , (off_t ) targetPageOff , SEEK_SET ) < 0 )
816899 {
817- elog (WARNING , "Could not seek in WAL segment \"%s\": %s" ,
818- private_data -> xlogpath , strerror (errno ));
900+ elog (WARNING , "Thread [%d]: Could not seek in WAL segment \"%s\": %s" ,
901+ private_data -> thread_num , private_data -> xlogpath , strerror (errno ));
819902 return -1 ;
820903 }
821904
822905 if (read (private_data -> xlogfile , readBuf , XLOG_BLCKSZ ) != XLOG_BLCKSZ )
823906 {
824- elog (WARNING , "Could not read from WAL segment \"%s\": %s" ,
825- private_data -> xlogpath , strerror (errno ));
907+ elog (WARNING , "Thread [%d]: Could not read from WAL segment \"%s\": %s" ,
908+ private_data -> thread_num , private_data -> xlogpath , strerror (errno ));
826909 return -1 ;
827910 }
828911 }
@@ -831,15 +914,17 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
831914 {
832915 if (gzseek (private_data -> gz_xlogfile , (z_off_t ) targetPageOff , SEEK_SET ) == -1 )
833916 {
834- elog (WARNING , "Could not seek in compressed WAL segment \"%s\": %s" ,
917+ elog (WARNING , "Thread [%d]: Could not seek in compressed WAL segment \"%s\": %s" ,
918+ private_data -> thread_num ,
835919 private_data -> gz_xlogpath ,
836920 get_gz_error (private_data -> gz_xlogfile ));
837921 return -1 ;
838922 }
839923
840924 if (gzread (private_data -> gz_xlogfile , readBuf , XLOG_BLCKSZ ) != XLOG_BLCKSZ )
841925 {
842- elog (WARNING , "Could not read from compressed WAL segment \"%s\": %s" ,
926+ elog (WARNING , "Thread [%d]: Could not read from compressed WAL segment \"%s\": %s" ,
927+ private_data -> thread_num ,
843928 private_data -> gz_xlogpath ,
844929 get_gz_error (private_data -> gz_xlogfile ));
845930 return -1 ;
@@ -910,15 +995,19 @@ PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel)
910995 * We throw a WARNING here to be able to update backup status.
911996 */
912997 if (!private_data -> xlogexists )
913- elog (elevel , "WAL segment \"%s\" is absent" , private_data -> xlogpath );
998+ elog (elevel , "Thread [%d]: WAL segment \"%s\" is absent" ,
999+ private_data -> thread_num ,
1000+ private_data -> xlogpath );
9141001 else if (private_data -> xlogfile != -1 )
915- elog (elevel , "Possible WAL corruption. "
1002+ elog (elevel , "Thread [%d]: Possible WAL corruption. "
9161003 "Error has occured during reading WAL segment \"%s\"" ,
1004+ private_data -> thread_num ,
9171005 private_data -> xlogpath );
9181006#ifdef HAVE_LIBZ
9191007 else if (private_data -> gz_xlogfile != NULL)
920- elog (elevel , "Possible WAL corruption. "
1008+ elog (elevel , "Thread [%d]: Possible WAL corruption. "
9211009 "Error has occured during reading WAL segment \"%s\"" ,
1010+ private_data -> thread_num ,
9221011 private_data -> gz_xlogpath );
9231012#endif
9241013 }
0 commit comments