Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 57 additions & 31 deletions src/history/CheckpointBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,22 @@
#include "history/HistoryManager.h"
#include "ledger/LedgerManager.h"
#include "main/Application.h"
#include "main/ErrorMessages.h"
#include "util/XDRStream.h"

namespace stellar
{
bool
void
CheckpointBuilder::ensureOpen(uint32_t ledgerSeq)
{
ZoneScoped;
releaseAssert(mApp.getHistoryArchiveManager().publishEnabled());
mSkipFirstCheckpointSinceItIsIncomplete = false;
if (!mOpen)
{
releaseAssert(!mTxResults);
releaseAssert(!mTxs);
releaseAssert(!mLedgerHeaders);
// Don't start writing checkpoint until proper checkpoint boundary
// This can occur if a node enabled publish mid-checkpoint
if (mPublishWasDisabled && !HistoryManager::isFirstLedgerInCheckpoint(
ledgerSeq, mApp.getConfig()))
{
return false;
}

mPublishWasDisabled = false;

auto checkpoint = HistoryManager::checkpointContainingLedger(
ledgerSeq, mApp.getConfig());
Expand All @@ -50,7 +43,6 @@ CheckpointBuilder::ensureOpen(uint32_t ledgerSeq)
mLedgerHeaders->open(ledger.localPath_nogz_dirty());
mOpen = true;
}
return true;
}

void
Expand Down Expand Up @@ -135,11 +127,15 @@ CheckpointBuilder::appendTransactionSet(uint32_t ledgerSeq,
throw std::runtime_error("Startup validation not performed");
}

if (!ensureOpen(ledgerSeq))
// Skip incomplete checkpoint if we're not on the checkpoint boundary
if (skipIncompleteFirstCheckpointSinceRestart() &&
!HistoryManager::isFirstLedgerInCheckpoint(ledgerSeq, mApp.getConfig()))
{
return;
}

ensureOpen(ledgerSeq);

if (!resultSet.results.empty())
{
TransactionHistoryResultEntry results;
Expand All @@ -162,11 +158,16 @@ CheckpointBuilder::appendLedgerHeader(LedgerHeader const& header,
throw std::runtime_error("Startup validation not performed");
}

if (!ensureOpen(header.ledgerSeq))
// Skip incomplete checkpoint if we're not on the checkpoint boundary
if (skipIncompleteFirstCheckpointSinceRestart() &&
!HistoryManager::isFirstLedgerInCheckpoint(header.ledgerSeq,
mApp.getConfig()))
{
return;
}

ensureOpen(header.ledgerSeq);

LedgerHeaderHistoryEntry lhe;
lhe.header = header;
lhe.hash = xdrSha256(header);
Expand Down Expand Up @@ -239,7 +240,7 @@ CheckpointBuilder::cleanup(uint32_t lcl)
fmt::format("Failed to delete next checkpoint file {}",
next.localPath_nogz_dirty()));
}
return;
return true;
}

if (!fs::exists(ft.localPath_nogz_dirty()))
Expand All @@ -248,8 +249,7 @@ CheckpointBuilder::cleanup(uint32_t lcl)
"Skipping recovery of file {}, does not exist. This can "
"occur if publish was previously disabled.",
ft.localPath_nogz_dirty());
mPublishWasDisabled = true;
return;
return false;
}

// Find a tmp file; any potentially invalid files _must_ be tmp files,
Expand All @@ -263,32 +263,24 @@ CheckpointBuilder::cleanup(uint32_t lcl)
out.open(tmpFile.string());
XDRInputFileStream in;
in.open(ft.localPath_nogz_dirty());
uint32_t lastReadLedgerSeq = 0;
uint32_t lastWrittenLedgerSeq = 0;
while (in)
{
try
{
if (!in.readOne(entry))
{
// If file doesn't end on LCL, it's corrupt
if (enforceLCL && lastReadLedgerSeq != lcl)
{
throw std::runtime_error(
fmt::format("Corrupt checkpoint file {}, ends "
"on ledger {}, LCL is {}",
ft.localPath_nogz_dirty(),
getLedgerSeq(entry), lcl));
}
break;
}
lastReadLedgerSeq = getLedgerSeq(entry);
if (lastReadLedgerSeq > lcl)
auto seq = getLedgerSeq(entry);
if (seq > lcl)
{
CLOG_INFO(History, "Truncating {} at ledger {}",
ft.localPath_nogz_dirty(), lcl);
break;
}
out.durableWriteOne(entry);
lastWrittenLedgerSeq = seq;
}
catch (xdr::xdr_runtime_error const& e)
{
Expand All @@ -300,6 +292,16 @@ CheckpointBuilder::cleanup(uint32_t lcl)
break;
}
}

// Once we finished truncating, check that we ended on LCL.
// If file doesn't end on LCL, it's corrupt
if (enforceLCL && lastWrittenLedgerSeq != lcl)
{
throw std::runtime_error(fmt::format(
"Corrupt checkpoint file {}, ends "
"on ledger {}, LCL is {}",
ft.localPath_nogz_dirty(), lastWrittenLedgerSeq, lcl));
}
}

if (!fs::durableRename(
Expand All @@ -308,13 +310,37 @@ CheckpointBuilder::cleanup(uint32_t lcl)
{
throw std::runtime_error("Failed to rename checkpoint file");
}

return true;
};

// We can only require ledger header to be at LCL; transactions and results
// can have gaps (if there were empty ledgers)
recover(res, TransactionHistoryResultEntry{}, /* enforceLCL */ false);
recover(txs, TransactionHistoryEntry{}, /* enforceLCL */ false);
recover(ledger, LedgerHeaderHistoryEntry{}, /* enforceLCL */ true);
auto resExists =
recover(res, TransactionHistoryResultEntry{}, /* enforceLCL */ false);
auto txExists =
recover(txs, TransactionHistoryEntry{}, /* enforceLCL */ false);
auto headerExists =
recover(ledger, LedgerHeaderHistoryEntry{}, /* enforceLCL */ true);

if (!resExists && !txExists && !headerExists)
{
mSkipFirstCheckpointSinceItIsIncomplete = true;
CLOG_INFO(History, "No checkpoint files found during recovery, likely "
"publish was previously disabled");
}
else if (!(resExists && txExists && headerExists))
{
std::string errMsg =
fmt::format("Some checkpoint files were not found during recovery, "
"results={}, transactions={}, headers={}.",
resExists, txExists, headerExists);
CLOG_ERROR(History, "{}", errMsg);
CLOG_ERROR(History, "Delete incomplete checkpoint files and restart.");
CLOG_ERROR(History, "{}", REPORT_INTERNAL_BUG);
throw std::runtime_error(errMsg);
}

mStartupValidationComplete = true;
}
}
12 changes: 10 additions & 2 deletions src/history/CheckpointBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ class CheckpointBuilder
std::unique_ptr<XDROutputFileStream> mLedgerHeaders;
bool mOpen{false};
bool mStartupValidationComplete{false};
bool mPublishWasDisabled{false};
// Skip building this checkpoint if it can't be completed.
// This may happen if a node enabled publishing mid-checkpoint.
bool mSkipFirstCheckpointSinceItIsIncomplete{false};

bool ensureOpen(uint32_t ledgerSeq);
void ensureOpen(uint32_t ledgerSeq);

public:
CheckpointBuilder(Application& app);
Expand All @@ -82,5 +84,11 @@ class CheckpointBuilder
// Finalize checkpoint by renaming all temporary files to their canonical
// names. No-op if files are already rotated.
void checkpointComplete(uint32_t checkpoint);

// Read-only accessor for mSkipFirstCheckpointSinceItIsIncomplete.
// Returns true while the builder is flagged to skip the first checkpoint
// because it cannot be completed; returns false otherwise.
bool
skipIncompleteFirstCheckpointSinceRestart() const
{
    bool const skipFirstCheckpoint = mSkipFirstCheckpointSinceItIsIncomplete;
    return skipFirstCheckpoint;
}
};
}
8 changes: 8 additions & 0 deletions src/history/HistoryManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ HistoryManagerImpl::maybeQueueHistoryCheckpoint(uint32_t lcl,
return false;
}

if (mCheckpointBuilder.skipIncompleteFirstCheckpointSinceRestart())
{
CLOG_INFO(
History,
"Skipping incomplete checkpoint, publish was previously disabled");
return false;
}

queueCurrentHistory(lcl, ledgerVers);
return true;
}
Expand Down
Loading