Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 59 additions & 24 deletions src/remote/server/ReplServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,17 +586,20 @@ namespace
return true;
}

enum ActionType { REPLICATE, REPLAY, FAST_FORWARD };

void replicate(Target* target,
TransactionList& transactions,
FB_UINT64 sequence, ULONG offset,
ULONG length, const UCHAR* data,
bool rewind)
ActionType action)
{
const Block* const header = (Block*) data;

const auto traNumber = header->traNumber;

if (!rewind || !traNumber || transactions.exist(traNumber))
if (action == REPLICATE ||
(action == REPLAY && (!traNumber || transactions.exist(traNumber))))
{
target->replicate(sequence, offset, length, data);
}
Expand All @@ -609,7 +612,7 @@ namespace
if (transactions.find(traNumber, pos))
transactions.remove(pos);
}
else if (!rewind)
else if (action != REPLAY)
{
transactions.clear();
}
Expand All @@ -618,7 +621,7 @@ namespace
{
fb_assert(traNumber);

if (!rewind && !transactions.exist(traNumber))
if (action != REPLAY && !transactions.exist(traNumber))
transactions.add(ActiveTransaction(traNumber, sequence));
}
}
Expand Down Expand Up @@ -751,6 +754,7 @@ namespace
const FB_UINT64 max_sequence = queue.back()->header.hdr_sequence;
FB_UINT64 next_sequence = 0;
const bool restart = target->isShutdown();
auto action = REPLICATE;

for (auto segment : queue)
{
Expand All @@ -768,21 +772,48 @@ namespace
const FB_UINT64 db_sequence = target->initReplica();
const FB_UINT64 last_db_sequence = control.getDbSequence();

if (sequence <= db_sequence)
{
target->verbose("Deleting segment %" UQUADFORMAT " due to fast forward", sequence);
segment->remove();
continue;
}

if (db_sequence != last_db_sequence)
{
target->verbose("Resetting replication to continue from segment %" UQUADFORMAT, db_sequence + 1);
control.saveDbSequence(db_sequence);
transactions.clear();
control.saveComplete(db_sequence, transactions);
last_sequence = db_sequence;
last_offset = 0;
if (sequence == db_sequence + 1)
{
if (const auto oldest = findOldest(transactions))
{
const TraNumber oldest_trans = oldest->tra_id;
const FB_UINT64 oldest_sequence = oldest ? oldest->sequence : 0;
target->verbose("Resetting replication to continue from segment %" UQUADFORMAT
" (new OAT: %" UQUADFORMAT " in segment %" UQUADFORMAT ")",
db_sequence + 1, oldest_trans, oldest_sequence);
}
else
{
target->verbose("Resetting replication to continue from segment %" UQUADFORMAT,
db_sequence + 1);
}

control.saveDbSequence(db_sequence);
return PROCESS_SHUTDOWN; // this enforces restart from OAT
}

if (action != FAST_FORWARD)
{
if (segment != queue.front())
{
fb_assert(false);
return PROCESS_SHUTDOWN;
}

if (db_sequence > max_sequence)
{
target->verbose("Database sequence has been changed to %" UQUADFORMAT
", waiting for appropriate segment", db_sequence);
return PROCESS_SUSPEND;
}

target->verbose("Database sequence has been changed to %" UQUADFORMAT
", preparing for replication reset", db_sequence);

action = FAST_FORWARD;
}
}

// If no new segments appeared since our last attempt,
Expand Down Expand Up @@ -857,17 +888,18 @@ namespace

if (blockLength)
{
const bool rewind = (sequence < last_sequence ||
const bool replay = (sequence < last_sequence ||
(sequence == last_sequence && (!last_offset || totalLength < last_offset)));
if (action != FAST_FORWARD)
action = replay ? REPLAY : REPLICATE;

UCHAR* const data = buffer.getBuffer(length);
memcpy(data, &header, sizeof(Block));

if (read(file, data + sizeof(Block), blockLength) != blockLength)
raiseError("Journal file %s read failed (error %d)", segment->filename.c_str(), ERRNO);

replicate(target, transactions, sequence, totalLength,
length, data, rewind);
replicate(target, transactions, sequence, totalLength, length, data, action);
}

totalLength += length;
Expand All @@ -886,7 +918,10 @@ namespace
oldest_sequence = oldest ? oldest->sequence : 0;
next_sequence = sequence + 1;

string extra;
string actionName, extra;
actionName = (action == FAST_FORWARD) ? "scanned" :
(action == REPLAY) ? "replayed" : "replicated";

if (oldest)
{
const TraNumber oldest_trans = oldest->tra_id;
Expand All @@ -898,8 +933,8 @@ namespace
extra = "deleting";
}

target->verbose("Segment %" UQUADFORMAT " (%u bytes) is replicated in %s, %s",
sequence, totalLength, interval.c_str(), extra.c_str());
target->verbose("Segment %" UQUADFORMAT " (%u bytes) is %s in %s, %s",
sequence, totalLength, actionName.c_str(), interval.c_str(), extra.c_str());

if (!oldest_sequence)
segment->remove();
Expand All @@ -921,8 +956,8 @@ namespace
break;

target->verbose("Deleting segment %" UQUADFORMAT " as no longer needed", sequence);

segment->remove();

} while (pos < queue.getCount());
}
}
Expand Down
Loading