@@ -1119,15 +1119,15 @@ class CTransactionItem : public CSimpleInterface
11191119
11201120static constexpr unsigned defaultSaveThresholdSecs = 0 ; // disabled
11211121static constexpr unsigned defaultDeltaSaveTransactionThreshold = 0 ; // disabled
1122- static constexpr unsigned defaultDeltaMemMaxMB = 10 ;
1122+ static constexpr unsigned defaultDeltaMemMaxMB = 64 ; // 64MB a reasonably large default, to cope with large blobs being committed
11231123static constexpr unsigned defaultDeltaTransactionQueueLimit = 10000 ;
11241124class CDeltaWriter : implements IThreaded
11251125{
11261126 IStoreHelper *iStoreHelper = nullptr ;
11271127 StringBuffer dataPath;
11281128 StringBuffer backupPath;
11291129 unsigned transactionQueueLimit = defaultDeltaTransactionQueueLimit; // absolute limit, will block if this far behind
1130- memsize_t transactionMaxMem = defaultDeltaMemMaxMB * 0x100000 ; // 10MB
1130+ memsize_t transactionMaxMem = defaultDeltaMemMaxMB * 0x100000 ;
11311131 unsigned totalQueueLimitHits = 0 ;
11321132 unsigned saveThresholdSecs = 0 ;
11331133 cycle_t lastSaveTime = 0 ;
@@ -1414,40 +1414,34 @@ class CDeltaWriter : implements IThreaded
14141414 // keep going whilst there's things pending
14151415 while (true )
14161416 {
1417- CLeavableCriticalBlock b ( pendingCrit);
1418- std::queue<Owned<CTransactionItem>> todo = std::move (pending);
1419- if ( 0 == todo. size ())
1417+ // NB: ensure consistent lock ordering of blockedSaveCrit and pendingCrit
1418+ CHECKEDCRITICALBLOCK (blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
1419+ std::queue<Owned<CTransactionItem>> todo;
14201420 {
1421- if (writeRequested)
1421+ CriticalBlock b (pendingCrit);
1422+ todo = std::move (pending);
1423+ if (0 == todo.size ())
14221424 {
1423- // NB: if here, implies someone signalled via requestAsyncWrite()
1425+ if (writeRequested)
1426+ {
1427+ // NB: if here, implies someone signalled via requestAsyncWrite()
14241428
1425- // if reason we're here is because sem timedout, consume the signal that was sent
1426- if (semTimedout)
1427- pendingTransactionsSem.wait ();
1429+ // if reason we're here is because sem timedout, consume the signal that was sent
1430+ if (semTimedout)
1431+ pendingTransactionsSem.wait ();
14281432
1429- writeRequested = false ;
1430- if (signalWhenAllWritten) // can only be true if writeRequested was true
1431- {
1432- signalWhenAllWritten = false ;
1433- allWrittenSem.signal ();
1433+ writeRequested = false ;
1434+ if (signalWhenAllWritten) // can only be true if writeRequested was true
1435+ {
1436+ signalWhenAllWritten = false ;
1437+ allWrittenSem.signal ();
1438+ }
14341439 }
1440+ break ;
14351441 }
1436- break ;
1442+ pendingSz = 0 ;
14371443 }
1438- pendingSz = 0 ;
14391444
1440- b.leave (); // NB: addToQueue could add to pending between here and regaining lock below
1441- // NB: ensure consistent lock ordering of blockedSaveCrit and pendingCrit
1442- CHECKEDCRITICALBLOCK (blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
1443- b.enter ();
1444- // check if new items added in window above
1445- while (pending.size ())
1446- {
1447- todo.push (std::move (pending.front ()));
1448- pending.pop ();
1449- }
1450- b.leave ();
14511445 // Because blockedSaveCrit is held, it will also block 'synchronous save' (see addToQueue)
14521446 // i.e. if stuck here, the transactions will start building up, and trigger a 'Forced synchronous save',
14531447 // which will in turn block. This must complete!
@@ -2266,14 +2260,15 @@ void CBinaryFileExternal::write(const char *name, IPropertyTree &tree)
22662260void CDeltaWriter::addToQueue (CTransactionItem *item)
22672261{
22682262 bool needsSyncSave = false ;
2263+ size_t items;
22692264 {
22702265 CriticalBlock b (pendingCrit);
22712266 pending.push (item);
22722267 // add actual size for externals, and nominal '100 byte' value for delta transactions
22732268 // it will act. as a rough guide to appoaching size threshold. It is not worth
22742269 // synchronously preparing and serializing here (which will be done asynchronously later)
22752270 pendingSz += (CTransactionItem::f_addext == item->type ) ? item->ext .dataLength : 100 ;
2276- size_t items = pending.size ();
2271+ items = pending.size ();
22772272 if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit))
22782273 {
22792274 if (lastSaveTime && ((get_cycles_now () - lastSaveTime) < thresholdDuration))
@@ -2293,12 +2288,12 @@ void CDeltaWriter::addToQueue(CTransactionItem *item)
22932288 ++totalQueueLimitHits;
22942289 // force a synchronous save
22952290 CCycleTimer timer;
2296- PROGLOG (" Forcing synchronous save of %u transactions (pendingSz=%zu)" , (unsigned )pending. size () , pendingSz);
2291+ PROGLOG (" Forcing synchronous save of %u transactions (pendingSz=%zu)" , (unsigned )items , pendingSz);
22972292
22982293 // NB: ensure consistent lock ordering of blockedSaveCrit and pendingCrit
22992294 CHECKEDCRITICALBLOCK (blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
23002295 CriticalBlock b (pendingCrit);
2301- size_t items = pending.size ();
2296+ items = pending.size ();
23022297 // NB: items could be 0. It's possible that the delta writer has caught up and flushed pending already
23032298 if (items && save (pending)) // if temporarily blocked, continue, meaning queue limit will overrun a bit (blocking window is short)
23042299 {
0 commit comments