10
10
#include "pg_probackup.h"
11
11
#include "receivelog.h"
12
12
#include "streamutil.h"
13
+ #include "access/timeline.h"
13
14
14
15
#include <time.h>
15
16
#include <unistd.h>
@@ -69,6 +70,7 @@ static void add_walsegment_to_filelist(parray *filelist, uint32 timeline,
69
70
uint32 xlog_seg_size );
70
71
static void add_history_file_to_filelist (parray * filelist , uint32 timeline ,
71
72
char * basedir );
73
+ static parray * parse_tli_history_buffer (char * history , TimeLineID tli );
72
74
73
75
/*
74
76
* Run IDENTIFY_SYSTEM through a given connection and
@@ -353,6 +355,204 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
353
355
354
356
/* --- External API --- */
355
357
358
+ /*
359
+ * Maybe add a StreamOptions struct ?
360
+ * Backup conn only needed to calculate stream_stop_timeout. Think about refactoring it.
361
+ */
362
+ parray *
363
+ get_history_streaming (ConnectionOptions * conn_opt , TimeLineID tli , parray * backup_list )
364
+ {
365
+ PGresult * res ;
366
+ PGconn * conn ;
367
+ char * history ;
368
+ char query [128 ];
369
+ parray * result = NULL ;
370
+ parray * tli_list = NULL ;
371
+ timelineInfo * tlinfo = NULL ;
372
+ int i ,j ;
373
+
374
+ snprintf (query , sizeof (query ), "TIMELINE_HISTORY %u" , tli );
375
+
376
+ /*
377
+ * Connect in replication mode to the server.
378
+ */
379
+ conn = pgut_connect_replication (conn_opt -> pghost ,
380
+ conn_opt -> pgport ,
381
+ conn_opt -> pgdatabase ,
382
+ conn_opt -> pguser ,
383
+ false);
384
+
385
+ if (!conn )
386
+ return NULL ;
387
+
388
+ res = PQexec (conn , query );
389
+ PQfinish (conn );
390
+
391
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
392
+ {
393
+ elog (WARNING , "Could not send replication command \"%s\": %s" ,
394
+ query , PQresultErrorMessage (res ));
395
+ PQclear (res );
396
+ return NULL ;
397
+ }
398
+
399
+ /*
400
+ * The response to TIMELINE_HISTORY is a single row result set
401
+ * with two fields: filename and content
402
+ */
403
+
404
+ if (PQnfields (res ) != 2 || PQntuples (res ) != 1 )
405
+ {
406
+ elog (WARNING , "Unexpected response to TIMELINE_HISTORY command: "
407
+ "got %d rows and %d fields, expected %d rows and %d fields" ,
408
+ PQntuples (res ), PQnfields (res ), 1 , 2 );
409
+ PQclear (res );
410
+ return NULL ;
411
+ }
412
+
413
+ history = pgut_strdup (PQgetvalue (res , 0 , 1 ));
414
+ result = parse_tli_history_buffer (history , tli );
415
+
416
+ /* some cleanup */
417
+ pg_free (history );
418
+ PQclear (res );
419
+
420
+ if (result )
421
+ tlinfo = timelineInfoNew (tli );
422
+ else
423
+ return NULL ;
424
+
425
+ /* transform TimeLineHistoryEntry into timelineInfo */
426
+ for (i = parray_num (result ) - 1 ; i >= 0 ; i -- )
427
+ {
428
+ TimeLineHistoryEntry * tln = (TimeLineHistoryEntry * ) parray_get (result , i );
429
+
430
+ tlinfo -> parent_tli = tln -> tli ;
431
+ tlinfo -> switchpoint = tln -> end ;
432
+
433
+ if (!tli_list )
434
+ tli_list = parray_new ();
435
+
436
+ parray_append (tli_list , tlinfo );
437
+
438
+ /* Next tli */
439
+ tlinfo = timelineInfoNew (tln -> tli );
440
+
441
+ /* oldest tli */
442
+ if (i == 0 )
443
+ {
444
+ tlinfo -> tli = tln -> tli ;
445
+ tlinfo -> parent_tli = 0 ;
446
+ tlinfo -> switchpoint = 0 ;
447
+ parray_append (tli_list , tlinfo );
448
+ }
449
+ }
450
+
451
+ /* link parent to child */
452
+ for (i = 0 ; i < parray_num (tli_list ); i ++ )
453
+ {
454
+ timelineInfo * tlinfo = (timelineInfo * ) parray_get (tli_list , i );
455
+
456
+ for (j = 0 ; j < parray_num (tli_list ); j ++ )
457
+ {
458
+ timelineInfo * tlinfo_parent = (timelineInfo * ) parray_get (tli_list , j );
459
+
460
+ if (tlinfo -> parent_tli == tlinfo_parent -> tli )
461
+ {
462
+ tlinfo -> parent_link = tlinfo_parent ;
463
+ break ;
464
+ }
465
+ }
466
+ }
467
+
468
+ /* add backups to each timeline info */
469
+ for (i = 0 ; i < parray_num (tli_list ); i ++ )
470
+ {
471
+ timelineInfo * tlinfo = parray_get (tli_list , i );
472
+ for (j = 0 ; j < parray_num (backup_list ); j ++ )
473
+ {
474
+ pgBackup * backup = parray_get (backup_list , j );
475
+ if (tlinfo -> tli == backup -> tli )
476
+ {
477
+ if (tlinfo -> backups == NULL )
478
+ tlinfo -> backups = parray_new ();
479
+ parray_append (tlinfo -> backups , backup );
480
+ }
481
+ }
482
+ }
483
+
484
+ /* cleanup */
485
+ parray_walk (result , pg_free );
486
+ pg_free (result );
487
+
488
+ return tli_list ;
489
+ }
490
+
491
+ parray *
492
+ parse_tli_history_buffer (char * history , TimeLineID tli )
493
+ {
494
+ char * curLine = history ;
495
+ TimeLineHistoryEntry * entry ;
496
+ TimeLineHistoryEntry * last_timeline = NULL ;
497
+ parray * result = NULL ;
498
+
499
+ /* Parse timeline history buffer string by string */
500
+ while (curLine )
501
+ {
502
+ char tempStr [1024 ];
503
+ char * nextLine = strchr (curLine , '\n' );
504
+ int curLineLen = nextLine ? (nextLine - curLine ) : strlen (curLine );
505
+
506
+ memcpy (tempStr , curLine , curLineLen );
507
+ tempStr [curLineLen ] = '\0' ; // NUL-terminate!
508
+ curLine = nextLine ? (nextLine + 1 ) : NULL ;
509
+
510
+ if (curLineLen > 0 )
511
+ {
512
+ char * ptr ;
513
+ TimeLineID tli ;
514
+ uint32 switchpoint_hi ;
515
+ uint32 switchpoint_lo ;
516
+ int nfields ;
517
+
518
+ for (ptr = tempStr ; * ptr ; ptr ++ )
519
+ {
520
+ if (!isspace ((unsigned char ) * ptr ))
521
+ break ;
522
+ }
523
+ if (* ptr == '\0' || * ptr == '#' )
524
+ continue ;
525
+
526
+ nfields = sscanf (tempStr , "%u\t%X/%X" , & tli , & switchpoint_hi , & switchpoint_lo );
527
+
528
+ if (nfields < 1 )
529
+ {
530
+ /* expect a numeric timeline ID as first field of line */
531
+ elog (ERROR , "Syntax error in timeline history: \"%s\". Expected a numeric timeline ID." , tempStr );
532
+ }
533
+ if (nfields != 3 )
534
+ elog (ERROR , "Syntax error in timeline history: \"%s\". Expected a transaction log switchpoint location." , tempStr );
535
+
536
+ if (last_timeline && tli <= last_timeline -> tli )
537
+ elog (ERROR , "Timeline IDs must be in increasing sequence: \"%s\"" , tempStr );
538
+
539
+ entry = pgut_new (TimeLineHistoryEntry );
540
+ entry -> tli = tli ;
541
+ entry -> end = ((uint64 ) switchpoint_hi << 32 ) | switchpoint_lo ;
542
+
543
+ last_timeline = entry ;
544
+ /* Build list with newest item first */
545
+ if (!result )
546
+ result = parray_new ();
547
+ parray_append (result , entry );
548
+
549
+ /* we ignore the remainder of each line */
550
+ }
551
+ }
552
+
553
+ return result ;
554
+ }
555
+
356
556
/*
357
557
* Maybe add a StreamOptions struct ?
358
558
* Backup conn only needed to calculate stream_stop_timeout. Think about refactoring it.
@@ -374,7 +574,8 @@ start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOption
374
574
stream_thread_arg .conn = pgut_connect_replication (conn_opt -> pghost ,
375
575
conn_opt -> pgport ,
376
576
conn_opt -> pgdatabase ,
377
- conn_opt -> pguser );
577
+ conn_opt -> pguser ,
578
+ true);
378
579
/* sanity check*/
379
580
IdentifySystem (& stream_thread_arg );
380
581
0 commit comments