Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/bucket/BucketSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,11 @@ LiveBucketSnapshot::scanForEntriesOfType(
return Loop::INCOMPLETE;
}

auto& stream = getStream();
// We open a stream locally here because otherwise we might race with
// other methods accessing the shared stream while populating in-memory
// Soroban state
XDRInputFileStream stream;
stream.open(mBucket->getFilename().string());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems suspicious: a race on accessing the stream suggests we're using the same stream for eviction scanning and state population, which doesn't sound right.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eviction scan also opens its own stream

// Open new stream for eviction scan to not interfere with BucketListDB load
// streams
XDRInputFileStream stream{};
stream.open(mBucket->getFilename());

I believe this races with getEntryAtOffset.

stream.seek(range->first);

BucketEntry be;
Expand Down
30 changes: 29 additions & 1 deletion src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class TempLedgerVersionSetter : NonMovableOrCopyable
ApplyBucketsWork::ApplyBucketsWork(
Application& app,
std::map<std::string, std::shared_ptr<LiveBucket>> const& buckets,
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion)
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion,
bool waitForLedgerManager)
: Work(app, "apply-buckets", BasicWork::RETRY_NEVER)
, mBuckets(buckets)
, mApplyState(applyState)
Expand All @@ -64,6 +65,8 @@ ApplyBucketsWork::ApplyBucketsWork(
, mCounters(app.getClock().now())
, mIsApplyInvariantEnabled(
app.getInvariantManager().isBucketApplyInvariantEnabled())
, mWaitForLedgerManager{waitForLedgerManager}
, mLedgerManagerReadyTimer(app)
{
}

Expand Down Expand Up @@ -179,6 +182,25 @@ ApplyBucketsWork::prepareForNextBucket()
}
}

// Arms mLedgerManagerReadyTimer to fire one second from now. When it fires,
// LedgerManager's state is re-checked: while it is still LM_BOOTING_STATE the
// timer is re-armed, otherwise this work is woken up. Timer failure is routed
// to VirtualTimer::onFailureNoop.
void
ApplyBucketsWork::waitForLedgerManager()
{
    auto const recheckLedgerManager = [this] {
        bool const stillBooting = mApp.getLedgerManager().getState() ==
                                  LedgerManager::LM_BOOTING_STATE;
        if (!stillBooting)
        {
            wakeUp();
            return;
        }
        // Not ready yet: schedule another check.
        waitForLedgerManager();
    };

    mLedgerManagerReadyTimer.expires_from_now(std::chrono::seconds{1});
    mLedgerManagerReadyTimer.async_wait(recheckLedgerManager,
                                        &VirtualTimer::onFailureNoop);
}

// We iterate through the live BucketList either in-order (level 0 curr, level 0
// snap, level 1 curr, etc). We keep track of the keys we have already
// seen, and only apply an entry to the DB if it has not been seen before. This
Expand Down Expand Up @@ -216,6 +238,12 @@ ApplyBucketsWork::doWork()

if (!mAssumeStateWork)
{
if (mWaitForLedgerManager && mApp.getLedgerManager().getState() ==
LedgerManager::LM_BOOTING_STATE)
{
waitForLedgerManager();
return BasicWork::State::WORK_WAITING;
}
// Step 2: apply buckets.
auto isCurr = mBucketToApplyIndex % 2 == 0;
if (mBucketApplicator)
Expand Down
7 changes: 6 additions & 1 deletion src/catchup/ApplyBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ class ApplyBucketsWork : public Work
void startBucket();
void prepareForNextBucket();

bool const mWaitForLedgerManager;
VirtualTimer mLedgerManagerReadyTimer;
void waitForLedgerManager();

public:
ApplyBucketsWork(
Application& app,
std::map<std::string, std::shared_ptr<LiveBucket>> const& buckets,
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion);
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion,
bool waitForLedgerManager = true);
~ApplyBucketsWork() = default;

std::string getStatus() const override;
Expand Down
25 changes: 25 additions & 0 deletions src/catchup/ApplyLedgerWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,38 @@ ApplyLedgerWork::ApplyLedgerWork(Application& app,
app, "apply-ledger-" + std::to_string(ledgerCloseData.getLedgerSeq()),
BasicWork::RETRY_NEVER)
, mLedgerCloseData(ledgerCloseData)
, mLedgerManagerReadyTimer(app)
{
}

// Schedules a check of LedgerManager's state one second from now. If the
// state is anything other than LM_BOOTING_STATE when the timer fires, this
// work is woken up; otherwise the check is scheduled again. Timer failure is
// routed to VirtualTimer::onFailureNoop.
void
ApplyLedgerWork::waitForLedgerManager()
{
    auto const onTimerFired = [this] {
        auto const lmState = mApp.getLedgerManager().getState();
        if (lmState != LedgerManager::LM_BOOTING_STATE)
        {
            wakeUp();
        }
        else
        {
            // Still booting: poll again.
            waitForLedgerManager();
        }
    };

    mLedgerManagerReadyTimer.expires_from_now(std::chrono::seconds{1});
    mLedgerManagerReadyTimer.async_wait(onTimerFired,
                                        &VirtualTimer::onFailureNoop);
}

BasicWork::State
ApplyLedgerWork::onRun()
{
ZoneScoped;
if (mApp.getLedgerManager().getState() == LedgerManager::LM_BOOTING_STATE)
{
waitForLedgerManager();
return BasicWork::State::WORK_WAITING;
}
mApp.getLedgerManager().applyLedger(mLedgerCloseData,
/* externalize */ false);
return BasicWork::State::WORK_SUCCESS;
Expand Down
2 changes: 2 additions & 0 deletions src/catchup/ApplyLedgerWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace stellar
class ApplyLedgerWork : public BasicWork
{
LedgerCloseData const mLedgerCloseData;
VirtualTimer mLedgerManagerReadyTimer;
void waitForLedgerManager();

public:
ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData);
Expand Down
3 changes: 2 additions & 1 deletion src/catchup/LedgerApplyManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class LedgerApplyManager
enum class ProcessLedgerResult
{
PROCESSED_ALL_LEDGERS_SEQUENTIALLY,
WAIT_TO_APPLY_BUFFERED_OR_CATCHUP
WAIT_TO_APPLY_BUFFERED_OR_CATCHUP,
WAIT_FOR_STATE_REBUILD
};
static std::unique_ptr<LedgerApplyManager> create(Application& app);

Expand Down
20 changes: 15 additions & 5 deletions src/catchup/LedgerApplyManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,24 @@ LedgerApplyManagerImpl::processLedger(LedgerCloseData const& ledgerData,
mLargestLedgerSeqHeard =
std::max(mLargestLedgerSeqHeard, lastReceivedLedgerSeq);

if (mApp.getLedgerManager().getState() == LedgerManager::LM_BOOTING_STATE)
{
CLOG_INFO(Ledger,
"LedgerManager still booting: close of ledger {} buffered. "
"mSyncingLedgers has {} ledgers",
ledgerData.getLedgerSeq(), mSyncingLedgers.size());
return ProcessLedgerResult::WAIT_FOR_STATE_REBUILD;
}

// 1. CatchupWork is not running yet
// 2. LedgerApplyManager received ledger that should be immediately applied
// by LedgerManager: check if we have any sequential ledgers. If so, attempt
// to apply mSyncingLedgers and possibly get back in sync
if (!mCatchupWork && lastReceivedLedgerSeq == *mLastQueuedToApply + 1)
if (!mCatchupWork)
{
tryApplySyncingLedgers();
return ProcessLedgerResult::PROCESSED_ALL_LEDGERS_SEQUENTIALLY;
// 2. We successfully applied all of mSyncingLedgers
if (mSyncingLedgers.empty())
{
return ProcessLedgerResult::PROCESSED_ALL_LEDGERS_SEQUENTIALLY;
}
}

// For the rest of this method: we know LCL has fallen behind the network
Expand Down
10 changes: 7 additions & 3 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,13 @@ HerderImpl::bootstrap()
mLedgerManager.moveToSynced();
mHerderSCPDriver.bootstrap();

setupTriggerNextLedger();
newSlotExternalized(
true, mLedgerManager.getLastClosedLedgerHeader().header.scpValue);
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
if (trackingConsensusLedgerIndex() == lcl.header.ledgerSeq)
{
setupTriggerNextLedger();
newSlotExternalized(
true, mLedgerManager.getLastClosedLedgerHeader().header.scpValue);
}
}

void
Expand Down
17 changes: 14 additions & 3 deletions src/ledger/LedgerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,11 @@ class LedgerManager
// Loading state from database, not yet active
LM_BOOTING_STATE,

// Finished loading state from database, active
LM_BOOTED_STATE,

// local state is in sync with view of consensus coming from herder
// desynchronization will cause transition to LM_BOOTING_STATE.
// desynchronization will cause transition to LM_CATCHING_UP_STATE
LM_SYNCED_STATE,

// local state doesn't match view of consensus from herder
Expand Down Expand Up @@ -297,8 +300,16 @@ class LedgerManager
// Called by application lifecycle events, system startup.
virtual void startNewLedger() = 0;

// loads the last ledger information from the database
virtual void loadLastKnownLedger() = 0;
// loads the last ledger information from the database with the following
// parameters:
// * callback: function to call on main thread once ledger load has
// completed (in particular, Soroban in-memory state is loaded). Note
// that this is always posted as a task to the main thread.
// * asyncPopulateInMemoryState: whether or not to populate in-memory
// Soroban state asynchronously.
virtual void
loadLastKnownLedger(std::optional<std::function<void()>> callback,
bool asyncPopulateInMemoryState = false) = 0;

// Helper for a faster load of the last closed ledger used only for various
// diagnostics utils outside of the main application flow. This skips
Expand Down
Loading
Loading