Skip to content

Commit 15acb83

Browse files
committed
Minor improvements to publishing
1 parent 73cbad7 commit 15acb83

File tree

3 files changed

+69
-32
lines changed

3 files changed

+69
-32
lines changed

src/history/CheckpointBuilder.cpp

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,22 @@
33
#include "history/HistoryManager.h"
44
#include "ledger/LedgerManager.h"
55
#include "main/Application.h"
6+
#include "main/ErrorMessages.h"
67
#include "util/XDRStream.h"
78

89
namespace stellar
910
{
10-
bool
11+
void
1112
CheckpointBuilder::ensureOpen(uint32_t ledgerSeq)
1213
{
1314
ZoneScoped;
1415
releaseAssert(mApp.getHistoryArchiveManager().publishEnabled());
16+
mPublishWasDisabled = false;
1517
if (!mOpen)
1618
{
1719
releaseAssert(!mTxResults);
1820
releaseAssert(!mTxs);
1921
releaseAssert(!mLedgerHeaders);
20-
// Don't start writing checkpoint until proper checkpoint boundary
21-
// This can occur if a node enabled publish mid-checkpoint
22-
if (mPublishWasDisabled && !HistoryManager::isFirstLedgerInCheckpoint(
23-
ledgerSeq, mApp.getConfig()))
24-
{
25-
return false;
26-
}
27-
28-
mPublishWasDisabled = false;
2922

3023
auto checkpoint = HistoryManager::checkpointContainingLedger(
3124
ledgerSeq, mApp.getConfig());
@@ -50,7 +43,6 @@ CheckpointBuilder::ensureOpen(uint32_t ledgerSeq)
5043
mLedgerHeaders->open(ledger.localPath_nogz_dirty());
5144
mOpen = true;
5245
}
53-
return true;
5446
}
5547

5648
void
@@ -135,11 +127,14 @@ CheckpointBuilder::appendTransactionSet(uint32_t ledgerSeq,
135127
throw std::runtime_error("Startup validation not performed");
136128
}
137129

138-
if (!ensureOpen(ledgerSeq))
130+
if (publishWasDisabled() &&
131+
!HistoryManager::isFirstLedgerInCheckpoint(ledgerSeq, mApp.getConfig()))
139132
{
140133
return;
141134
}
142135

136+
ensureOpen(ledgerSeq);
137+
143138
if (!resultSet.results.empty())
144139
{
145140
TransactionHistoryResultEntry results;
@@ -162,11 +157,14 @@ CheckpointBuilder::appendLedgerHeader(LedgerHeader const& header,
162157
throw std::runtime_error("Startup validation not performed");
163158
}
164159

165-
if (!ensureOpen(header.ledgerSeq))
160+
if (publishWasDisabled() && !HistoryManager::isFirstLedgerInCheckpoint(
161+
header.ledgerSeq, mApp.getConfig()))
166162
{
167163
return;
168164
}
169165

166+
ensureOpen(header.ledgerSeq);
167+
170168
LedgerHeaderHistoryEntry lhe;
171169
lhe.header = header;
172170
lhe.hash = xdrSha256(header);
@@ -239,7 +237,7 @@ CheckpointBuilder::cleanup(uint32_t lcl)
239237
fmt::format("Failed to delete next checkpoint file {}",
240238
next.localPath_nogz_dirty()));
241239
}
242-
return;
240+
return true;
243241
}
244242

245243
if (!fs::exists(ft.localPath_nogz_dirty()))
@@ -248,8 +246,7 @@ CheckpointBuilder::cleanup(uint32_t lcl)
248246
"Skipping recovery of file {}, does not exist. This can "
249247
"occur if publish was previously disabled.",
250248
ft.localPath_nogz_dirty());
251-
mPublishWasDisabled = true;
252-
return;
249+
return false;
253250
}
254251

255252
// Find a tmp file; any potentially invalid files _must_ be tmp files,
@@ -263,32 +260,24 @@ CheckpointBuilder::cleanup(uint32_t lcl)
263260
out.open(tmpFile.string());
264261
XDRInputFileStream in;
265262
in.open(ft.localPath_nogz_dirty());
266-
uint32_t lastReadLedgerSeq = 0;
263+
uint32_t lastWrittenLedgerSeq = 0;
267264
while (in)
268265
{
269266
try
270267
{
271268
if (!in.readOne(entry))
272269
{
273-
// If file doesn't end on LCL, it's corrupt
274-
if (enforceLCL && lastReadLedgerSeq != lcl)
275-
{
276-
throw std::runtime_error(
277-
fmt::format("Corrupt checkpoint file {}, ends "
278-
"on ledger {}, LCL is {}",
279-
ft.localPath_nogz_dirty(),
280-
getLedgerSeq(entry), lcl));
281-
}
282270
break;
283271
}
284-
lastReadLedgerSeq = getLedgerSeq(entry);
285-
if (lastReadLedgerSeq > lcl)
272+
auto seq = getLedgerSeq(entry);
273+
if (seq > lcl)
286274
{
287275
CLOG_INFO(History, "Truncating {} at ledger {}",
288276
ft.localPath_nogz_dirty(), lcl);
289277
break;
290278
}
291279
out.durableWriteOne(entry);
280+
lastWrittenLedgerSeq = seq;
292281
}
293282
catch (xdr::xdr_runtime_error const& e)
294283
{
@@ -300,6 +289,16 @@ CheckpointBuilder::cleanup(uint32_t lcl)
300289
break;
301290
}
302291
}
292+
293+
// Once we finished truncating, check that we ended on LCL.
294+
// If file doesn't end on LCL, it's corrupt
295+
if (enforceLCL && lastWrittenLedgerSeq != lcl)
296+
{
297+
throw std::runtime_error(fmt::format(
298+
"Corrupt checkpoint file {}, ends "
299+
"on ledger {}, LCL is {}",
300+
ft.localPath_nogz_dirty(), lastWrittenLedgerSeq, lcl));
301+
}
303302
}
304303

305304
if (!fs::durableRename(
@@ -308,13 +307,37 @@ CheckpointBuilder::cleanup(uint32_t lcl)
308307
{
309308
throw std::runtime_error("Failed to rename checkpoint file");
310309
}
310+
311+
return true;
311312
};
312313

313314
// We can only require ledger header to be at LCL; transactions and results
314315
// can have gaps (if there were empty ledgers)
315-
recover(res, TransactionHistoryResultEntry{}, /* enforceLCL */ false);
316-
recover(txs, TransactionHistoryEntry{}, /* enforceLCL */ false);
317-
recover(ledger, LedgerHeaderHistoryEntry{}, /* enforceLCL */ true);
316+
auto resExists =
317+
recover(res, TransactionHistoryResultEntry{}, /* enforceLCL */ false);
318+
auto txExists =
319+
recover(txs, TransactionHistoryEntry{}, /* enforceLCL */ false);
320+
auto headerExists =
321+
recover(ledger, LedgerHeaderHistoryEntry{}, /* enforceLCL */ true);
322+
323+
if (!resExists && !txExists && !headerExists)
324+
{
325+
mPublishWasDisabled = true;
326+
CLOG_INFO(History, "No checkpoint files found during recovery, likely "
327+
"publish was previously disabled");
328+
}
329+
else if (!(resExists && txExists && headerExists))
330+
{
331+
std::string errMsg =
332+
fmt::format("Some checkpoint files were not found during recovery, "
333+
"results={}, transactions={}, headers={}.",
334+
resExists, txExists, headerExists);
335+
CLOG_ERROR(History, "{}", errMsg);
336+
CLOG_ERROR(History, "Delete incomplete checkpoint files and restart.");
337+
CLOG_ERROR(History, "{}", REPORT_INTERNAL_BUG);
338+
throw std::runtime_error(errMsg);
339+
}
340+
318341
mStartupValidationComplete = true;
319342
}
320343
}

src/history/CheckpointBuilder.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class CheckpointBuilder
5959
bool mStartupValidationComplete{false};
6060
bool mPublishWasDisabled{false};
6161

62-
bool ensureOpen(uint32_t ledgerSeq);
62+
void ensureOpen(uint32_t ledgerSeq);
6363

6464
public:
6565
CheckpointBuilder(Application& app);
@@ -82,5 +82,11 @@ class CheckpointBuilder
8282
// Finalize checkpoint by renaming all temporary files to their canonical
8383
// names. No-op if files are already rotated.
8484
void checkpointComplete(uint32_t checkpoint);
85+
86+
bool
87+
publishWasDisabled() const
88+
{
89+
return mPublishWasDisabled;
90+
}
8591
};
8692
}

src/history/HistoryManagerImpl.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,14 @@ HistoryManagerImpl::maybeQueueHistoryCheckpoint(uint32_t lcl,
316316
return false;
317317
}
318318

319+
if (mCheckpointBuilder.publishWasDisabled())
320+
{
321+
CLOG_INFO(
322+
History,
323+
"Skipping incomplete checkpoint, publish was previously disabled");
324+
return false;
325+
}
326+
319327
queueCurrentHistory(lcl, ledgerVers);
320328
return true;
321329
}

0 commit comments

Comments
 (0)