Skip to content

Commit f559445

Browse files
Additional safety around blob lease renewal (#14)
* Initial implementation * Cleanup for compiler * Fixed tests * Update src/AVEVA/RocksDB/Plugin/Azure/Impl/BlobFilesystemImpl.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Use steady clock instead of system clock * Catch const std::exception * Make EnsureLiveness private * Move member variables and clean up assert * Moved lease length check * Only sleep if needs retry --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 7d42b93 commit f559445

File tree

3 files changed

+116
-28
lines changed

3 files changed

+116
-28
lines changed

include/AVEVA/RocksDB/Plugin/Azure/Impl/BlobFilesystemImpl.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <mutex>
2525
#include <vector>
2626
#include <optional>
27+
#include <source_location>
2728
namespace AVEVA::RocksDB::Plugin::Azure::Impl
2829
{
2930
class BlobFilesystemImpl
@@ -47,6 +48,7 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
4748
std::unordered_map<std::string, std::shared_ptr<Core::FileCache>, Core::StringHash, Core::StringEqual> m_fileCaches;
4849
std::mutex m_lockFilesMutex;
4950
std::vector<std::shared_ptr<LockFileImpl>> m_locks;
51+
std::stop_source m_filesystemStopSource;
5052
std::jthread m_lockRenewalThread;
5153

5254
public:
@@ -118,5 +120,6 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
118120
BlobFilesystemImpl(std::shared_ptr<boost::log::sources::severity_logger_mt<boost::log::trivial::severity_level>>&& logger, int64_t dataFileInitialSize = 0, int64_t dataFileBufferSize = 0);
119121
[[nodiscard]] const ::Azure::Storage::Blobs::BlobContainerClient& GetContainer(std::string_view prefix) const;
120122
void RenewLease(std::stop_token stopToken);
123+
void EnsureLiveness(std::source_location location = std::source_location::current()) const;
121124
};
122125
}

src/AVEVA/RocksDB/Plugin/Azure/Impl/BlobFilesystemImpl.cpp

Lines changed: 112 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
#include "AVEVA/RocksDB/Plugin/Azure/Impl/PageBlob.hpp"
1212

1313
#include <azure/storage/blobs.hpp>
14+
15+
using boost::log::trivial::severity_level;
16+
1417
namespace AVEVA::RocksDB::Plugin::Azure::Impl
1518
{
1619
BlobFilesystemImpl::BlobFilesystemImpl(const std::string& name,
@@ -189,6 +192,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
189192

190193
ReadableFileImpl BlobFilesystemImpl::CreateReadableFile(const std::string& filePath)
191194
{
195+
EnsureLiveness();
196+
192197
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
193198
const auto& container = GetContainer(prefix);
194199
auto pageBlobClient = container.GetPageBlobClient(std::string(realPath));
@@ -206,6 +211,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
206211

207212
WriteableFileImpl BlobFilesystemImpl::CreateWriteableFile(const std::string& filePath)
208213
{
214+
EnsureLiveness();
215+
209216
const auto fileType = Core::RocksDBHelpers::GetFileType(filePath);
210217
const auto isData = fileType == Core::RocksDBHelpers::FileClass::WAL ||
211218
fileType == Core::RocksDBHelpers::FileClass::SST;
@@ -246,6 +253,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
246253

247254
ReadWriteFileImpl BlobFilesystemImpl::CreateReadWriteFile(const std::string& filePath)
248255
{
256+
EnsureLiveness();
257+
249258
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
250259
const auto& container = GetContainer(prefix);
251260

@@ -267,6 +276,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
267276

268277
WriteableFileImpl BlobFilesystemImpl::ReopenWriteableFile(const std::string& filePath)
269278
{
279+
EnsureLiveness();
280+
270281
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
271282
const auto& container = GetContainer(prefix);
272283
const auto fileType = Core::RocksDBHelpers::GetFileType(filePath);
@@ -289,6 +300,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
289300

290301
WriteableFileImpl BlobFilesystemImpl::ReuseWritableFile(const std::string& filePath)
291302
{
303+
EnsureLiveness();
304+
292305
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
293306
const auto& container = GetContainer(prefix);
294307
const auto fileType = Core::RocksDBHelpers::GetFileType(filePath);
@@ -316,6 +329,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
316329

317330
LoggerImpl BlobFilesystemImpl::CreateLogger(const std::string& filePath, const int logLevel)
318331
{
332+
EnsureLiveness();
333+
319334
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
320335
const auto& container = GetContainer(prefix);
321336

@@ -329,6 +344,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
329344

330345
std::shared_ptr<LockFileImpl> BlobFilesystemImpl::LockFile(const std::string& filePath)
331346
{
347+
EnsureLiveness();
348+
332349
std::scoped_lock _(m_lockFilesMutex);
333350

334351
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
@@ -337,9 +354,9 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
337354
auto client = std::make_unique<::Azure::Storage::Blobs::PageBlobClient>(container.GetPageBlobClient(std::string(realPath)));
338355
client->CreateIfNotExists(Configuration::PageBlob::DefaultSize);
339356
auto lockFile = std::make_shared<LockFileImpl>(std::move(client));
340-
m_locks.push_back(lockFile);
341357
if (lockFile->Lock())
342358
{
359+
m_locks.push_back(lockFile);
343360
return lockFile;
344361
}
345362
else
@@ -350,6 +367,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
350367

351368
bool BlobFilesystemImpl::UnlockFile(const LockFileImpl& lock)
352369
{
370+
EnsureLiveness();
371+
353372
std::scoped_lock _(m_lockFilesMutex);
354373
const auto it = std::find_if(m_locks.cbegin(),
355374
m_locks.cend(),
@@ -372,12 +391,16 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
372391

373392
DirectoryImpl BlobFilesystemImpl::CreateDirectory(const std::string& directoryPath)
374393
{
394+
EnsureLiveness();
395+
375396
const auto [prefix, realPath] = StorageAccount::StripPrefix(directoryPath);
376397
return DirectoryImpl{ GetContainer(prefix), realPath };
377398
}
378399

379400
bool BlobFilesystemImpl::FileExists(const std::string& name)
380401
{
402+
EnsureLiveness();
403+
381404
const auto [prefix, realPath] = StorageAccount::StripPrefix(name);
382405
const auto& container = GetContainer(prefix);
383406

@@ -410,6 +433,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
410433

411434
std::vector<std::string> BlobFilesystemImpl::GetChildren(const std::string& directoryPath, int32_t sizeHint)
412435
{
436+
EnsureLiveness();
437+
413438
const auto [prefix, realPath] = StorageAccount::StripPrefix(directoryPath);
414439
const auto& container = GetContainer(prefix);
415440

@@ -461,6 +486,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
461486

462487
std::vector<BlobAttributes> BlobFilesystemImpl::GetChildrenFileAttributes(const std::string& directoryPath)
463488
{
489+
EnsureLiveness();
490+
464491
const auto [prefix, realPath] = StorageAccount::StripPrefix(directoryPath);
465492
const auto& container = GetContainer(prefix);
466493
std::vector<BlobAttributes> attributes;
@@ -493,6 +520,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
493520

494521
bool BlobFilesystemImpl::DeleteFile(const std::string& filePath) const
495522
{
523+
EnsureLiveness();
524+
496525
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
497526
const auto& container = GetContainer(prefix);
498527
const auto client = container.GetPageBlobClient(std::string(realPath));
@@ -509,6 +538,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
509538

510539
size_t BlobFilesystemImpl::DeleteDir(const std::string& directoryPath) const
511540
{
541+
EnsureLiveness();
542+
512543
const auto [prefix, realPath] = StorageAccount::StripPrefix(directoryPath);
513544
const auto& container = GetContainer(prefix);
514545

@@ -559,6 +590,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
559590

560591
void BlobFilesystemImpl::Truncate(const std::string& filePath, int64_t size) const
561592
{
593+
EnsureLiveness();
594+
562595
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
563596
const auto& container = GetContainer(prefix);
564597

@@ -573,6 +606,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
573606

574607
int64_t BlobFilesystemImpl::GetFileSize(const std::string& filePath) const
575608
{
609+
EnsureLiveness();
610+
576611
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
577612
const auto& container = GetContainer(prefix);
578613

@@ -582,6 +617,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
582617

583618
uint64_t BlobFilesystemImpl::GetFileModificationTime(const std::string& filePath) const
584619
{
620+
EnsureLiveness();
621+
585622
const auto [prefix, realPath] = StorageAccount::StripPrefix(filePath);
586623
const auto& container = GetContainer(prefix);
587624

@@ -593,12 +630,16 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
593630

594631
size_t BlobFilesystemImpl::GetLeaseClientCount()
595632
{
633+
EnsureLiveness();
634+
596635
std::scoped_lock _(m_lockFilesMutex);
597636
return m_locks.size();
598637
}
599638

600639
void BlobFilesystemImpl::RenameFile(const std::string& fromFilePath, const std::string& toFilePath) const
601640
{
641+
EnsureLiveness();
642+
602643
const auto [prefixAccountFrom, realPathFrom] = StorageAccount::StripPrefix(fromFilePath);
603644
const auto [prefixAccountTo, realPathTo] = StorageAccount::StripPrefix(toFilePath);
604645
if (prefixAccountFrom != prefixAccountTo)
@@ -708,48 +749,92 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
708749

709750
void BlobFilesystemImpl::RenewLease(std::stop_token stopToken)
710751
{
711-
while (!stopToken.stop_requested())
752+
BOOST_LOG_SEV(*m_logger, severity_level::info) << "Starting blob lease renewal thread";
753+
auto startTime = std::chrono::steady_clock::now();
754+
try
712755
{
756+
while (!stopToken.stop_requested())
713757
{
714-
std::scoped_lock lock(m_lockFilesMutex);
715758
if (stopToken.stop_requested())
716759
{
717760
break;
718761
}
719762

720-
std::vector<LockFileImpl*> needsRetry;
721-
needsRetry.reserve(m_locks.size());
722-
for (const auto& lockPtr : m_locks)
723763
{
724-
needsRetry.push_back(lockPtr.get());
725-
}
764+
std::vector<LockFileImpl*> needsRetry;
765+
std::scoped_lock lock(m_lockFilesMutex);
766+
needsRetry.reserve(m_locks.size());
767+
for (const auto& lockPtr : m_locks)
768+
{
769+
needsRetry.push_back(lockPtr.get());
770+
}
726771

727-
int retries = 0;
728-
while (needsRetry.size() > 0 && retries < 5 && !stopToken.stop_requested())
729-
{
730-
std::erase_if(needsRetry, [](const auto* client) -> bool
772+
int retries = 0;
773+
while (needsRetry.size() > 0 && retries < 5 && !stopToken.stop_requested())
774+
{
775+
const auto timeElapsed = std::chrono::steady_clock::now() - startTime;
776+
if (timeElapsed >= Configuration::LeaseLength)
731777
{
732-
try
733-
{
734-
client->Renew();
735-
return true;
736-
}
737-
catch (...)
778+
throw std::runtime_error("Lease length time exceeded. Unsafe to continue");
779+
}
780+
781+
std::erase_if(needsRetry, [this](const auto* client) -> bool
738782
{
783+
try
784+
{
785+
client->Renew();
786+
return true;
787+
}
788+
catch (const std::exception& e)
789+
{
790+
BOOST_LOG_SEV(*m_logger, severity_level::error) << "Failed to renew lease for blob " << e.what();
791+
}
792+
catch (...)
793+
{
794+
BOOST_LOG_SEV(*m_logger, severity_level::error) << "Failed to renew lease for blob ";
795+
}
796+
739797
return false;
740-
}
741-
});
798+
});
742799

743-
retries++;
800+
retries++;
801+
if (needsRetry.size() > 0)
802+
{
803+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
804+
}
805+
}
744806
}
745-
}
746807

747-
// Sleep AFTER doing work - use VERY small increments for testing
748-
// 10000 iterations * 1ms = 10 seconds total
749-
for (int i = 0; i < 10000 && !stopToken.stop_requested(); ++i)
750-
{
751-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
808+
startTime = std::chrono::steady_clock::now();
809+
static const constexpr auto sleepInterval = std::chrono::milliseconds(100);
810+
static const constexpr auto maxSleepIterations = 100;
811+
static_assert(sleepInterval * maxSleepIterations == Configuration::RenewalDelay);
812+
for (int i = 0; i < maxSleepIterations && !stopToken.stop_requested(); ++i)
813+
{
814+
std::this_thread::sleep_for(sleepInterval);
815+
}
752816
}
753817
}
818+
catch (const std::exception &e)
819+
{
820+
BOOST_LOG_SEV(*m_logger, severity_level::fatal) << "Stopping renewal thread " << e.what();
821+
m_filesystemStopSource.request_stop();
822+
}
823+
catch (...)
824+
{
825+
BOOST_LOG_SEV(*m_logger, severity_level::fatal) << "Stopping renewal thread";
826+
m_filesystemStopSource.request_stop();
827+
}
828+
829+
BOOST_LOG_SEV(*m_logger, severity_level::info) << "Exiting blob lease renewal thread";
830+
}
831+
832+
void BlobFilesystemImpl::EnsureLiveness(const std::source_location location) const
833+
{
834+
if (m_filesystemStopSource.stop_requested())
835+
{
836+
BOOST_LOG_SEV(*m_logger, severity_level::fatal) << "Unable to ensure safe database access when attempting to call " << location.function_name();
837+
throw std::runtime_error("Unable to ensure safe database access");
838+
}
754839
}
755840
}

vcpkg-configuration.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"default-registry": {
33
"kind": "git",
4-
"baseline": "6353ece09f6cebc34413ea04e69beab3fece73c2",
4+
"baseline": "9401ee272060dae4c80f530a04b8fbec143c8d21",
55
"repository": "https://github.com/microsoft/vcpkg"
66
},
77
"registries": [

0 commit comments

Comments
 (0)