@@ -596,17 +596,20 @@ namespace
596596 return true ;
597597 }
598598
599+ enum ActionType { REPLICATE, REPLAY, FAST_FORWARD };
600+
599601 void replicate (Target* target,
600602 TransactionList& transactions,
601603 FB_UINT64 sequence, ULONG offset,
602604 ULONG length, const UCHAR* data,
603- bool rewind )
605+ ActionType action )
604606 {
605607 const Block* const header = (Block*) data;
606608
607609 const auto traNumber = header->traNumber ;
608610
609- if (!rewind || !traNumber || transactions.exist (traNumber))
611+ if (action == REPLICATE ||
612+ (action == REPLAY && (!traNumber || transactions.exist (traNumber))))
610613 {
611614 target->replicate (sequence, offset, length, data);
612615 }
@@ -619,7 +622,7 @@ namespace
619622 if (transactions.find (traNumber, pos))
620623 transactions.remove (pos);
621624 }
622- else if (!rewind )
625+ else if (action != REPLAY )
623626 {
624627 transactions.clear ();
625628 }
@@ -628,7 +631,7 @@ namespace
628631 {
629632 fb_assert (traNumber);
630633
631- if (!rewind && !transactions.exist (traNumber))
634+ if (action != REPLAY && !transactions.exist (traNumber))
632635 transactions.add (ActiveTransaction (traNumber, sequence));
633636 }
634637 }
@@ -761,6 +764,7 @@ namespace
761764 const FB_UINT64 max_sequence = queue.back ()->header .hdr_sequence ;
762765 FB_UINT64 next_sequence = 0 ;
763766 const bool restart = target->isShutdown ();
767+ auto action = REPLICATE;
764768
765769 for (auto segment : queue)
766770 {
@@ -778,21 +782,48 @@ namespace
778782 const FB_UINT64 db_sequence = target->initReplica ();
779783 const FB_UINT64 last_db_sequence = control.getDbSequence ();
780784
781- if (sequence <= db_sequence)
782- {
783- target->verbose (" Deleting segment %" UQUADFORMAT " due to fast forward" , sequence);
784- segment->remove ();
785- continue ;
786- }
787-
788785 if (db_sequence != last_db_sequence)
789786 {
790- target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT, db_sequence + 1 );
791- control.saveDbSequence (db_sequence);
792- transactions.clear ();
793- control.saveComplete (db_sequence, transactions);
794- last_sequence = db_sequence;
795- last_offset = 0 ;
787+ if (sequence == db_sequence + 1 )
788+ {
789+ if (const auto oldest = findOldest (transactions))
790+ {
791+ const TraNumber oldest_trans = oldest->tra_id ;
792+ const FB_UINT64 oldest_sequence = oldest ? oldest->sequence : 0 ;
793+ target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT
794+ " (new OAT: %" UQUADFORMAT " in segment %" UQUADFORMAT " )" ,
795+ db_sequence + 1 , oldest_trans, oldest_sequence);
796+ }
797+ else
798+ {
799+ target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT,
800+ db_sequence + 1 );
801+ }
802+
803+ control.saveDbSequence (db_sequence);
804+ return PROCESS_SHUTDOWN; // this enforces restart from OAT
805+ }
806+
807+ if (action != FAST_FORWARD)
808+ {
809+ if (segment != queue.front ())
810+ {
811+ fb_assert (false );
812+ return PROCESS_SHUTDOWN;
813+ }
814+
815+ if (db_sequence > max_sequence)
816+ {
817+ target->verbose (" Database sequence has been changed to %" UQUADFORMAT
818+ " , waiting for appropriate segment" , db_sequence);
819+ return PROCESS_SUSPEND;
820+ }
821+
822+ target->verbose (" Database sequence has been changed to %" UQUADFORMAT
823+ " , preparing for replication reset" , db_sequence);
824+
825+ action = FAST_FORWARD;
826+ }
796827 }
797828
798829 // If no new segments appeared since our last attempt,
@@ -867,17 +898,18 @@ namespace
867898
868899 if (blockLength)
869900 {
870- const bool rewind = (sequence < last_sequence ||
901+ const bool replay = (sequence < last_sequence ||
871902 (sequence == last_sequence && (!last_offset || totalLength < last_offset)));
903+ if (action != FAST_FORWARD)
904+ action = replay ? REPLAY : REPLICATE;
872905
873906 UCHAR* const data = buffer.getBuffer (length);
874907 memcpy (data, &header, sizeof (Block));
875908
876909 if (read (file, data + sizeof (Block), blockLength) != blockLength)
877910 raiseError (" Journal file %s read failed (error %d)" , segment->filename .c_str (), ERRNO);
878911
879- replicate (target, transactions, sequence, totalLength,
880- length, data, rewind);
912+ replicate (target, transactions, sequence, totalLength, length, data, action);
881913 }
882914
883915 totalLength += length;
@@ -896,7 +928,10 @@ namespace
896928 oldest_sequence = oldest ? oldest->sequence : 0 ;
897929 next_sequence = sequence + 1 ;
898930
899- string extra;
931+ string actionName, extra;
932+ actionName = (action == FAST_FORWARD) ? " scanned" :
933+ (action == REPLAY) ? " replayed" : " replicated" ;
934+
900935 if (oldest)
901936 {
902937 const TraNumber oldest_trans = oldest->tra_id ;
@@ -908,8 +943,8 @@ namespace
908943 extra = " deleting" ;
909944 }
910945
911- target->verbose (" Segment %" UQUADFORMAT " (%u bytes) is replicated in %s, %s" ,
912- sequence, totalLength, interval.c_str (), extra.c_str ());
946+ target->verbose (" Segment %" UQUADFORMAT " (%u bytes) is %s in %s, %s" ,
947+ sequence, totalLength, actionName. c_str (), interval.c_str (), extra.c_str ());
913948
914949 if (!oldest_sequence)
915950 segment->remove ();
@@ -931,8 +966,8 @@ namespace
931966 break ;
932967
933968 target->verbose (" Deleting segment %" UQUADFORMAT " as no longer needed" , sequence);
934-
935969 segment->remove ();
970+
936971 } while (pos < queue.getCount ());
937972 }
938973 }
0 commit comments