Skip to content

Commit 7d42b93

Browse files
authored
Refresh ReadableFile metadata on EOF and ETag precondition failures (#13)
1 parent f9f70b0 commit 7d42b93

File tree

10 files changed

+283
-74
lines changed

10 files changed

+283
-74
lines changed

include/AVEVA/RocksDB/Plugin/Azure/Impl/PageBlob.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#pragma once
55
#include "AVEVA/RocksDB/Plugin/Core/BlobClient.hpp"
66
#include <azure/storage/blobs/page_blob_client.hpp>
7+
#include <azure/core/etag.hpp>
78
namespace AVEVA::RocksDB::Plugin::Azure::Impl
89
{
910
class PageBlob final : public Core::BlobClient
@@ -19,6 +20,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
1920
virtual void SetCapacity(int64_t capacity) override;
2021
virtual void DownloadTo(const std::string& path, int64_t offset, int64_t length) override;
2122
virtual int64_t DownloadTo(std::span<char> buffer, int64_t blobOffset, int64_t readLength) override;
23+
virtual int64_t Download(std::span<char> buffer, int64_t blobOffset, int64_t readLength, const ::Azure::ETag& ifMatch) override;
2224
virtual void UploadPages(const std::span<char> buffer, int64_t blobOffset) override;
25+
virtual ::Azure::ETag GetEtag() override;
2326
};
2427
}

include/AVEVA/RocksDB/Plugin/Azure/Impl/ReadableFileImpl.hpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include "AVEVA/RocksDB/Plugin/Core/FileCache.hpp"
66
#include "AVEVA/RocksDB/Plugin/Core/BlobClient.hpp"
77

8+
#include <boost/log/trivial.hpp>
9+
810
#include <cstdint>
911
#include <string>
1012
#include <string_view>
@@ -17,12 +19,17 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
1719
std::shared_ptr<Core::BlobClient> m_blobClient;
1820
std::shared_ptr<Core::FileCache> m_fileCache;
1921
int64_t m_offset;
20-
int64_t m_size;
22+
mutable int64_t m_size;
23+
mutable ::Azure::ETag m_etag;
24+
std::shared_ptr<boost::log::sources::severity_logger_mt<boost::log::trivial::severity_level>> m_logger;
25+
26+
int64_t DownloadWithRetry(const int64_t offset, const int64_t bytesToRead, char* buffer) const;
2127

2228
public:
2329
ReadableFileImpl(std::string_view name,
2430
std::shared_ptr<Core::BlobClient> blobClient,
25-
std::shared_ptr<Core::FileCache> fileCache);
31+
std::shared_ptr<Core::FileCache> fileCache,
32+
std::shared_ptr<boost::log::sources::severity_logger_mt<boost::log::trivial::severity_level>> logger);
2633

2734
// NOTE: Increments m_offset
2835
[[nodiscard]] int64_t SequentialRead(int64_t bytesToRead, char* buffer);
@@ -33,5 +40,6 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
3340
int64_t GetOffset() const;
3441
void Skip(int64_t n);
3542
int64_t GetSize() const;
43+
void RefreshBlobMetadata() const;
3644
};
3745
}

include/AVEVA/RocksDB/Plugin/Core/BlobClient.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-FileCopyrightText: Copyright 2025 AVEVA
33

44
#pragma once
5+
#include <azure/core/etag.hpp>
56
#include <cstdint>
67
#include <string>
78
#include <span>
@@ -59,5 +60,21 @@ namespace AVEVA::RocksDB::Plugin::Core
5960
/// <param name="buffer">A span containing the page data to upload.</param>
6061
/// <param name="blobOffset">The offset within the blob where the data should be uploaded.</param>
6162
virtual void UploadPages(const std::span<char> buffer, int64_t blobOffset) = 0;
63+
64+
/// <summary>
65+
/// Retrieve the current ETag of the blob.
66+
/// </summary>
67+
/// <returns>The current ETag of the blob.</returns>
68+
virtual ::Azure::ETag GetEtag() = 0;
69+
70+
/// <summary>
71+
/// Downloads a portion of the blob into the provided buffer, performing an ETag match check.
72+
/// </summary>
73+
/// <param name="buffer">A span of bytes where the downloaded data will be stored.</param>
74+
/// <param name="blobOffset">The starting position (in bytes) in the blob from which to begin downloading.</param>
75+
/// <param name="readLength">The number of bytes to download from the offset.</param>
76+
/// <param name="ifMatch">The ETag to check against.</param>
77+
/// <returns>The number of bytes actually downloaded.</returns>
78+
virtual int64_t Download(std::span<char> buffer, int64_t blobOffset, int64_t readLength, const ::Azure::ETag& ifMatch) = 0;
6279
};
6380
}

src/AVEVA/RocksDB/Plugin/Azure/Impl/BlobFilesystemImpl.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,11 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
196196
auto cache = m_fileCaches.find(prefix);
197197
if (cache != m_fileCaches.end())
198198
{
199-
return ReadableFileImpl{ realPath, std::move(blobClient), cache->second };
199+
return ReadableFileImpl{ realPath, std::move(blobClient), cache->second, m_logger };
200200
}
201201
else
202202
{
203-
return ReadableFileImpl{ realPath, std::move(blobClient), nullptr };
203+
return ReadableFileImpl{ realPath, std::move(blobClient), nullptr, m_logger};
204204
}
205205
}
206206

src/AVEVA/RocksDB/Plugin/Azure/Impl/PageBlob.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
#include "AVEVA/RocksDB/Plugin/Azure/Impl/PageBlob.hpp"
55
#include "AVEVA/RocksDB/Plugin/Azure/Impl/BlobHelpers.hpp"
6+
#include <azure/core/etag.hpp>
7+
#include <azure/core/context.hpp>
8+
#include <cassert>
69
namespace AVEVA::RocksDB::Plugin::Azure::Impl
710
{
811
PageBlob::PageBlob(::Azure::Storage::Blobs::PageBlobClient client)
@@ -54,4 +57,26 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
5457
::Azure::Core::IO::MemoryBodyStream dataStream(reinterpret_cast<uint8_t*>(buffer.data()), buffer.size());
5558
m_client.UploadPages(blobOffset, dataStream);
5659
}
60+
61+
// Returns the ETag reported by the blob's current properties.
// Each call issues a GetProperties request through the underlying page blob client.
::Azure::ETag PageBlob::GetEtag()
{
    return m_client.GetProperties().Value.ETag;
}
66+
67+
// Downloads `length` bytes starting at `offset` into `buffer`, conditioned on the blob's
// ETag still matching `ifMatch`.
//
// buffer  - destination; at most buffer.size() bytes are written.
// offset  - starting byte position within the blob.
// length  - number of bytes requested from the service.
// ifMatch - ETag sent as the If-Match access condition of the request.
//
// Returns the number of bytes copied into `buffer`.
// NOTE(review): a failed If-Match condition is expected to surface as
// ::Azure::Core::RequestFailedException (HTTP 412) from m_client.Download - confirm.
int64_t PageBlob::Download(std::span<char> buffer, int64_t offset, int64_t length, const ::Azure::ETag& ifMatch)
{
    ::Azure::Storage::Blobs::DownloadBlobOptions options
    {
        .Range = ::Azure::Core::Http::HttpRange { offset, length }
    };
    options.AccessConditions.IfMatch = ifMatch;

    const auto result = m_client.Download(options, ::Azure::Core::Context{});
    const auto& content = result.Value;

    // ReadToCount reports an unsigned size_t; convert once so every comparison and the
    // return value are done in the signed 64-bit domain used by the interface.
    const auto bytesRead = static_cast<int64_t>(
        content.BodyStream->ReadToCount(reinterpret_cast<uint8_t*>(buffer.data()), buffer.size()));

    // Compare as int64_t: narrowing the server-reported length to int would truncate ranges
    // of 2 GiB or more, and comparing the -1 sentinel against an unsigned count would wrap.
    assert(static_cast<int64_t>(content.ContentRange.Length.ValueOr(-1)) == bytesRead && "Bytes read differ from server ContentRange");
    return bytesRead;
}
5782
}

src/AVEVA/RocksDB/Plugin/Azure/Impl/ReadableFileImpl.cpp

Lines changed: 68 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,27 @@
22
// SPDX-FileCopyrightText: Copyright 2025 AVEVA
33

44
#include "AVEVA/RocksDB/Plugin/Azure/Impl/ReadableFileImpl.hpp"
5+
6+
#include <boost/log/trivial.hpp>
7+
58
#include <cassert>
9+
#include <azure/core/exception.hpp>
10+
11+
using namespace boost::log::trivial;
612
namespace AVEVA::RocksDB::Plugin::Azure::Impl
713
{
814
// Builds a readable view over a blob.
//
// name       - name of the file; stored in m_name and used in log messages.
// blobClient - client used for all blob access; may be null, in which case the size is 0
//              and the ETag is left default-constructed.
// fileCache  - optional local cache (may be null).
// logger     - logger used for metadata-refresh diagnostics.
ReadableFileImpl::ReadableFileImpl(std::string_view name,
    std::shared_ptr<Core::BlobClient> blobClient,
    std::shared_ptr<Core::FileCache> fileCache,
    std::shared_ptr<boost::log::sources::severity_logger_mt<boost::log::trivial::severity_level>> logger)
    : m_name(name),
    m_blobClient(std::move(blobClient)),
    m_fileCache(std::move(fileCache)),
    m_offset(0),
    m_size(m_blobClient ? m_blobClient->GetSize() : 0LL),
    m_logger(std::move(logger))
{
    // The size initializer above already tolerates a null client; the ETag fetch must do
    // the same, otherwise constructing with a null client dereferences a null pointer.
    if (m_blobClient)
    {
        m_etag = m_blobClient->GetEtag();
    }
}
1827

1928
int64_t ReadableFileImpl::SequentialRead(const int64_t bytesToRead, char* buffer)
@@ -31,19 +40,12 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
3140
m_offset += static_cast<int64_t>(*bytesRead);
3241
return static_cast<int64_t>(*bytesRead);
3342
}
34-
}
43+
}
3544

36-
int64_t bytesRead = 0;
3745
assert(m_size >= m_offset && "m_size needs to be bigger than m_offset or else we will overflow");
38-
int64_t bytesRequested = m_size - m_offset;
39-
if (bytesRequested > bytesToRead) bytesRequested = bytesToRead;
40-
if (bytesRequested <= 0)
41-
{
42-
return 0;
43-
}
4446

45-
const auto result = m_blobClient->DownloadTo(std::span<char>(buffer, static_cast<std::size_t>(bytesRequested)), m_offset, bytesRequested);
46-
bytesRead = result > 0 ? result : 0;
47+
auto bytesRead = DownloadWithRetry(m_offset, bytesToRead, buffer);
48+
bytesRead = std::max<int64_t>(bytesRead, 0);
4749

4850
m_offset += bytesRead;
4951
return bytesRead;
@@ -65,18 +67,8 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
6567
}
6668
}
6769

68-
int64_t bytesRead = 0;
69-
70-
assert(m_size >= offset && "m_size needs to be bigger than or equal to offset or else we will overflow");
71-
int64_t bytesRequested = m_size - offset;
72-
if (bytesRequested > bytesToRead) bytesRequested = bytesToRead;
73-
if (bytesRequested <= 0)
74-
{
75-
return 0;
76-
}
77-
78-
const auto result = m_blobClient->DownloadTo(std::span<char>(buffer, static_cast<std::size_t>(bytesRequested)), offset, bytesRequested);
79-
bytesRead = result > 0 ? result : 0;
70+
auto bytesRead = DownloadWithRetry(offset, bytesToRead, buffer);
71+
bytesRead = std::max<int64_t>(bytesRead, 0);
8072

8173
return bytesRead;
8274
}
@@ -93,6 +85,58 @@ namespace AVEVA::RocksDB::Plugin::Azure::Impl
9385

9486
int64_t ReadableFileImpl::GetSize() const
9587
{
88+
RefreshBlobMetadata();
89+
9690
return m_size;
9791
}
92+
93+
int64_t ReadableFileImpl::DownloadWithRetry(const int64_t offset, const int64_t bytesToRead, char* buffer) const
94+
{
95+
int64_t bytesRead = 0;
96+
97+
bool success = false;
98+
do
99+
{
100+
auto remaining = std::max<int64_t>(0, m_size - offset);
101+
if (remaining == 0)
102+
{
103+
auto latestEtag = m_blobClient->GetEtag();
104+
if (latestEtag != m_etag)
105+
{
106+
RefreshBlobMetadata();
107+
continue;
108+
}
109+
110+
return 0;
111+
}
112+
113+
auto toRead = std::min(bytesToRead, remaining);
114+
try
115+
{
116+
bytesRead = m_blobClient->Download(std::span<char>(buffer, static_cast<size_t>(toRead)), offset, toRead, m_etag);
117+
bytesRead = std::min(bytesRead, remaining);
118+
success = true;
119+
}
120+
catch (const ::Azure::Core::RequestFailedException& ex)
121+
{
122+
if (ex.StatusCode == ::Azure::Core::Http::HttpStatusCode::PreconditionFailed)
123+
{
124+
RefreshBlobMetadata();
125+
}
126+
else
127+
{
128+
throw;
129+
}
130+
}
131+
} while (!success);
132+
133+
return bytesRead;
134+
}
135+
136+
void ReadableFileImpl::RefreshBlobMetadata() const
137+
{
138+
m_size = m_blobClient->GetSize();
139+
m_etag = m_blobClient->GetEtag();
140+
BOOST_LOG_SEV(*m_logger, debug) << "Blob metadata refreshed for file '" << m_name << "' :size = " << m_size << " bytes, etag = " << m_etag.ToString();
141+
}
98142
}

tests/AVEVA/RocksDB/Plugin/Azure/Impl/BlobFilesystemIntegrationTests.cpp

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1206,3 +1206,101 @@ TEST_F(BlobFilesystemIntegrationTests, CreateWriteableFile_AfterDelete_Recreates
12061206
EXPECT_TRUE(m_filesystem->FileExists(path));
12071207
EXPECT_EQ(1024, m_filesystem->GetFileSize(path));
12081208
}
1209+
1210+
// Verifies that a sequential reader opened before an append notices the blob has changed,
// refreshes its cached metadata, and returns the newly appended bytes.
// (Test name corrected from "...RefreshesAndRetriess" to match its RandomRead counterpart.)
TEST_F(BlobFilesystemIntegrationTests, SequentialRead_ETagMismatch_RefreshesAndRetries)
{
    // Arrange
    std::string blobName = m_containerPrefix + "/original-" + m_blobName;

    std::vector<char> initialData(512, 'a');
    auto file = m_filesystem->CreateWriteableFile(blobName);
    file.Append(initialData);
    file.Sync(); // Sync to update size metadata

    auto readFile = m_filesystem->CreateReadableFile(blobName);
    std::vector<char> readBuffer(512);
    auto bytesRead = readFile.SequentialRead(static_cast<int64_t>(512), readBuffer.data());
    EXPECT_EQ(512, bytesRead);
    EXPECT_EQ(512, readFile.GetSize());
    EXPECT_TRUE(std::all_of(readBuffer.begin(), readBuffer.end(), [](char c) { return c == 'a'; }));

    // Act
    std::vector<char> updatedData(512, 'b');
    file.Append(updatedData);
    file.Sync(); // Sync to update size metadata

    // Assert
    // The reader's offset is already at 512, so only the appended 512 bytes are returned
    // even though 1024 are requested.
    std::vector<char> readAppendedBuffer(1024);
    bytesRead = readFile.SequentialRead(static_cast<int64_t>(1024), readAppendedBuffer.data());
    EXPECT_EQ(512, bytesRead);
    EXPECT_EQ(1024, readFile.GetSize());
    EXPECT_EQ(512, std::count(readAppendedBuffer.begin(), readAppendedBuffer.begin() + 512, 'b'));

    // Cleanup
    EXPECT_TRUE(m_filesystem->DeleteFile(blobName));
}
1242+
1243+
// Verifies that a random-access reader opened before an append refreshes its cached
// metadata and can then read the whole, larger blob from offset 0.
TEST_F(BlobFilesystemIntegrationTests, RandomRead_ETagMismatch_RefreshesAndRetries)
{
    // Arrange: write 512 bytes of 'a' and read them back through a fresh reader.
    std::string path = m_containerPrefix + "/original-" + m_blobName;

    auto writer = m_filesystem->CreateWriteableFile(path);
    std::vector<char> firstChunk(512, 'a');
    writer.Append(firstChunk);
    writer.Sync(); // Sync to update size metadata

    auto reader = m_filesystem->CreateReadableFile(path);
    std::vector<char> firstRead(512);
    auto count = reader.RandomRead(0, static_cast<int64_t>(512), firstRead.data());
    EXPECT_EQ(512, count);
    EXPECT_EQ(512, reader.GetSize());
    EXPECT_TRUE(std::all_of(firstRead.begin(), firstRead.end(), [](char c) { return c == 'a'; }));

    // Act: grow the blob behind the reader's back with 512 bytes of 'b'.
    std::vector<char> secondChunk(512, 'b');
    writer.Append(secondChunk);
    writer.Sync(); // Sync to update size metadata

    // Assert: the same reader now sees all 1024 bytes, 'a' followed by 'b'.
    std::vector<char> fullRead(1024);
    count = reader.RandomRead(0, static_cast<int64_t>(1024), fullRead.data());
    EXPECT_EQ(1024, count);
    EXPECT_EQ(1024, reader.GetSize());

    const auto midpoint = fullRead.begin() + 512;
    EXPECT_EQ(512, std::count(fullRead.begin(), midpoint, 'a'));
    EXPECT_EQ(512, std::count(midpoint, fullRead.end(), 'b'));

    // Cleanup
    EXPECT_TRUE(m_filesystem->DeleteFile(path));
}
1276+
1277+
// Verifies that a reader opened on a 256-byte blob reports the new size and can read
// appended data after the blob is reopened for writing and grown by 512 bytes.
TEST_F(BlobFilesystemIntegrationTests, RandomRead_AfterBlobGrows_UpdatesSize)
{
    // Arrange: a 256-byte blob of 'Z' and a reader opened on it.
    std::string path = m_containerPrefix + "/original-" + m_blobName;
    std::vector<char> seed(256, 'Z');
    auto writer = m_filesystem->CreateWriteableFile(path);
    writer.Append(seed);
    writer.Sync();

    auto reader = m_filesystem->CreateReadableFile(path);
    EXPECT_EQ(256, reader.GetSize());

    // Act: append 512 bytes of 'Y' through a reopened writer.
    std::vector<char> extra(512, 'Y');
    auto appender = m_filesystem->ReopenWriteableFile(path);
    appender.Append(extra);
    appender.Sync();

    // Assert: the existing reader observes the grown size...
    EXPECT_EQ(768, reader.GetSize());

    // ...and a read that lies entirely in the appended region returns only 'Y'.
    std::vector<char> window(100);
    int64_t count = reader.RandomRead(300, 100, window.data());
    EXPECT_EQ(100, count);
    EXPECT_TRUE(std::all_of(window.begin(), window.end(), [](char c) { return c == 'Y'; }));
    EXPECT_EQ(768, reader.GetSize());

    // Cleanup
    EXPECT_TRUE(m_filesystem->DeleteFile(path));
}

0 commit comments

Comments
 (0)