Skip to content

Commit f72afcd

Browse files
committed
Make automatic online re-initialization reliable
1 parent 9fbd574 commit f72afcd

File tree

1 file changed

+59
-24
lines changed

1 file changed

+59
-24
lines changed

src/remote/server/ReplServer.cpp

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -586,17 +586,20 @@ namespace
586586
return true;
587587
}
588588

589+
enum ActionType { REPLICATE, REPLAY, FAST_FORWARD };
590+
589591
void replicate(Target* target,
590592
TransactionList& transactions,
591593
FB_UINT64 sequence, ULONG offset,
592594
ULONG length, const UCHAR* data,
593-
bool rewind)
595+
ActionType action)
594596
{
595597
const Block* const header = (Block*) data;
596598

597599
const auto traNumber = header->traNumber;
598600

599-
if (!rewind || !traNumber || transactions.exist(traNumber))
601+
if (action == REPLICATE ||
602+
(action == REPLAY && (!traNumber || transactions.exist(traNumber))))
600603
{
601604
target->replicate(sequence, offset, length, data);
602605
}
@@ -609,7 +612,7 @@ namespace
609612
if (transactions.find(traNumber, pos))
610613
transactions.remove(pos);
611614
}
612-
else if (!rewind)
615+
else if (action != REPLAY)
613616
{
614617
transactions.clear();
615618
}
@@ -618,7 +621,7 @@ namespace
618621
{
619622
fb_assert(traNumber);
620623

621-
if (!rewind && !transactions.exist(traNumber))
624+
if (action != REPLAY && !transactions.exist(traNumber))
622625
transactions.add(ActiveTransaction(traNumber, sequence));
623626
}
624627
}
@@ -751,6 +754,7 @@ namespace
751754
const FB_UINT64 max_sequence = queue.back()->header.hdr_sequence;
752755
FB_UINT64 next_sequence = 0;
753756
const bool restart = target->isShutdown();
757+
auto action = REPLICATE;
754758

755759
for (auto segment : queue)
756760
{
@@ -768,21 +772,48 @@ namespace
768772
const FB_UINT64 db_sequence = target->initReplica();
769773
const FB_UINT64 last_db_sequence = control.getDbSequence();
770774

771-
if (sequence <= db_sequence)
772-
{
773-
target->verbose("Deleting segment %" UQUADFORMAT " due to fast forward", sequence);
774-
segment->remove();
775-
continue;
776-
}
777-
778775
if (db_sequence != last_db_sequence)
779776
{
780-
target->verbose("Resetting replication to continue from segment %" UQUADFORMAT, db_sequence + 1);
781-
control.saveDbSequence(db_sequence);
782-
transactions.clear();
783-
control.saveComplete(db_sequence, transactions);
784-
last_sequence = db_sequence;
785-
last_offset = 0;
777+
if (sequence == db_sequence + 1)
778+
{
779+
if (const auto oldest = findOldest(transactions))
780+
{
781+
const TraNumber oldest_trans = oldest->tra_id;
782+
const FB_UINT64 oldest_sequence = oldest ? oldest->sequence : 0;
783+
target->verbose("Resetting replication to continue from segment %" UQUADFORMAT
784+
" (new OAT: %" UQUADFORMAT " in segment %" UQUADFORMAT ")",
785+
db_sequence + 1, oldest_trans, oldest_sequence);
786+
}
787+
else
788+
{
789+
target->verbose("Resetting replication to continue from segment %" UQUADFORMAT,
790+
db_sequence + 1);
791+
}
792+
793+
control.saveDbSequence(db_sequence);
794+
return PROCESS_SHUTDOWN; // this enforces restart from OAT
795+
}
796+
797+
if (action != FAST_FORWARD)
798+
{
799+
if (segment != queue.front())
800+
{
801+
fb_assert(false);
802+
return PROCESS_SHUTDOWN;
803+
}
804+
805+
if (db_sequence > max_sequence)
806+
{
807+
target->verbose("Database sequence has been changed to %" UQUADFORMAT
808+
", waiting for appropriate segment", db_sequence);
809+
return PROCESS_SUSPEND;
810+
}
811+
812+
target->verbose("Database sequence has been changed to %" UQUADFORMAT
813+
", preparing for replication reset", db_sequence);
814+
815+
action = FAST_FORWARD;
816+
}
786817
}
787818

788819
// If no new segments appeared since our last attempt,
@@ -857,17 +888,18 @@ namespace
857888

858889
if (blockLength)
859890
{
860-
const bool rewind = (sequence < last_sequence ||
891+
const bool replay = (sequence < last_sequence ||
861892
(sequence == last_sequence && (!last_offset || totalLength < last_offset)));
893+
if (action != FAST_FORWARD)
894+
action = replay ? REPLAY : REPLICATE;
862895

863896
UCHAR* const data = buffer.getBuffer(length);
864897
memcpy(data, &header, sizeof(Block));
865898

866899
if (read(file, data + sizeof(Block), blockLength) != blockLength)
867900
raiseError("Journal file %s read failed (error %d)", segment->filename.c_str(), ERRNO);
868901

869-
replicate(target, transactions, sequence, totalLength,
870-
length, data, rewind);
902+
replicate(target, transactions, sequence, totalLength, length, data, action);
871903
}
872904

873905
totalLength += length;
@@ -886,7 +918,10 @@ namespace
886918
oldest_sequence = oldest ? oldest->sequence : 0;
887919
next_sequence = sequence + 1;
888920

889-
string extra;
921+
string actionName, extra;
922+
actionName = (action == FAST_FORWARD) ? "scanned" :
923+
(action == REPLAY) ? "replayed" : "replicated";
924+
890925
if (oldest)
891926
{
892927
const TraNumber oldest_trans = oldest->tra_id;
@@ -898,8 +933,8 @@ namespace
898933
extra = "deleting";
899934
}
900935

901-
target->verbose("Segment %" UQUADFORMAT " (%u bytes) is replicated in %s, %s",
902-
sequence, totalLength, interval.c_str(), extra.c_str());
936+
target->verbose("Segment %" UQUADFORMAT " (%u bytes) is %s in %s, %s",
937+
sequence, totalLength, actionName.c_str(), interval.c_str(), extra.c_str());
903938

904939
if (!oldest_sequence)
905940
segment->remove();
@@ -921,8 +956,8 @@ namespace
921956
break;
922957

923958
target->verbose("Deleting segment %" UQUADFORMAT " as no longer needed", sequence);
924-
925959
segment->remove();
960+
926961
} while (pos < queue.getCount());
927962
}
928963
}

0 commit comments

Comments
 (0)