Skip to content

Commit 2a4eef9

Browse files
authored
Merge pull request #21032 from jakesmith/HPCC-35929-dali-out-of-order
HPCC-35929 fix(dali): out of order Dali transactions Reviewed-by: Gavin Halliday <gavin.halliday@lexisnexisrisk.com> Merged-by: Gavin Halliday <gavin.halliday@lexisnexisrisk.com>
2 parents bc06089 + e62d37f commit 2a4eef9

File tree

1 file changed

+26
-31
lines changed

1 file changed

+26
-31
lines changed

dali/base/dasds.cpp

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,15 +1118,15 @@ class CTransactionItem : public CSimpleInterface
11181118

11191119
static constexpr unsigned defaultSaveThresholdSecs = 0; // disabled
11201120
static constexpr unsigned defaultDeltaSaveTransactionThreshold = 0; // disabled
1121-
static constexpr unsigned defaultDeltaMemMaxMB = 10;
1121+
static constexpr unsigned defaultDeltaMemMaxMB = 64; // 64MB a reasonably large default, to cope with large blobs being committed
11221122
static constexpr unsigned defaultDeltaTransactionQueueLimit = 10000;
11231123
class CDeltaWriter : implements IThreaded
11241124
{
11251125
IStoreHelper *iStoreHelper = nullptr;
11261126
StringBuffer dataPath;
11271127
StringBuffer backupPath;
11281128
unsigned transactionQueueLimit = defaultDeltaTransactionQueueLimit; // absolute limit, will block if this far behind
1129-
memsize_t transactionMaxMem = defaultDeltaMemMaxMB * 0x100000; // 10MB
1129+
memsize_t transactionMaxMem = defaultDeltaMemMaxMB * 0x100000;
11301130
unsigned totalQueueLimitHits = 0;
11311131
unsigned saveThresholdSecs = 0;
11321132
cycle_t lastSaveTime = 0;
@@ -1412,40 +1412,34 @@ class CDeltaWriter : implements IThreaded
14121412
// keep going whilst there's things pending
14131413
while (true)
14141414
{
1415-
CLeavableCriticalBlock b(pendingCrit);
1416-
std::queue<Owned<CTransactionItem>> todo = std::move(pending);
1417-
if (0 == todo.size())
1415+
// NB: ensure consistent lock ordering of blockedSaveCrit and pendingCrit
1416+
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
1417+
std::queue<Owned<CTransactionItem>> todo;
14181418
{
1419-
if (writeRequested)
1419+
CriticalBlock b(pendingCrit);
1420+
todo = std::move(pending);
1421+
if (0 == todo.size())
14201422
{
1421-
// NB: if here, implies someone signalled via requestAsyncWrite()
1423+
if (writeRequested)
1424+
{
1425+
// NB: if here, implies someone signalled via requestAsyncWrite()
14221426

1423-
// if reason we're here is because sem timedout, consume the signal that was sent
1424-
if (semTimedout)
1425-
pendingTransactionsSem.wait();
1427+
// if reason we're here is because sem timedout, consume the signal that was sent
1428+
if (semTimedout)
1429+
pendingTransactionsSem.wait();
14261430

1427-
writeRequested = false;
1428-
if (signalWhenAllWritten) // can only be true if writeRequested was true
1429-
{
1430-
signalWhenAllWritten = false;
1431-
allWrittenSem.signal();
1431+
writeRequested = false;
1432+
if (signalWhenAllWritten) // can only be true if writeRequested was true
1433+
{
1434+
signalWhenAllWritten = false;
1435+
allWrittenSem.signal();
1436+
}
14321437
}
1438+
break;
14331439
}
1434-
break;
1440+
pendingSz = 0;
14351441
}
1436-
pendingSz = 0;
14371442

1438-
b.leave(); // NB: addToQueue could add to pending between here and regaining lock below
1439-
// NB: ensure consistent lock ordering of blockedSaveCrit and pendingCrit
1440-
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
1441-
b.enter();
1442-
// check if new items added in window above
1443-
while (pending.size())
1444-
{
1445-
todo.push(std::move(pending.front()));
1446-
pending.pop();
1447-
}
1448-
b.leave();
14491443
// Because blockedSaveCrit is held, it will also block 'synchronous save' (see addToQueue)
14501444
// i.e. if stuck here, the transactions will start building up, and trigger a 'Forced synchronous save',
14511445
// which will in turn block. This must complete!
@@ -2227,14 +2221,15 @@ void CBinaryFileExternal::write(const char *name, IPropertyTree &tree)
22272221
void CDeltaWriter::addToQueue(CTransactionItem *item)
22282222
{
22292223
bool needsSyncSave = false;
2224+
size_t items;
22302225
{
22312226
CriticalBlock b(pendingCrit);
22322227
pending.push(item);
22332228
// add actual size for externals, and nominal '100 byte' value for delta transactions
22342229
// it will act. as a rough guide to appoaching size threshold. It is not worth
22352230
// synchronously preparing and serializing here (which will be done asynchronously later)
22362231
pendingSz += (CTransactionItem::f_addext == item->type) ? item->ext.dataLength : 100;
2237-
size_t items = pending.size();
2232+
items = pending.size();
22382233
if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit))
22392234
{
22402235
if (lastSaveTime && ((get_cycles_now() - lastSaveTime) < thresholdDuration))
@@ -2254,12 +2249,12 @@ void CDeltaWriter::addToQueue(CTransactionItem *item)
22542249
++totalQueueLimitHits;
22552250
// force a synchronous save
22562251
CCycleTimer timer;
2257-
PROGLOG("Forcing synchronous save of %u transactions (pendingSz=%zu)", (unsigned)pending.size(), pendingSz);
2252+
PROGLOG("Forcing synchronous save of %u transactions (pendingSz=%zu)", (unsigned)items, pendingSz);
22582253

22592254
// NB: ensure consistent lock ordering of blockedSaveCrit and pendingCrit
22602255
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
22612256
CriticalBlock b(pendingCrit);
2262-
size_t items = pending.size();
2257+
items = pending.size();
22632258
// NB: items could be 0. It's possible that the delta writer has caught up and flushed pending already
22642259
if (items && save(pending)) // if temporarily blocked, continue, meaning queue limit will overrun a bit (blocking window is short)
22652260
{

0 commit comments

Comments
 (0)