Skip to content

Commit 052aab7

Browse files
Simplify timing in Renewal thread (#16)
* Initial implementation * Cleanup for compiler * Fixed tests * Update src/AVEVA/RocksDB/Plugin/Azure/Impl/BlobFilesystemImpl.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Use steady clock instead of system clock * Catch const std::exception * Make EnsureLiveness private * Move member variables and clean up assert * Moved lease length check * Only sleep if needs retry * Simplified renewal timing logic * Remove dev tests * Update src/AVEVA/RocksDB/Plugin/Azure/Impl/LockFileImpl.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Hold lock the whole time renewal is occuring --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent f559445 commit 052aab7

File tree

4 files changed

+80
-34
lines changed

4 files changed

+80
-34
lines changed

include/AVEVA/RocksDB/Plugin/Azure/Impl/Configuration.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
1717
};
1818

1919
static const constexpr std::chrono::seconds LeaseLength = std::chrono::seconds(20);
20-
static const constexpr std::chrono::seconds RenewalDelay = std::chrono::seconds(10);
20+
static const constexpr std::chrono::seconds RenewalDelay = std::chrono::seconds(5);
2121
static const constexpr size_t MaxCacheSize = static_cast<size_t>(1024) * 1024 * 1024; // 1GB
2222
static const constexpr int MaxClientRetries = 8;
2323
};

include/AVEVA/RocksDB/Plugin/Azure/Impl/LockFileImpl.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,23 @@
55
#include <azure/storage/blobs/page_blob_client.hpp>
66
#include <azure/storage/blobs/blob_lease_client.hpp>
77
#include <memory>
8+
#include <chrono>
89
namespace AVEVA::RocksDB::Plugin::Azure::Impl
910
{
1011
class LockFileImpl
1112
{
1213
std::unique_ptr<::Azure::Storage::Blobs::PageBlobClient> m_file;
1314
std::unique_ptr<::Azure::Storage::Blobs::BlobLeaseClient> m_lease = nullptr;
15+
mutable std::chrono::steady_clock::time_point m_lastRenewalTime;
16+
std::chrono::seconds m_leaseLength;
1417

1518
public:
16-
LockFileImpl(std::unique_ptr<::Azure::Storage::Blobs::PageBlobClient> file);
19+
LockFileImpl(std::unique_ptr<::Azure::Storage::Blobs::PageBlobClient> file, std::chrono::seconds leaseLength);
1720
bool Lock();
1821
void Renew() const;
1922
void Unlock();
23+
24+
[[nodiscard]] std::chrono::seconds TimeSinceLastRenewal() const;
25+
[[nodiscard]] bool HasExceededLeaseLength() const;
2026
};
2127
}

src/AVEVA/RocksDB/Plugin/Azure/Impl/BlobFilesystemImpl.cpp

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
353353

354354
auto client = std::make_unique<::Azure::Storage::Blobs::PageBlobClient>(container.GetPageBlobClient(std::string(realPath)));
355355
client->CreateIfNotExists(Configuration::PageBlob::DefaultSize);
356-
auto lockFile = std::make_shared<LockFileImpl>(std::move(client));
356+
auto lockFile = std::make_shared<LockFileImpl>(std::move(client), Configuration::LeaseLength);
357357
if (lockFile->Lock())
358358
{
359359
m_locks.push_back(lockFile);
@@ -750,7 +750,6 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
750750
void BlobFilesystemImpl::RenewLease(std::stop_token stopToken)
751751
{
752752
BOOST_LOG_SEV(*m_logger, severity_level::info) << "Starting blob lease renewal thread";
753-
auto startTime = std::chrono::steady_clock::now();
754753
try
755754
{
756755
while (!stopToken.stop_requested())
@@ -760,38 +759,51 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
760759
break;
761760
}
762761

762+
// Sleep before attempting renewal
763+
static const constexpr auto sleepInterval = std::chrono::milliseconds(100);
764+
static const constexpr auto maxSleepIterations = 50;
765+
static_assert(sleepInterval * maxSleepIterations == Configuration::RenewalDelay);
766+
for (int i = 0; i < maxSleepIterations && !stopToken.stop_requested(); ++i)
767+
{
768+
std::this_thread::sleep_for(sleepInterval);
769+
}
770+
771+
if (stopToken.stop_requested())
772+
{
773+
break;
774+
}
775+
763776
{
764-
std::vector<LockFileImpl*> needsRetry;
765777
std::scoped_lock lock(m_lockFilesMutex);
778+
std::vector<LockFileImpl*> needsRetry;
766779
needsRetry.reserve(m_locks.size());
767-
for (const auto& lockPtr : m_locks)
780+
for (const auto& l : m_locks)
768781
{
769-
needsRetry.push_back(lockPtr.get());
782+
needsRetry.push_back(l.get());
770783
}
771784

785+
// Attempt to renew all locks with retries
772786
int retries = 0;
773787
while (needsRetry.size() > 0 && retries < 5 && !stopToken.stop_requested())
774788
{
775-
const auto timeElapsed = std::chrono::steady_clock::now() - startTime;
776-
if (timeElapsed >= Configuration::LeaseLength)
777-
{
778-
throw std::runtime_error("Lease length time exceeded. Unsafe to continue");
779-
}
780-
781-
std::erase_if(needsRetry, [this](const auto* client) -> bool
789+
std::erase_if(needsRetry, [this](const auto& client) -> bool
782790
{
783791
try
784792
{
785793
client->Renew();
786794
return true;
787795
}
788-
catch (const std::exception& e)
796+
catch (const ::Azure::Core::RequestFailedException& e)
789797
{
790-
BOOST_LOG_SEV(*m_logger, severity_level::error) << "Failed to renew lease for blob " << e.what();
791-
}
792-
catch (...)
793-
{
794-
BOOST_LOG_SEV(*m_logger, severity_level::error) << "Failed to renew lease for blob ";
798+
if (e.StatusCode == ::Azure::Core::Http::HttpStatusCode::Conflict)
799+
{
800+
BOOST_LOG_SEV(*m_logger, severity_level::error) << "Failed to renew lease due to conflict, lease might be expired: " << e.what();
801+
throw;
802+
}
803+
else
804+
{
805+
BOOST_LOG_SEV(*m_logger, severity_level::error) << "Failed to renew lease: " << e.what();
806+
}
795807
}
796808

797809
return false;
@@ -804,15 +816,6 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
804816
}
805817
}
806818
}
807-
808-
startTime = std::chrono::steady_clock::now();
809-
static const constexpr auto sleepInterval = std::chrono::milliseconds(100);
810-
static const constexpr auto maxSleepIterations = 100;
811-
static_assert(sleepInterval * maxSleepIterations == Configuration::RenewalDelay);
812-
for (int i = 0; i < maxSleepIterations && !stopToken.stop_requested(); ++i)
813-
{
814-
std::this_thread::sleep_for(sleepInterval);
815-
}
816819
}
817820
}
818821
catch (const std::exception &e)

src/AVEVA/RocksDB/Plugin/Azure/Impl/LockFileImpl.cpp

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,15 @@
77
#include <azure/storage/blobs.hpp>
88

99
#include <cassert>
10+
#include <optional>
11+
#include <stdexcept>
12+
#include <string>
1013
namespace AVEVA::RocksDB::Plugin::Azure::Impl
1114
{
12-
LockFileImpl::LockFileImpl(std::unique_ptr<::Azure::Storage::Blobs::PageBlobClient> file)
13-
: m_file(std::move(file))
15+
LockFileImpl::LockFileImpl(std::unique_ptr<::Azure::Storage::Blobs::PageBlobClient> file, std::chrono::seconds leaseLength)
16+
: m_file(std::move(file)),
17+
m_lastRenewalTime(std::chrono::steady_clock::now()),
18+
m_leaseLength(leaseLength)
1419
{
1520
}
1621

@@ -25,13 +30,13 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
2530
auto start = std::chrono::high_resolution_clock::now();
2631
auto end = std::chrono::high_resolution_clock::now();
2732
std::optional<std::exception> ex;
28-
while ((end - start) < Configuration::LeaseLength)
33+
while ((end - start) < m_leaseLength)
2934
{
3035
try
3136
{
3237
const auto leaseId = ::Azure::Storage::Blobs::BlobLeaseClient::CreateUniqueLeaseId();
3338
m_lease = std::make_unique<::Azure::Storage::Blobs::BlobLeaseClient>(*m_file, leaseId);
34-
const auto response = m_lease->Acquire(Configuration::LeaseLength);
39+
const auto response = m_lease->Acquire(m_leaseLength);
3540
assert(response.Value.LeaseId == leaseId);
3641

3742
ex.reset();
@@ -59,17 +64,49 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
5964
return false;
6065
}
6166

67+
// Set the initial renewal time when lock is acquired
68+
m_lastRenewalTime = std::chrono::steady_clock::now();
6269
return true;
6370
}
6471

6572
void LockFileImpl::Renew() const
6673
{
67-
m_lease->Renew();
74+
if (m_lease == nullptr)
75+
{
76+
throw std::runtime_error("Cannot renew lease that has not been acquired");
77+
}
78+
79+
if (HasExceededLeaseLength())
80+
{
81+
const auto timeSinceRenewal = TimeSinceLastRenewal();
82+
throw std::runtime_error("Cannot renew expired lease. Time since last renewal: " +
83+
std::to_string(timeSinceRenewal.count()) + " seconds (max: " +
84+
std::to_string(m_leaseLength.count()) + " seconds)");
85+
}
86+
87+
[[maybe_unused]] const auto result = m_lease->Renew();
88+
m_lastRenewalTime = std::chrono::steady_clock::now();
6889
}
6990

7091
void LockFileImpl::Unlock()
7192
{
93+
if (m_lease == nullptr)
94+
{
95+
throw std::runtime_error("Cannot release lease that has not been acquired");
96+
}
97+
7298
m_lease->Release();
7399
m_lease.reset();
74100
}
101+
102+
std::chrono::seconds LockFileImpl::TimeSinceLastRenewal() const
103+
{
104+
return std::chrono::duration_cast<std::chrono::seconds>(
105+
std::chrono::steady_clock::now() - m_lastRenewalTime);
106+
}
107+
108+
bool LockFileImpl::HasExceededLeaseLength() const
109+
{
110+
return TimeSinceLastRenewal() >= m_leaseLength;
111+
}
75112
}

0 commit comments

Comments
 (0)