@@ -574,17 +574,20 @@ namespace
574574 return true ;
575575 }
576576
577+ enum ActionType { REPLICATE, REPLAY, FAST_FORWARD };
578+
577579 void replicate (Target* target,
578580 TransactionList& transactions,
579581 FB_UINT64 sequence, ULONG offset,
580582 ULONG length, const UCHAR* data,
581- bool rewind )
583+ ActionType action )
582584 {
583585 const Block* const header = (Block*) data;
584586
585587 const auto traNumber = header->traNumber ;
586588
587- if (!rewind || !traNumber || transactions.exist (traNumber))
589+ if (action == REPLICATE ||
590+ (action == REPLAY && (!traNumber || transactions.exist (traNumber))))
588591 {
589592 target->replicate (sequence, offset, length, data);
590593 }
@@ -597,7 +600,7 @@ namespace
597600 if (transactions.find (traNumber, pos))
598601 transactions.remove (pos);
599602 }
600- else if (!rewind )
603+ else if (action != REPLAY )
601604 {
602605 transactions.clear ();
603606 }
@@ -606,7 +609,7 @@ namespace
606609 {
607610 fb_assert (traNumber);
608611
609- if (!rewind && !transactions.exist (traNumber))
612+ if (action != REPLAY && !transactions.exist (traNumber))
610613 transactions.add (ActiveTransaction (traNumber, sequence));
611614 }
612615 }
@@ -737,6 +740,7 @@ namespace
737740 const FB_UINT64 max_sequence = queue.back ()->header .hdr_sequence ;
738741 FB_UINT64 next_sequence = 0 ;
739742 const bool restart = target->isShutdown ();
743+ auto action = REPLICATE;
740744
741745 for (auto segment : queue)
742746 {
@@ -754,21 +758,48 @@ namespace
754758 const FB_UINT64 db_sequence = target->initReplica ();
755759 const FB_UINT64 last_db_sequence = control.getDbSequence ();
756760
757- if (sequence <= db_sequence)
758- {
759- target->verbose (" Deleting segment %" UQUADFORMAT " due to fast forward" , sequence);
760- segment->remove ();
761- continue ;
762- }
763-
764761 if (db_sequence != last_db_sequence)
765762 {
766- target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT, db_sequence + 1 );
767- control.saveDbSequence (db_sequence);
768- transactions.clear ();
769- control.saveComplete (db_sequence, transactions);
770- last_sequence = db_sequence;
771- last_offset = 0 ;
763+ if (sequence == db_sequence + 1 )
764+ {
765+ if (const auto oldest = findOldest (transactions))
766+ {
767+ const TraNumber oldest_trans = oldest->tra_id ;
768+ const FB_UINT64 oldest_sequence = oldest ? oldest->sequence : 0 ;
769+ target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT
770+ " (new OAT: %" UQUADFORMAT " in segment %" UQUADFORMAT " )" ,
771+ db_sequence + 1 , oldest_trans, oldest_sequence);
772+ }
773+ else
774+ {
775+ target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT,
776+ db_sequence + 1 );
777+ }
778+
779+ control.saveDbSequence (db_sequence);
780+ return PROCESS_SHUTDOWN; // this enforces restart from OAT
781+ }
782+
783+ if (action != FAST_FORWARD)
784+ {
785+ if (segment != queue.front ())
786+ {
787+ fb_assert (false );
788+ return PROCESS_SHUTDOWN;
789+ }
790+
791+ if (db_sequence > max_sequence)
792+ {
793+ target->verbose (" Database sequence has been changed to %" UQUADFORMAT
794+ " , waiting for appropriate segment" , db_sequence);
795+ return PROCESS_SUSPEND;
796+ }
797+
798+ target->verbose (" Database sequence has been changed to %" UQUADFORMAT
799+ " , preparing for replication reset" , db_sequence);
800+
801+ action = FAST_FORWARD;
802+ }
772803 }
773804
774805 // If no new segments appeared since our last attempt,
@@ -843,17 +874,18 @@ namespace
843874
844875 if (blockLength)
845876 {
846- const bool rewind = (sequence < last_sequence ||
877+ const bool replay = (sequence < last_sequence ||
847878 (sequence == last_sequence && (!last_offset || totalLength < last_offset)));
879+ if (action != FAST_FORWARD)
880+ action = replay ? REPLAY : REPLICATE;
848881
849882 UCHAR* const data = buffer.getBuffer (length);
850883 memcpy (data, &header, sizeof (Block));
851884
852885 if (read (file, data + sizeof (Block), blockLength) != blockLength)
853886 raiseError (" Journal file %s read failed (error %d)" , segment->filename .c_str (), ERRNO);
854887
855- replicate (target, transactions, sequence, totalLength,
856- length, data, rewind);
888+ replicate (target, transactions, sequence, totalLength, length, data, action);
857889 }
858890
859891 totalLength += length;
@@ -872,7 +904,10 @@ namespace
872904 oldest_sequence = oldest ? oldest->sequence : 0 ;
873905 next_sequence = sequence + 1 ;
874906
875- string extra;
907+ string actionName, extra;
908+ actionName = (action == FAST_FORWARD) ? " scanned" :
909+ (action == REPLAY) ? " replayed" : " replicated" ;
910+
876911 if (oldest)
877912 {
878913 const TraNumber oldest_trans = oldest->tra_id ;
@@ -884,8 +919,8 @@ namespace
884919 extra = " deleting" ;
885920 }
886921
887- target->verbose (" Segment %" UQUADFORMAT " (%u bytes) is replicated in %s, %s" ,
888- sequence, totalLength, interval.c_str (), extra.c_str ());
922+ target->verbose (" Segment %" UQUADFORMAT " (%u bytes) is %s in %s, %s" ,
923+ sequence, totalLength, actionName. c_str (), interval.c_str (), extra.c_str ());
889924
890925 if (!oldest_sequence)
891926 segment->remove ();
@@ -907,8 +942,8 @@ namespace
907942 break ;
908943
909944 target->verbose (" Deleting segment %" UQUADFORMAT " as no longer needed" , sequence);
910-
911945 segment->remove ();
946+
912947 } while (pos < queue.getCount ());
913948 }
914949 }
0 commit comments