Skip to content

Commit eba663a

Browse files
Test fixes and code hardening in preparation for parallel features (#5022)
As we're planning to enable various parallel features by default, this PR cleans up some test paths and hardens the code. Concrete changes are:
- Crank the clock in tests to support the async apply flow.
- Harden shutdown paths: avoid calling the LedgerManager (LM) in the App destructor, because if BucketManager (BM) initialization throws, the LM is null.
- Don't apply ledgers in parallel in standalone mode, as it expects an immediate result.
- Fix tests to work properly with Postgres (e.g. make timestamps valid to avoid Postgres complaining).
2 parents 196bd37 + 7f1ec5a commit eba663a

File tree

12 files changed

+64
-34
lines changed

12 files changed

+64
-34
lines changed

src/bucket/test/BucketTestUtils.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,10 @@ closeLedger(Application& app, std::optional<SecretKey> skToSignValue,
102102
app.getHerder().externalizeValue(TxSetXDRFrame::makeEmpty(lcl), ledgerNum,
103103
lcl.header.scpValue.closeTime, upgrades,
104104
skToSignValue);
105-
// NB: this assert will probably stop being true when background apply is
106-
// turned on by default: externalize will have handed the ledger off to
107-
// apply but not yet received the results of apply or updated LCL. The fix
108-
// should be just to crank here until LCL advances to ledgerSeq.
105+
while (lm.getLastClosedLedgerNum() < ledgerNum)
106+
{
107+
app.getClock().crank(true);
108+
}
109109
releaseAssert(lm.getLastClosedLedgerNum() == ledgerNum);
110110
return lm.getLastClosedLedgerHeader().hash;
111111
}
@@ -235,11 +235,13 @@ LedgerManagerForBucketTests::finalizeLedgerTxnChanges(
235235

236236
LedgerTxn ltxEvictions(ltx);
237237

238+
auto sorobanConfig =
239+
SorobanNetworkConfig::loadFromLedger(ltxEvictions);
240+
238241
auto evictedState =
239242
mApp.getBucketManager().resolveBackgroundEvictionScan(
240243
ltxEvictions, lh.ledgerSeq, keys, initialLedgerVers,
241-
mApp.getLedgerManager()
242-
.getLastClosedSorobanNetworkConfig());
244+
sorobanConfig);
243245
if (protocolVersionStartsFrom(
244246
initialLedgerVers,
245247
LiveBucket::

src/herder/HerderImpl.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,10 @@ HerderImpl::externalizeValue(TxSetXDRFrameConstPtr txSet, uint32_t ledgerSeq,
936936
StellarValue sv =
937937
makeStellarValue(txSet->getContentsHash(), closeTime, upgrades, sk);
938938
getHerderSCPDriver().valueExternalized(ledgerSeq, xdr::xdr_to_opaque(sv));
939+
while (mApp.getLedgerManager().getLastClosedLedgerNum() < ledgerSeq)
940+
{
941+
mApp.getClock().crank(true);
942+
}
939943
}
940944

941945
bool

src/ledger/LedgerManagerImpl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ LedgerManagerImpl::ApplyState::getSorobanInMemoryStateSizeForTesting() const
267267
void
268268
LedgerManagerImpl::ApplyState::threadInvariant() const
269269
{
270-
if (mAppConnector.getConfig().EXPERIMENTAL_PARALLEL_LEDGER_APPLY)
270+
if (mAppConnector.getConfig().parallelLedgerClose())
271271
{
272272
releaseAssert(threadIsMain() || mAppConnector.threadIsType(
273273
Application::ThreadType::APPLY));

src/main/ApplicationImpl.cpp

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ ApplicationImpl::~ApplicationImpl()
633633
mStopping = true;
634634
try
635635
{
636-
idempotentShutdown();
636+
idempotentShutdown(false);
637637
}
638638
catch (std::exception const& e)
639639
{
@@ -821,7 +821,7 @@ ApplicationImpl::start()
821821
}
822822

823823
void
824-
ApplicationImpl::idempotentShutdown()
824+
ApplicationImpl::idempotentShutdown(bool forgetBuckets)
825825
{
826826
// Graceful shutdown sequence:
827827
// Perform a graceful shutdown by first signaling all managers to stop
@@ -850,11 +850,14 @@ ApplicationImpl::idempotentShutdown()
850850
}
851851
if (mBucketManager)
852852
{
853-
// This call happens in shutdown -- before destruction -- so that we can
854-
// be sure other subsystems (ledger etc.) are still alive and we can
855-
// call into them to figure out which buckets _are_ referenced.
856-
mBucketManager->forgetUnreferencedBuckets(
857-
mLedgerManager->getLastClosedLedgerHAS());
853+
if (forgetBuckets)
854+
{
855+
// This call happens in shutdown -- before destruction -- so that we
856+
// can be sure other subsystems (ledger etc.) are still alive and we
857+
// can call into them to figure out which buckets _are_ referenced.
858+
mBucketManager->forgetUnreferencedBuckets(
859+
mLedgerManager->getLastClosedLedgerHAS());
860+
}
858861
mBucketManager->shutdown();
859862
}
860863
if (mHerder)
@@ -875,7 +878,7 @@ ApplicationImpl::gracefulStop()
875878
return;
876879
}
877880
mStopping = true;
878-
idempotentShutdown();
881+
idempotentShutdown(true);
879882

880883
mStoppingTimer.expires_from_now(
881884
std::chrono::seconds(SHUTDOWN_DELAY_SECONDS));
@@ -900,7 +903,7 @@ ApplicationImpl::shutdownWorkScheduler()
900903
}
901904
}
902905

903-
void
906+
bool
904907
ApplicationImpl::shutdownThread(
905908
std::unique_ptr<std::thread>& threadPtr,
906909
std::unique_ptr<asio::io_context::work>& workPtr,
@@ -919,23 +922,29 @@ ApplicationImpl::shutdownThread(
919922
LOG_INFO(DEFAULT_LOG, "Joining {} thread", threadName);
920923
threadPtr->join();
921924
threadPtr.reset();
925+
return true;
922926
}
927+
return false;
923928
}
924929

925930
void
926931
ApplicationImpl::joinAllThreads()
927932
{
928-
uint32_t const THREAD_COUNT = 3 + mWorkerThreads.size();
929-
shutdownThread(mLedgerCloseThread, mLedgerCloseWork, "ledger close");
933+
uint32_t joined = 0;
934+
joined +=
935+
shutdownThread(mLedgerCloseThread, mLedgerCloseWork, "ledger close");
930936
for (auto& w : mWorkerThreads)
931937
{
932-
shutdownThread(w, mWork, "worker");
938+
joined += shutdownThread(w, mWork, "worker");
933939
}
934940
mWorkerThreads.clear();
935941

936-
shutdownThread(mOverlayThread, mOverlayWork, "overlay");
937-
shutdownThread(mEvictionThread, mEvictionWork, "eviction");
938-
LOG_INFO(DEFAULT_LOG, "Joined all {} threads", THREAD_COUNT);
942+
joined += shutdownThread(mOverlayThread, mOverlayWork, "overlay");
943+
joined += shutdownThread(mEvictionThread, mEvictionWork, "eviction");
944+
if (joined)
945+
{
946+
LOG_INFO(DEFAULT_LOG, "Joined all {} threads", joined);
947+
}
939948
}
940949

941950
std::string

src/main/ApplicationImpl.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,10 @@ class ApplicationImpl : public Application
292292
void upgradeToCurrentSchemaAndMaybeRebuildLedger(bool applyBuckets,
293293
bool forceRebuild);
294294

295-
void idempotentShutdown();
296-
void shutdownThread(std::unique_ptr<std::thread>& threadPtr,
295+
// Set `forgetBuckets` to true to clean up unreferenced buckets
296+
// Note: this flag requires LM and BM to be fully constructed
297+
void idempotentShutdown(bool forgetBuckets);
298+
bool shutdownThread(std::unique_ptr<std::thread>& threadPtr,
297299
std::unique_ptr<asio::io_context::work>& workPtr,
298300
std::string const& threadName);
299301
};

src/main/Config.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1916,7 +1916,8 @@ Config::processConfig(std::shared_ptr<cpptoml::table> t)
19161916
{
19171917
std::string msg =
19181918
"Invalid configuration: EXPERIMENTAL_PARALLEL_LEDGER_APPLY "
1919-
"does not support SQLite. Either switch to Postgres or set "
1919+
"does not support SQLite or RUN_STANDALONE mode. Either switch "
1920+
"to Postgres or set "
19201921
"EXPERIMENTAL_PARALLEL_LEDGER_APPLY=false";
19211922
throw std::runtime_error(msg);
19221923
}
@@ -2479,8 +2480,10 @@ Config::allBucketsInMemory() const
24792480
bool
24802481
Config::parallelLedgerClose() const
24812482
{
2483+
// Standalone mode expects synchronous ledger application
24822484
return EXPERIMENTAL_PARALLEL_LEDGER_APPLY &&
2483-
!(DATABASE.value.find("sqlite3://") != std::string::npos);
2485+
!(DATABASE.value.find("sqlite3://") != std::string::npos) &&
2486+
!RUN_STANDALONE;
24842487
}
24852488

24862489
void

src/main/test/CommandHandlerTests.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ TEST_CASE("manualclose", "[commandhandler]")
198198
auto& commandHandler = app->getCommandHandler();
199199
std::string retStr;
200200
issue(commandHandler, retStr);
201+
app->gracefulStop();
201202
};
202203

203204
SECTION("'manualclose' is forbidden if MANUAL_CLOSE is not configured")

src/overlay/test/OverlayTests.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2253,8 +2253,10 @@ TEST_CASE("database is purged at overlay start", "[overlay]")
22532253
auto app = createTestApplication(clock, cfg, true, false);
22542254
auto& om = app->getOverlayManager();
22552255
auto& peerManager = om.getPeerManager();
2256-
auto record = [](size_t numFailures) {
2257-
return PeerRecord{{}, numFailures, static_cast<int>(PeerType::INBOUND)};
2256+
auto record = [app](size_t numFailures) {
2257+
return PeerRecord{
2258+
VirtualClock::systemPointToTm(app->getClock().system_now()),
2259+
numFailures, static_cast<int>(PeerType::INBOUND)};
22582260
};
22592261

22602262
// Need to set max tx size on tests that start OverlayManager without

src/overlay/test/PeerManagerTests.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,10 @@ TEST_CASE("purge peer table", "[overlay][PeerManager]")
513513
VirtualClock clock;
514514
auto app = createTestApplication(clock, getTestConfig());
515515
auto& peerManager = app->getOverlayManager().getPeerManager();
516-
auto record = [](size_t numFailures) {
517-
return PeerRecord{{}, numFailures, static_cast<int>(PeerType::INBOUND)};
516+
auto record = [&app](size_t numFailures) {
517+
return PeerRecord{
518+
VirtualClock::systemPointToTm(app->getClock().system_now()),
519+
numFailures, static_cast<int>(PeerType::INBOUND)};
518520
};
519521

520522
peerManager.store(localhost(1), record(1), false);

src/test/TxTests.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -594,10 +594,10 @@ closeLedgerOn(Application& app, uint32 ledgerSeq, TimePoint closeTime,
594594
}
595595
app.getHerder().externalizeValue(txSet.first, ledgerSeq, closeTime,
596596
upgrades);
597-
// NB: this assert will probably stop being true when background apply is
598-
// turned on by default: externalize will have handed the ledger off to
599-
// apply but not yet received the results of apply or updated LCL. The fix
600-
// should be just to crank here until LCL advances to ledgerSeq.
597+
while (app.getLedgerManager().getLastClosedLedgerNum() < ledgerSeq)
598+
{
599+
app.getClock().crank(true);
600+
}
601601
releaseAssert(app.getLedgerManager().getLastClosedLedgerNum() == ledgerSeq);
602602
auto& lm = static_cast<LedgerManagerImpl&>(app.getLedgerManager());
603603
return lm.mLatestTxResultSet;

0 commit comments

Comments
 (0)