diff --git a/cpp/pixels-retina/CMakeLists.txt b/cpp/pixels-retina/CMakeLists.txt
index 7065d0e6b2..f5fcad58fb 100644
--- a/cpp/pixels-retina/CMakeLists.txt
+++ b/cpp/pixels-retina/CMakeLists.txt
@@ -38,6 +38,7 @@ include_directories(${CMAKE_SOURCE_DIR}/include)
# Source files
set(SOURCES
+ lib/EpochManager.cpp
lib/TileVisibility.cpp
lib/RGVisibility.cpp
lib/RGVisibilityJni.cpp
@@ -63,6 +64,7 @@ install(TARGETS pixels-retina
# Add the test executable
add_executable(tile_visibility_tests test/TileVisibilityTest.cpp)
add_executable(rg_visibility_tests test/RGVisibilityTest.cpp)
+add_executable(visibility_perf_tests test/VisibilityPerformanceTest.cpp)
# Link the test executable with the library
target_link_libraries(tile_visibility_tests
@@ -73,10 +75,15 @@ target_link_libraries(rg_visibility_tests
pixels-retina
GTest::gtest_main
)
+target_link_libraries(visibility_perf_tests
+ pixels-retina
+ GTest::gtest_main
+)
include(GoogleTest)
gtest_discover_tests(tile_visibility_tests)
gtest_discover_tests(rg_visibility_tests)
+gtest_discover_tests(visibility_perf_tests)
# Set build type to Debug if not specified
# if (NOT CMAKE_BUILD_TYPE)
diff --git a/cpp/pixels-retina/include/EpochManager.h b/cpp/pixels-retina/include/EpochManager.h
new file mode 100644
index 0000000000..9a701599f0
--- /dev/null
+++ b/cpp/pixels-retina/include/EpochManager.h
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+#ifndef PIXELS_RETINA_EPOCH_MANAGER_H
+#define PIXELS_RETINA_EPOCH_MANAGER_H
+
+#include <atomic>
+#include <mutex>
+#include <thread>
+
+/**
+ * EpochManager - A global epoch-based memory reclamation system
+ *
+ * This manager allows safe deferred deletion of objects in a concurrent
+ * environment using epoch-based reclamation. Threads announce their presence
+ * in an epoch, and objects can only be reclaimed when all threads have
+ * advanced past the epoch in which the object was retired.
+ */
+class EpochManager {
+public:
+ static EpochManager& getInstance() {
+ static EpochManager instance;
+ return instance;
+ }
+
+ /**
+ * Enter the critical section. Thread announces it's participating in
+ * the current epoch.
+ */
+ void enterEpoch();
+
+ /**
+ * Exit the critical section. Thread leaves the epoch.
+ */
+ void exitEpoch();
+
+ /**
+ * Advance the global epoch counter. Should be called by GC operations.
+ * Returns the new epoch value.
+ */
+ uint64_t advanceEpoch();
+
+ /**
+ * Check if an object retired at the given epoch can be safely reclaimed.
+ * An object can be reclaimed if all threads have advanced past its retire epoch.
+ */
+ bool canReclaim(uint64_t retireEpoch) const;
+
+ /**
+ * Get the current global epoch.
+ */
+ uint64_t getCurrentEpoch() const {
+ return globalEpoch.load(std::memory_order_acquire);
+ }
+
+private:
+ EpochManager() : globalEpoch(1) {}
+ ~EpochManager();
+
+ EpochManager(const EpochManager&) = delete;
+ EpochManager& operator=(const EpochManager&) = delete;
+
+ struct ThreadInfo {
+ std::atomic<uint64_t> localEpoch{0}; // 0 means not in critical section
+ std::atomic<bool> active{true};
+ std::thread::id threadId;
+ ThreadInfo* next{nullptr};
+ };
+
+ std::atomic<uint64_t> globalEpoch;
+ std::atomic<ThreadInfo*> threadListHead{nullptr};
+ std::mutex threadListMutex; // Protects thread list modifications
+
+ ThreadInfo* getOrCreateThreadInfo();
+ static thread_local ThreadInfo* tlsThreadInfo;
+};
+
+/**
+ * RAII helper for epoch protection
+ */
+class EpochGuard {
+public:
+ EpochGuard() {
+ EpochManager::getInstance().enterEpoch();
+ }
+
+ ~EpochGuard() {
+ EpochManager::getInstance().exitEpoch();
+ }
+
+ EpochGuard(const EpochGuard&) = delete;
+ EpochGuard& operator=(const EpochGuard&) = delete;
+};
+
+#endif // PIXELS_RETINA_EPOCH_MANAGER_H
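The header above only declares the API, so here is a minimal usage sketch of how EpochManager and EpochGuard are meant to cooperate. It is illustration only, not code from this patch; the Node type and the readerThread/gcThread functions are hypothetical names.

// Hypothetical usage sketch (not part of this patch): a reader pins the current
// epoch with EpochGuard, while a GC thread retires the old object under the
// epoch returned by advanceEpoch() and frees it only once canReclaim() agrees.
#include "EpochManager.h"
#include <atomic>
#include <utility>
#include <vector>

struct Node { int payload; };

std::atomic<Node*> shared{new Node{42}};
std::vector<std::pair<Node*, uint64_t>> retiredNodes; // touched by the GC thread only

int readerThread() {
    EpochGuard guard;                                  // enterEpoch()/exitEpoch() via RAII
    Node* n = shared.load(std::memory_order_acquire);
    return n->payload;                                 // n cannot be freed while the guard lives
}

void gcThread(Node* replacement) {
    Node* old = shared.exchange(replacement, std::memory_order_acq_rel);
    uint64_t epoch = EpochManager::getInstance().advanceEpoch();
    retiredNodes.emplace_back(old, epoch);             // retire; do not delete yet
    for (auto it = retiredNodes.begin(); it != retiredNodes.end();) {
        if (EpochManager::getInstance().canReclaim(it->second)) {
            delete it->first;                          // no reader can still observe it
            it = retiredNodes.erase(it);
        } else {
            ++it;
        }
    }
}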
diff --git a/cpp/pixels-retina/include/RGVisibility.h b/cpp/pixels-retina/include/RGVisibility.h
index af6e9080c7..88867c52ec 100644
--- a/cpp/pixels-retina/include/RGVisibility.h
+++ b/cpp/pixels-retina/include/RGVisibility.h
@@ -40,20 +40,12 @@ class RGVisibility {
private:
static constexpr uint32_t VISIBILITY_RECORD_CAPACITY = 256;
- static constexpr uint32_t MAX_ACCESS_COUNT = 0x007FFFFF;
- static constexpr uint32_t GC_MASK = 0xFF000000;
- static constexpr uint32_t ACCESS_MASK = 0x00FFFFFF;
- static constexpr uint32_t ACCESS_INC = 0x00000001;
static constexpr uint32_t BITMAP_SIZE_PER_TILE_VISIBILITY = 4;
- static constexpr uint32_t RG_READ_LEASE_MS = 100;
TileVisibility* tileVisibilities;
const uint64_t tileCount;
- std::atomic<uint32_t> flag; // high 1 byte is the gc flag, low 3 bytes are the access count
TileVisibility* getTileVisibility(uint32_t rowId) const;
- void beginRGAccess();
- void endRGAccess();
};
#endif //RG_VISIBILITY_H
diff --git a/cpp/pixels-retina/include/TileVisibility.h b/cpp/pixels-retina/include/TileVisibility.h
index a6af5bee47..2d9b71db46 100644
--- a/cpp/pixels-retina/include/TileVisibility.h
+++ b/cpp/pixels-retina/include/TileVisibility.h
@@ -29,6 +29,7 @@
#include
#include
#include
+#include <vector>
inline uint64_t makeDeleteIndex(uint8_t rowId, uint64_t ts) {
return (static_cast<uint64_t>(rowId) << 56 | (ts & 0x00FFFFFFFFFFFFFFULL));
@@ -48,6 +49,41 @@ struct DeleteIndexBlock {
std::atomic<DeleteIndexBlock*> next{nullptr};
};
+/**
+ * VersionedData - A versioned snapshot of the base state
+ * Used for Copy-on-Write during garbage collection
+ * IMPORTANT: head is part of the version to ensure atomic visibility
+ */
+struct VersionedData {
+ uint64_t baseBitmap[4];
+ uint64_t baseTimestamp;
+ DeleteIndexBlock* head; // Delete chain head, part of the version
+
+ VersionedData() : baseTimestamp(0), head(nullptr) {
+ baseBitmap[0] = baseBitmap[1] = baseBitmap[2] = baseBitmap[3] = 0;
+ }
+
+ VersionedData(uint64_t ts, const uint64_t bitmap[4], DeleteIndexBlock* h)
+ : baseTimestamp(ts), head(h) {
+ baseBitmap[0] = bitmap[0];
+ baseBitmap[1] = bitmap[1];
+ baseBitmap[2] = bitmap[2];
+ baseBitmap[3] = bitmap[3];
+ }
+};
+
+/**
+ * RetiredVersion - Tracks a retired version for epoch-based reclamation
+ */
+struct RetiredVersion {
+ VersionedData* data;
+ DeleteIndexBlock* blocksToDelete; // Head of the chain to delete
+ uint64_t retireEpoch;
+
+ RetiredVersion(VersionedData* d, DeleteIndexBlock* b, uint64_t e)
+ : data(d), blocksToDelete(b), retireEpoch(e) {}
+};
+
class TileVisibility {
public:
TileVisibility();
@@ -61,11 +97,12 @@ class TileVisibility {
TileVisibility(const TileVisibility &) = delete;
TileVisibility &operator=(const TileVisibility &) = delete;
- uint64_t baseBitmap[4];
- uint64_t baseTimestamp;
- std::atomic<DeleteIndexBlock*> head;
+ void reclaimRetiredVersions();
+
+ std::atomic<VersionedData*> currentVersion;
std::atomic<DeleteIndexBlock*> tail;
std::atomic<size_t> tailUsed;
+ std::vector<RetiredVersion> retired; // Protected by GC (single writer)
};
#endif // PIXELS_RETINA_TILE_VISIBILITY_H
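To make the "head is part of the version" comment concrete, the sketch below restates the read path that TileVisibility.cpp implements later in this patch. readSnapshot is a hypothetical free function, not a member of TileVisibility; it assumes EpochManager.h is available.

// Illustrative sketch of the snapshot property: one acquire-load of
// currentVersion yields a mutually consistent (baseBitmap, baseTimestamp, head)
// triple, because GC publishes a whole new VersionedData instead of mutating
// these fields in place.
#include "EpochManager.h"
#include "TileVisibility.h"
#include <atomic>
#include <cstring>

void readSnapshot(const std::atomic<VersionedData*>& currentVersion,
                  uint64_t ts, uint64_t outBitmap[4]) {
    EpochGuard guard;                                            // keeps the snapshot from being reclaimed
    const VersionedData* ver = currentVersion.load(std::memory_order_acquire);
    std::memcpy(outBitmap, ver->baseBitmap, 4 * sizeof(uint64_t));
    for (const DeleteIndexBlock* blk = ver->head; blk != nullptr;
         blk = blk->next.load(std::memory_order_acquire)) {
        // ... apply delete indexes whose timestamp is <= ts to outBitmap ...
    }
}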
diff --git a/cpp/pixels-retina/lib/EpochManager.cpp b/cpp/pixels-retina/lib/EpochManager.cpp
new file mode 100644
index 0000000000..bfe583f467
--- /dev/null
+++ b/cpp/pixels-retina/lib/EpochManager.cpp
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+#include "EpochManager.h"
+
+// Thread-local storage for thread info
+thread_local EpochManager::ThreadInfo* EpochManager::tlsThreadInfo = nullptr;
+
+EpochManager::~EpochManager() {
+ ThreadInfo* current = threadListHead.load(std::memory_order_acquire);
+ while (current) {
+ ThreadInfo* next = current->next;
+ delete current;
+ current = next;
+ }
+}
+
+EpochManager::ThreadInfo* EpochManager::getOrCreateThreadInfo() {
+ if (tlsThreadInfo) {
+ return tlsThreadInfo;
+ }
+
+ // Create new thread info
+ ThreadInfo* newInfo = new ThreadInfo();
+ newInfo->threadId = std::this_thread::get_id();
+
+ // Add to global list
+ std::lock_guard<std::mutex> lock(threadListMutex);
+ newInfo->next = threadListHead.load(std::memory_order_relaxed);
+ threadListHead.store(newInfo, std::memory_order_release);
+
+ tlsThreadInfo = newInfo;
+ return newInfo;
+}
+
+void EpochManager::enterEpoch() {
+ ThreadInfo* info = getOrCreateThreadInfo();
+ uint64_t currentEpoch = globalEpoch.load(std::memory_order_acquire);
+ info->localEpoch.store(currentEpoch, std::memory_order_release);
+}
+
+void EpochManager::exitEpoch() {
+ if (tlsThreadInfo) {
+ tlsThreadInfo->localEpoch.store(0, std::memory_order_release);
+ }
+}
+
+uint64_t EpochManager::advanceEpoch() {
+ return globalEpoch.fetch_add(1, std::memory_order_acq_rel) + 1;
+}
+
+bool EpochManager::canReclaim(uint64_t retireEpoch) const {
+ // Scan all threads to find the minimum active epoch
+ ThreadInfo* current = threadListHead.load(std::memory_order_acquire);
+
+ while (current) {
+ if (!current->active.load(std::memory_order_acquire)) {
+ current = current->next;
+ continue;
+ }
+
+ uint64_t localEpoch = current->localEpoch.load(std::memory_order_acquire);
+
+ // localEpoch == 0 means the thread is not in critical section, skip it
+ if (localEpoch != 0 && localEpoch <= retireEpoch) {
+ // Found a thread still in or before the retire epoch
+ return false;
+ }
+
+ current = current->next;
+ }
+
+ // All threads have advanced past the retire epoch
+ return true;
+}
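A small unit check makes the reclamation rule above easy to verify: a thread whose local epoch is at or before the retire epoch blocks reclamation until it exits. The test below is a hypothetical sketch, not part of this patch.

// Hypothetical unit check for EpochManager::canReclaim().
#include "EpochManager.h"
#include "gtest/gtest.h"

TEST(EpochManagerSketch, ReaderBlocksReclamation) {
    EpochManager& em = EpochManager::getInstance();

    em.enterEpoch();                              // this thread acts as a reader
    uint64_t retireEpoch = em.advanceEpoch();     // GC retires an object "now"
    EXPECT_FALSE(em.canReclaim(retireEpoch));     // reader's local epoch <= retireEpoch

    em.exitEpoch();                               // reader leaves the critical section
    EXPECT_TRUE(em.canReclaim(retireEpoch));      // nothing pins the retired epoch anymore
}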
diff --git a/cpp/pixels-retina/lib/RGVisibility.cpp b/cpp/pixels-retina/lib/RGVisibility.cpp
index 8ce8db0099..84925c0040 100644
--- a/cpp/pixels-retina/lib/RGVisibility.cpp
+++ b/cpp/pixels-retina/lib/RGVisibility.cpp
@@ -24,7 +24,6 @@
RGVisibility::RGVisibility(uint64_t rgRecordNum)
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) {
- flag.store(0, std::memory_order_relaxed);
void* rawMemory = operator new[](tileCount * sizeof(TileVisibility));
tileVisibilities = static_cast<TileVisibility*>(rawMemory);
for (uint64_t i = 0; i < tileCount; ++i) {
@@ -34,7 +33,6 @@ RGVisibility::RGVisibility(uint64_t rgRecordNum)
RGVisibility::RGVisibility(uint64_t rgRecordNum, uint64_t timestamp, const std::vector<uint64_t>& initialBitmap)
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) {
- flag.store(0, std::memory_order_relaxed);
void* rawMemory = operator new[](tileCount * sizeof(TileVisibility));
tileVisibilities = static_cast<TileVisibility*>(rawMemory);
@@ -59,61 +57,11 @@ RGVisibility::~RGVisibility() {
operator delete[](tileVisibilities);
}
-void RGVisibility::beginRGAccess() {
- while (true) {
- uint32_t v = flag.load(std::memory_order_acquire);
- uint32_t accessCount = v & ACCESS_MASK;
-
- if (accessCount >= MAX_ACCESS_COUNT) {
- throw std::runtime_error("Reaches the max concurrent access count.");
- }
-
- if ((v & GC_MASK) > 0 ||
- !flag.compare_exchange_strong(v, v + ACCESS_INC, std::memory_order_acq_rel)) {
- // We failed to get gc lock or increase access count.
- if ((v & GC_MASK) > 0) {
- // if there is an existing gc, sleep for 10ms.
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- }
- continue;
- }
- break;
- }
-}
-
-void RGVisibility::endRGAccess() {
- uint32_t v = flag.load(std::memory_order_acquire);
- while((v & ACCESS_MASK) > 0) {
- if (flag.compare_exchange_strong(v, v - ACCESS_INC, std::memory_order_acq_rel)) {
- break;
- }
- v = flag.load(std::memory_order_acquire);
- }
-}
-
void RGVisibility::collectRGGarbage(uint64_t timestamp) {
- // Set the gc flag.
- flag.store(flag.load(std::memory_order_acquire) | GC_MASK, std::memory_order_release);
-
- // Wait for all access to end.
- while (true) {
- uint32_t v = flag.load(std::memory_order_acquire);
- if ((v & ACCESS_MASK) == 0) {
- break;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- }
-
- assert((flag.load(std::memory_order_acquire) & GC_MASK) > 0);
- assert((flag.load(std::memory_order_acquire) & ACCESS_MASK) == 0);
-
- // Garbage collect.
+ // TileVisibility::collectTileGarbage uses COW + Epoch, so it's safe to call concurrently
for (uint64_t i = 0; i < tileCount; i++) {
tileVisibilities[i].collectTileGarbage(timestamp);
}
-
- // Clear the gc flag.
- flag.store(flag.load(std::memory_order_acquire) & ~GC_MASK, std::memory_order_release);
}
TileVisibility* RGVisibility::getTileVisibility(uint32_t rowId) const {
@@ -125,34 +73,20 @@ TileVisibility* RGVisibility::getTileVisibility(uint32_t rowId) const {
}
void RGVisibility::deleteRGRecord(uint32_t rowId, uint64_t timestamp) {
- try {
- beginRGAccess();
- TileVisibility* tileVisibility = getTileVisibility(rowId);
- tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp);
- endRGAccess();
- }
- catch (const std::runtime_error& e) {
- endRGAccess();
- throw std::runtime_error("Failed to delete record: " + std::string(e.what()));
- }
+ // TileVisibility::deleteTileRecord is lock-free and concurrent-safe
+ TileVisibility* tileVisibility = getTileVisibility(rowId);
+ tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp);
}
uint64_t* RGVisibility::getRGVisibilityBitmap(uint64_t timestamp) {
- beginRGAccess();
+ // TileVisibility::getTileVisibilityBitmap uses Epoch protection internally
uint64_t* bitmap = new uint64_t[tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY];
memset(bitmap, 0, tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY * sizeof(uint64_t));
- try {
- for (uint64_t i = 0; i < tileCount; i++) {
- tileVisibilities[i].getTileVisibilityBitmap(timestamp, bitmap + i * BITMAP_SIZE_PER_TILE_VISIBILITY);
- }
- endRGAccess();
- return bitmap;
- } catch (const std::runtime_error& e) {
- delete[] bitmap;
- endRGAccess();
- throw std::runtime_error("Failed to get visibility bitmap: " + std::string(e.what()));
+ for (uint64_t i = 0; i < tileCount; i++) {
+ tileVisibilities[i].getTileVisibilityBitmap(timestamp, bitmap + i * BITMAP_SIZE_PER_TILE_VISIBILITY);
}
+ return bitmap;
}
uint64_t RGVisibility::getBitmapSize() const {
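From the caller's perspective, the removal of the lease protocol means a writer and a reader may now use the same RGVisibility concurrently with no external coordination. The function below is a hypothetical sketch (not part of the patch) of that usage pattern.

// Caller's-view sketch: lock-free deletes and epoch-protected reads in parallel.
#include "RGVisibility.h"
#include <cstdint>
#include <thread>

void concurrentUseSketch() {
    RGVisibility rg(100000);

    std::thread writer([&rg] {
        for (uint64_t ts = 1; ts <= 1000; ++ts) {
            rg.deleteRGRecord(static_cast<uint32_t>(ts % 100000), ts); // lock-free append
        }
    });
    std::thread reader([&rg] {
        for (int i = 0; i < 100; ++i) {
            uint64_t* bitmap = rg.getRGVisibilityBitmap(500);          // epoch-protected read
            delete[] bitmap;                                           // caller still owns the buffer
        }
    });

    writer.join();
    reader.join();
}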
diff --git a/cpp/pixels-retina/lib/TileVisibility.cpp b/cpp/pixels-retina/lib/TileVisibility.cpp
index a351314876..e58671f2d1 100644
--- a/cpp/pixels-retina/lib/TileVisibility.cpp
+++ b/cpp/pixels-retina/lib/TileVisibility.cpp
@@ -19,32 +19,51 @@
*/
#include "TileVisibility.h"
+#include "EpochManager.h"
#include <cstring>
#include <immintrin.h>
#include <stdexcept>
-TileVisibility::TileVisibility() : baseTimestamp(0UL) {
- memset(baseBitmap, 0, 4 * sizeof(uint64_t));
- head.store(nullptr, std::memory_order_release);
+TileVisibility::TileVisibility() {
+ VersionedData* initialVersion = new VersionedData();
+ currentVersion.store(initialVersion, std::memory_order_release);
tail.store(nullptr, std::memory_order_release);
tailUsed.store(0, std::memory_order_release);
}
-TileVisibility::TileVisibility(uint64_t ts, const uint64_t bitmap[4])
- : baseTimestamp(ts) {
- memcpy(baseBitmap, bitmap, 4 * sizeof(uint64_t));
- head.store(nullptr, std::memory_order_release);
+TileVisibility::TileVisibility(uint64_t ts, const uint64_t bitmap[4]) {
+ VersionedData* initialVersion = new VersionedData(ts, bitmap, nullptr);
+ currentVersion.store(initialVersion, std::memory_order_release);
tail.store(nullptr, std::memory_order_release);
+ tailUsed.store(0, std::memory_order_release);
}
TileVisibility::~TileVisibility() {
- DeleteIndexBlock *blk = head.load(std::memory_order_acquire);
- while (blk) {
- DeleteIndexBlock *next = blk->next.load(std::memory_order_acquire);
- delete blk;
- blk = next;
+ // Clean up current version and its delete chain
+ VersionedData* ver = currentVersion.load(std::memory_order_acquire);
+ if (ver) {
+ DeleteIndexBlock *blk = ver->head;
+ while (blk) {
+ DeleteIndexBlock *next = blk->next.load(std::memory_order_acquire);
+ delete blk;
+ blk = next;
+ }
+ delete ver;
+ }
+
+ // Clean up retired versions and their delete chains
+ for (auto& retired : this->retired) {
+ if (retired.data) {
+ delete retired.data;
+ }
+ DeleteIndexBlock* blk = retired.blocksToDelete;
+ while (blk) {
+ DeleteIndexBlock* next = blk->next.load(std::memory_order_acquire);
+ delete blk;
+ blk = next;
+ }
}
}
@@ -52,28 +71,40 @@ void TileVisibility::deleteTileRecord(uint8_t rowId, uint64_t ts) {
uint64_t item = makeDeleteIndex(rowId, ts);
while (true) {
DeleteIndexBlock *curTail = tail.load(std::memory_order_acquire);
- if (!curTail) { // empty list
- /**
- * Issue: There is a delay in reading.
- * Reads are judged from the head, and if the head pointer is
- * not changed in time, the latest data cannot be read.
- */
+ if (!curTail) { // empty list - need to create first block and update version
DeleteIndexBlock *newBlk = new DeleteIndexBlock();
newBlk->items[0] = item;
DeleteIndexBlock *expectedTail = nullptr;
if (!tail.compare_exchange_strong(expectedTail, newBlk,
- std::memory_order_acq_rel)) {
+ std::memory_order_release,
+ std::memory_order_relaxed)) {
+ delete newBlk;
+ continue;
+ }
+
+ // COW: Create new version with the new head
+ VersionedData* oldVer = currentVersion.load(std::memory_order_acquire);
+ VersionedData* newVer = new VersionedData(oldVer->baseTimestamp, oldVer->baseBitmap, newBlk);
+
+ if (currentVersion.compare_exchange_strong(oldVer, newVer, std::memory_order_acq_rel)) {
+ // Success: retire old version (no chain to delete since head was nullptr)
+ delete oldVer;
+ tailUsed.store(1, std::memory_order_release);
+ return;
+ } else {
+ // CAS failed, retry from beginning
+ delete newVer;
+ tail.store(nullptr, std::memory_order_release);
delete newBlk;
continue;
}
- head.store(newBlk, std::memory_order_release);
- tailUsed.store(1, std::memory_order_release);
- return;
} else {
size_t pos = tailUsed.load(std::memory_order_acquire);
if (pos < DeleteIndexBlock::BLOCK_CAPACITY) {
- if (tailUsed.compare_exchange_strong(pos, pos + 1, std::memory_order_acq_rel)) {
+ if (tailUsed.compare_exchange_strong(pos, pos + 1,
+ std::memory_order_relaxed,
+ std::memory_order_relaxed)) {
curTail->items[pos] = item;
return;
}
@@ -89,13 +120,16 @@ void TileVisibility::deleteTileRecord(uint8_t rowId, uint64_t ts) {
DeleteIndexBlock *expectedNext = nullptr;
if (!curTail->next.compare_exchange_strong(
- expectedNext, newBlk, std::memory_order_acq_rel)) {
+ expectedNext, newBlk,
+ std::memory_order_release,
+ std::memory_order_relaxed)) {
delete newBlk;
continue;
}
tail.compare_exchange_strong(curTail, newBlk,
- std::memory_order_acq_rel);
+ std::memory_order_release,
+ std::memory_order_relaxed);
tailUsed.store(1, std::memory_order_release);
return;
}
@@ -136,15 +170,21 @@ inline void process_bitmap_block_256(const DeleteIndexBlock *blk,
}
void TileVisibility::getTileVisibilityBitmap(uint64_t ts, uint64_t outBitmap[4]) const {
- if (ts < baseTimestamp) {
+ // Enter epoch protection
+ EpochGuard guard;
+
+ // Load current version under epoch protection
+ VersionedData* ver = currentVersion.load(std::memory_order_acquire);
+
+ if (ts < ver->baseTimestamp) {
throw std::runtime_error("need to read checkpoint from disk");
}
- std::memcpy(outBitmap, baseBitmap, 4 * sizeof(uint64_t));
- if (ts == baseTimestamp) {
+ std::memcpy(outBitmap, ver->baseBitmap, 4 * sizeof(uint64_t));
+ if (ts == ver->baseTimestamp) {
return;
}
- DeleteIndexBlock *blk = head.load(std::memory_order_acquire);
+ DeleteIndexBlock *blk = ver->head;
#ifdef RETINA_SIMD
const __m256i signBit = _mm256_set1_epi64x(0x8000000000000000ULL);
const __m256i vThrFlip = _mm256_xor_si256(_mm256_set1_epi64x(ts), signBit);
@@ -152,8 +192,8 @@ void TileVisibility::getTileVisibilityBitmap(uint64_t ts, uint64_t outBitmap[4])
#endif
while (blk) {
- DeleteIndexBlock *currentTail = tail.load(std::memory_order_acquire);
- size_t currentTailUsed = tailUsed.load(std::memory_order_acquire);
+ DeleteIndexBlock *currentTail = tail.load(std::memory_order_relaxed);
+ size_t currentTailUsed = tailUsed.load(std::memory_order_relaxed);
size_t count = (blk == currentTail)
? currentTailUsed
: DeleteIndexBlock::BLOCK_CAPACITY;
@@ -189,21 +229,22 @@ void TileVisibility::getTileVisibilityBitmap(uint64_t ts, uint64_t outBitmap[4])
}
}
- blk = blk->next.load(std::memory_order_acquire);
+ blk = blk->next.load(std::memory_order_relaxed);
}
}
void TileVisibility::collectTileGarbage(uint64_t ts) {
- // The upper layers have ensured that there are no reads or writes at this point
- // so we can safely delete the records
-
- if (ts <= baseTimestamp) {
+ // Load old version
+ VersionedData* oldVer = currentVersion.load(std::memory_order_acquire);
+
+ if (ts <= oldVer->baseTimestamp) {
return;
}
- DeleteIndexBlock *blk = head.load(std::memory_order_acquire);
+ // Find the last block that should be compacted
+ DeleteIndexBlock *blk = oldVer->head;
DeleteIndexBlock *lastFullBlk = nullptr;
- uint64_t newBaseTimestamp = baseTimestamp;
+ uint64_t newBaseTimestamp = oldVer->baseTimestamp;
while (blk) {
size_t count = (blk == tail.load(std::memory_order_acquire))
@@ -225,26 +266,91 @@ void TileVisibility::collectTileGarbage(uint64_t ts) {
blk = blk->next.load(std::memory_order_acquire);
}
- if (lastFullBlk) {
- getTileVisibilityBitmap(ts, baseBitmap);
- baseTimestamp = newBaseTimestamp;
-
- DeleteIndexBlock *current = head.load(std::memory_order_acquire);
- DeleteIndexBlock *newHead =
- lastFullBlk->next.load(std::memory_order_acquire);
-
- head.store(newHead, std::memory_order_release);
+ if (!lastFullBlk) {
+ // Nothing to compact
+ return;
+ }
- DeleteIndexBlock *curTail = tail.load(std::memory_order_acquire);
+ // Create new version with Copy-on-Write
+ // Manually compute the new base bitmap from oldVer
+ uint64_t newBaseBitmap[4];
+ std::memcpy(newBaseBitmap, oldVer->baseBitmap, 4 * sizeof(uint64_t));
+
+ // Apply deletes from oldVer->head up to lastFullBlk
+ blk = oldVer->head;
+ while (blk) {
+ size_t count = (blk == lastFullBlk)
+ ? ((blk == tail.load(std::memory_order_acquire))
+ ? tailUsed.load(std::memory_order_acquire)
+ : DeleteIndexBlock::BLOCK_CAPACITY)
+ : DeleteIndexBlock::BLOCK_CAPACITY;
+
+ for (size_t i = 0; i < count; i++) {
+ uint64_t item = blk->items[i];
+ uint64_t delTs = extractTimestamp(item);
+ if (delTs <= ts) {
+ SET_BITMAP_BIT(newBaseBitmap, extractRowId(item));
+ }
+ }
+
+ if (blk == lastFullBlk) {
+ break;
+ }
+ blk = blk->next.load(std::memory_order_acquire);
+ }
+
+ // Get new head and break the chain to avoid double-free
+ DeleteIndexBlock* newHead = lastFullBlk->next.load(std::memory_order_acquire);
+ lastFullBlk->next.store(nullptr, std::memory_order_release);
+
+ // Create new version with new head - this is the atomic COW update
+ VersionedData* newVer = new VersionedData(newBaseTimestamp, newBaseBitmap, newHead);
+
+ // CAS to install new version atomically
+ if (currentVersion.compare_exchange_strong(oldVer, newVer,
+ std::memory_order_acq_rel)) {
+ // Successfully updated
+ // Retire old version and its delete chain
+ uint64_t retireEpoch = EpochManager::getInstance().advanceEpoch();
+ retired.emplace_back(oldVer, oldVer->head, retireEpoch);
+
+ // Update tail if needed (if all blocks were compacted)
if (!newHead) {
- tail.store(newHead, std::memory_order_release);
+ tail.store(nullptr, std::memory_order_release);
+ tailUsed.store(0, std::memory_order_release);
}
+
+ // Try to reclaim retired versions
+ reclaimRetiredVersions();
+ } else {
+ // CAS failed, another GC happened concurrently
+ // Restore the chain link
+ lastFullBlk->next.store(newHead, std::memory_order_release);
+ delete newVer;
+ }
+}
- while (current != lastFullBlk->next.load(std::memory_order_acquire)) {
- DeleteIndexBlock *next = current->next.load(
- std::memory_order_acquire);
- delete current;
- current = next;
+void TileVisibility::reclaimRetiredVersions() {
+ // Remove retired versions that can be safely reclaimed
+ auto it = retired.begin();
+ while (it != retired.end()) {
+ if (EpochManager::getInstance().canReclaim(it->retireEpoch)) {
+ // Safe to delete
+ if (it->data) {
+ delete it->data;
+ }
+
+ // Delete the chain of blocks
+ DeleteIndexBlock* blk = it->blocksToDelete;
+ while (blk) {
+ DeleteIndexBlock* next = blk->next.load(std::memory_order_acquire);
+ delete blk;
+ blk = next;
+ }
+
+ it = retired.erase(it);
+ } else {
+ ++it;
}
}
}
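Because the retired list is documented above as single-writer, callers that can trigger GC from more than one thread are expected to serialize the GC entry point themselves, while deletes and reads keep running lock-free. The wrapper below is a hypothetical sketch of one way to do that; it is not part of the patch.

// Hypothetical GC serialization wrapper around RGVisibility::collectRGGarbage().
#include "RGVisibility.h"
#include <mutex>

class SerializedGc {
public:
    explicit SerializedGc(RGVisibility& rg) : rg_(rg) {}

    void collect(uint64_t timestamp) {
        std::lock_guard<std::mutex> lock(gcMutex_); // one GC writer at a time
        rg_.collectRGGarbage(timestamp);            // concurrent deletes/reads stay unaffected
    }

private:
    RGVisibility& rg_;
    std::mutex gcMutex_;
};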
diff --git a/cpp/pixels-retina/test/VisibilityPerformanceTest.cpp b/cpp/pixels-retina/test/VisibilityPerformanceTest.cpp
new file mode 100644
index 0000000000..381cbf9168
--- /dev/null
+++ b/cpp/pixels-retina/test/VisibilityPerformanceTest.cpp
@@ -0,0 +1,583 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+#include "gtest/gtest.h"
+#include "RGVisibility.h"
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <iomanip>
+#include <iostream>
+#include <numeric>
+#include <random>
+#include <thread>
+#include <vector>
+
+// Configurable test parameters
+#ifndef PERF_SMALL_DATASET
+#define PERF_SMALL_DATASET 1024
+#endif
+
+#ifndef PERF_MEDIUM_DATASET
+#define PERF_MEDIUM_DATASET 102400
+#endif
+
+#ifndef PERF_LARGE_DATASET
+#define PERF_LARGE_DATASET 1024000
+#endif
+
+#ifndef PERF_NUM_OPERATIONS
+#define PERF_NUM_OPERATIONS 10000
+#endif
+
+#ifndef PERF_NUM_THREADS
+#define PERF_NUM_THREADS 8
+#endif
+
+using namespace std::chrono;
+
+// Performance statistics helper class
+class PerfStats {
+public:
+ void addSample(double microseconds) {
+ samples.push_back(microseconds);
+ }
+
+ void calculate() {
+ if (samples.empty()) return;
+
+ std::sort(samples.begin(), samples.end());
+
+ min = samples.front();
+ max = samples.back();
+ avg = std::accumulate(samples.begin(), samples.end(), 0.0) / samples.size();
+
+ p50 = samples[samples.size() * 50 / 100];
+ p95 = samples[samples.size() * 95 / 100];
+ p99 = samples[samples.size() * 99 / 100];
+ }
+
+ void print(const std::string& name) const {
+ std::cout << "\n=== " << name << " ===" << std::endl;
+ std::cout << std::fixed << std::setprecision(2);
+ std::cout << " Count: " << samples.size() << std::endl;
+ std::cout << " Min: " << min << " μs" << std::endl;
+ std::cout << " Max: " << max << " μs" << std::endl;
+ std::cout << " Average: " << avg << " μs" << std::endl;
+ std::cout << " P50: " << p50 << " μs" << std::endl;
+ std::cout << " P95: " << p95 << " μs" << std::endl;
+ std::cout << " P99: " << p99 << " μs" << std::endl;
+
+ if (!samples.empty()) {
+ double totalSeconds = std::accumulate(samples.begin(), samples.end(), 0.0) / 1000000.0;
+ double throughput = samples.size() / totalSeconds;
+ std::cout << " Throughput: " << std::fixed << std::setprecision(0)
+ << throughput << " ops/sec" << std::endl;
+ }
+ }
+
+ double getAvg() const { return avg; }
+ size_t getCount() const { return samples.size(); }
+
+private:
+ std::vector<double> samples;
+ double min = 0, max = 0, avg = 0, p50 = 0, p95 = 0, p99 = 0;
+};
+
+// Performance test base class
+class VisibilityPerfTest : public ::testing::Test {
+protected:
+ void SetUp() override {
+ std::cout << "\n" << std::string(80, '=') << std::endl;
+ }
+
+ void TearDown() override {
+ std::cout << std::string(80, '=') << std::endl;
+ }
+
+ // Timing helper function
+ template <typename Func>
+ double measureMicroseconds(Func&& func) {
+ auto start = high_resolution_clock::now();
+ func();
+ auto end = high_resolution_clock::now();
+ return duration_cast<microseconds>(end - start).count();
+ }
+};
+
+// ============================================================================
+// Single-threaded performance tests
+// ============================================================================
+
+TEST_F(VisibilityPerfTest, DeleteRecordPerformance_SmallDataset) {
+ const uint64_t ROW_COUNT = PERF_SMALL_DATASET;
+ const uint64_t NUM_OPS = std::min((uint64_t)PERF_NUM_OPERATIONS, ROW_COUNT);
+
+ std::cout << "Testing deleteRGRecord() performance with " << ROW_COUNT << " rows" << std::endl;
+
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+ PerfStats stats;
+
+ for (uint64_t i = 0; i < NUM_OPS; i++) {
+ uint64_t timestamp = i + 1;
+ uint32_t rowId = i % ROW_COUNT;
+
+ double elapsed = measureMicroseconds([&]() {
+ rgVisibility->deleteRGRecord(rowId, timestamp);
+ });
+
+ stats.addSample(elapsed);
+ }
+
+ stats.calculate();
+ stats.print("deleteRGRecord() - Small Dataset");
+
+ delete rgVisibility;
+}
+
+TEST_F(VisibilityPerfTest, DeleteRecordPerformance_MediumDataset) {
+ const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET;
+ const uint64_t NUM_OPS = std::min((uint64_t)PERF_NUM_OPERATIONS, ROW_COUNT);
+
+ std::cout << "Testing deleteRGRecord() performance with " << ROW_COUNT << " rows" << std::endl;
+
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+ PerfStats stats;
+
+ for (uint64_t i = 0; i < NUM_OPS; i++) {
+ uint64_t timestamp = i + 1;
+ uint32_t rowId = i % ROW_COUNT;
+
+ double elapsed = measureMicroseconds([&]() {
+ rgVisibility->deleteRGRecord(rowId, timestamp);
+ });
+
+ stats.addSample(elapsed);
+ }
+
+ stats.calculate();
+ stats.print("deleteRGRecord() - Medium Dataset");
+
+ delete rgVisibility;
+}
+
+TEST_F(VisibilityPerfTest, DeleteRecordPerformance_LargeDataset) {
+ const uint64_t ROW_COUNT = PERF_LARGE_DATASET;
+ const uint64_t NUM_OPS = std::min((uint64_t)PERF_NUM_OPERATIONS, ROW_COUNT);
+
+ std::cout << "Testing deleteRGRecord() performance with " << ROW_COUNT << " rows" << std::endl;
+
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+ PerfStats stats;
+
+ for (uint64_t i = 0; i < NUM_OPS; i++) {
+ uint64_t timestamp = i + 1;
+ uint32_t rowId = i % ROW_COUNT;
+
+ double elapsed = measureMicroseconds([&]() {
+ rgVisibility->deleteRGRecord(rowId, timestamp);
+ });
+
+ stats.addSample(elapsed);
+ }
+
+ stats.calculate();
+ stats.print("deleteRGRecord() - Large Dataset");
+
+ delete rgVisibility;
+}
+
+TEST_F(VisibilityPerfTest, GetBitmapPerformance) {
+ const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET;
+ const uint64_t NUM_DELETES = 1000;
+
+ std::cout << "Testing getRGVisibilityBitmap() performance" << std::endl;
+
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+
+ // Perform some delete operations first
+ for (uint64_t i = 0; i < NUM_DELETES; i++) {
+ rgVisibility->deleteRGRecord(i, i + 1);
+ }
+
+ PerfStats stats;
+
+ for (int i = 0; i < PERF_NUM_OPERATIONS; i++) {
+ uint64_t timestamp = (i % NUM_DELETES) + 1;
+
+ double elapsed = measureMicroseconds([&]() {
+ uint64_t* bitmap = rgVisibility->getRGVisibilityBitmap(timestamp);
+ delete[] bitmap;
+ });
+
+ stats.addSample(elapsed);
+ }
+
+ stats.calculate();
+ stats.print("getRGVisibilityBitmap()");
+
+ delete rgVisibility;
+}
+
+TEST_F(VisibilityPerfTest, CollectGarbagePerformance) {
+ const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET;
+ const uint64_t NUM_DELETES = 10000;
+
+ std::cout << "Testing collectRGGarbage() performance" << std::endl;
+
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+
+ // Perform many delete operations first
+ for (uint64_t i = 0; i < NUM_DELETES; i++) {
+ rgVisibility->deleteRGRecord(i % ROW_COUNT, i + 1);
+ }
+
+ PerfStats stats;
+
+ for (int i = 0; i < 100; i++) {
+ uint64_t gcTimestamp = (i + 1) * 100;
+
+ double elapsed = measureMicroseconds([&]() {
+ rgVisibility->collectRGGarbage(gcTimestamp);
+ });
+
+ stats.addSample(elapsed);
+ }
+
+ stats.calculate();
+ stats.print("collectRGGarbage()");
+
+ delete rgVisibility;
+}
+
+// ============================================================================
+// Multi-threaded performance tests
+// ============================================================================
+
+TEST_F(VisibilityPerfTest, ConcurrentDeletePerformance) {
+ const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET;
+ const int NUM_THREADS = PERF_NUM_THREADS;
+ const int OPS_PER_THREAD = PERF_NUM_OPERATIONS / NUM_THREADS;
+
+ std::cout << "Testing concurrent deleteRGRecord() with "
+ << NUM_THREADS << " threads" << std::endl;
+
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+ std::vector<PerfStats> threadStats(NUM_THREADS);
+ std::atomic<uint64_t> timestamp{0};
+
+ auto start = high_resolution_clock::now();
+
+ std::vector<std::thread> threads;
+ for (int t = 0; t < NUM_THREADS; t++) {
+ threads.emplace_back([&, t]() {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint32_t> dist(0, ROW_COUNT - 1);
+
+ for (int i = 0; i < OPS_PER_THREAD; i++) {
+ uint64_t ts = timestamp.fetch_add(1) + 1;
+ uint32_t rowId = dist(gen);
+
+ auto opStart = high_resolution_clock::now();
+ rgVisibility->deleteRGRecord(rowId, ts);
+ auto opEnd = high_resolution_clock::now();
+
+ double elapsed = duration_cast<microseconds>(opEnd - opStart).count();
+ threadStats[t].addSample(elapsed);
+ }
+ });
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ auto end = high_resolution_clock::now();
+ double totalTime = duration_cast<microseconds>(end - start).count();
+
+ // Calculate statistics for all threads
+ PerfStats combinedStats;
+ for (auto& stats : threadStats) {
+ stats.calculate();
+ }
+
+ std::cout << "\nConcurrent Delete Performance:" << std::endl;
+ std::cout << " Threads: " << NUM_THREADS << std::endl;
+ std::cout << " Total Ops: " << NUM_THREADS * OPS_PER_THREAD << std::endl;
+ std::cout << " Total Time: " << std::fixed << std::setprecision(2)
+ << totalTime / 1000.0 << " ms" << std::endl;
+ std::cout << " Throughput: " << std::fixed << std::setprecision(0)
+ << (NUM_THREADS * OPS_PER_THREAD) / (totalTime / 1000000.0)
+ << " ops/sec" << std::endl;
+
+ for (int t = 0; t < NUM_THREADS; t++) {
+ std::cout << "\n Thread " << t << " - Avg latency: "
+ << std::fixed << std::setprecision(2)
+ << threadStats[t].getAvg() << " μs" << std::endl;
+ }
+
+ delete rgVisibility;
+}
+
+TEST_F(VisibilityPerfTest, ConcurrentReadPerformance) {
+ const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET;
+ const int NUM_THREADS = PERF_NUM_THREADS;
+ const int OPS_PER_THREAD = PERF_NUM_OPERATIONS / NUM_THREADS;
+
+ std::cout << "Testing concurrent getRGVisibilityBitmap() with "
+ << NUM_THREADS << " threads" << std::endl;
+
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+
+ // Perform some delete operations in advance
+ const uint64_t NUM_DELETES = 1000;
+ for (uint64_t i = 0; i < NUM_DELETES; i++) {
+ rgVisibility->deleteRGRecord(i % ROW_COUNT, i + 1);
+ }
+
+ std::vector<PerfStats> threadStats(NUM_THREADS);
+
+ auto start = high_resolution_clock::now();
+
+ std::vector<std::thread> threads;
+ for (int t = 0; t < NUM_THREADS; t++) {
+ threads.emplace_back([&, t]() {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint64_t> dist(1, NUM_DELETES);
+
+ for (int i = 0; i < OPS_PER_THREAD; i++) {
+ uint64_t ts = dist(gen);
+
+ auto opStart = high_resolution_clock::now();
+ uint64_t* bitmap = rgVisibility->getRGVisibilityBitmap(ts);
+ delete[] bitmap;
+ auto opEnd = high_resolution_clock::now();
+
+ double elapsed = duration_cast<microseconds>(opEnd - opStart).count();
+ threadStats[t].addSample(elapsed);
+ }
+ });
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ auto end = high_resolution_clock::now();
+ double totalTime = duration_cast<microseconds>(end - start).count();
+
+ for (auto& stats : threadStats) {
+ stats.calculate();
+ }
+
+ std::cout << "\nConcurrent Read Performance:" << std::endl;
+ std::cout << " Threads: " << NUM_THREADS << std::endl;
+ std::cout << " Total Ops: " << NUM_THREADS * OPS_PER_THREAD << std::endl;
+ std::cout << " Total Time: " << std::fixed << std::setprecision(2)
+ << totalTime / 1000.0 << " ms" << std::endl;
+ std::cout << " Throughput: " << std::fixed << std::setprecision(0)
+ << (NUM_THREADS * OPS_PER_THREAD) / (totalTime / 1000000.0)
+ << " ops/sec" << std::endl;
+
+ for (int t = 0; t < NUM_THREADS; t++) {
+ std::cout << "\n Thread " << t << " - Avg latency: "
+ << std::fixed << std::setprecision(2)
+ << threadStats[t].getAvg() << " μs" << std::endl;
+ }
+
+ delete rgVisibility;
+}
+
+TEST_F(VisibilityPerfTest, MixedWorkloadPerformance) {
+ const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET;
+ const int NUM_WRITE_THREADS = PERF_NUM_THREADS / 2;
+ const int NUM_READ_THREADS = PERF_NUM_THREADS / 2;
+ const int OPS_PER_THREAD = PERF_NUM_OPERATIONS / PERF_NUM_THREADS;
+
+ std::cout << "Testing mixed read/write workload with "
+ << NUM_WRITE_THREADS << " writers and "
+ << NUM_READ_THREADS << " readers" << std::endl;
+
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+ std::atomic<uint64_t> timestamp{0};
+ std::atomic<bool> running{true};
+
+ std::vector<PerfStats> writeStats(NUM_WRITE_THREADS);
+ std::vector<PerfStats> readStats(NUM_READ_THREADS);
+
+ auto start = high_resolution_clock::now();
+
+ // Start write threads
+ std::vector<std::thread> threads;
+ for (int t = 0; t < NUM_WRITE_THREADS; t++) {
+ threads.emplace_back([&, t]() {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint32_t> dist(0, ROW_COUNT - 1);
+
+ for (int i = 0; i < OPS_PER_THREAD; i++) {
+ uint64_t ts = timestamp.fetch_add(1) + 1;
+ uint32_t rowId = dist(gen);
+
+ auto opStart = high_resolution_clock::now();
+ rgVisibility->deleteRGRecord(rowId, ts);
+ auto opEnd = high_resolution_clock::now();
+
+ double elapsed = duration_cast<microseconds>(opEnd - opStart).count();
+ writeStats[t].addSample(elapsed);
+ }
+ });
+ }
+
+ // Start read threads
+ for (int t = 0; t < NUM_READ_THREADS; t++) {
+ threads.emplace_back([&, t]() {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+
+ for (int i = 0; i < OPS_PER_THREAD; i++) {
+ uint64_t maxTs = timestamp.load();
+ if (maxTs == 0) {
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
+ continue;
+ }
+
+ std::uniform_int_distribution<uint64_t> dist(1, maxTs);
+ uint64_t ts = dist(gen);
+
+ auto opStart = high_resolution_clock::now();
+ uint64_t* bitmap = rgVisibility->getRGVisibilityBitmap(ts);
+ delete[] bitmap;
+ auto opEnd = high_resolution_clock::now();
+
+ double elapsed = duration_cast<microseconds>(opEnd - opStart).count();
+ readStats[t].addSample(elapsed);
+ }
+ });
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ auto end = high_resolution_clock::now();
+ double totalTime = duration_cast<microseconds>(end - start).count();
+
+ // Calculate statistics
+ for (auto& stats : writeStats) {
+ stats.calculate();
+ }
+ for (auto& stats : readStats) {
+ stats.calculate();
+ }
+
+ std::cout << "\nMixed Workload Performance:" << std::endl;
+ std::cout << " Write Threads: " << NUM_WRITE_THREADS << std::endl;
+ std::cout << " Read Threads: " << NUM_READ_THREADS << std::endl;
+ std::cout << " Total Time: " << std::fixed << std::setprecision(2)
+ << totalTime / 1000.0 << " ms" << std::endl;
+
+ size_t totalWrites = 0, totalReads = 0;
+ for (auto& stats : writeStats) {
+ totalWrites += stats.getCount();
+ }
+ for (auto& stats : readStats) {
+ totalReads += stats.getCount();
+ }
+
+ std::cout << " Total Writes: " << totalWrites << std::endl;
+ std::cout << " Total Reads: " << totalReads << std::endl;
+ std::cout << " Write Throughput: " << std::fixed << std::setprecision(0)
+ << totalWrites / (totalTime / 1000000.0) << " ops/sec" << std::endl;
+ std::cout << " Read Throughput: " << std::fixed << std::setprecision(0)
+ << totalReads / (totalTime / 1000000.0) << " ops/sec" << std::endl;
+
+ delete rgVisibility;
+}
+
+// ============================================================================
+// Scalability tests - Testing scalability with different thread counts
+// ============================================================================
+
+TEST_F(VisibilityPerfTest, ScalabilityTest) {
+ const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET;
+ const int TOTAL_OPS = 50000;
+
+ std::cout << "Testing scalability with different thread counts" << std::endl;
+ std::cout << std::string(60, '-') << std::endl;
+ std::cout << std::setw(10) << "Threads"
+ << std::setw(15) << "Time (ms)"
+ << std::setw(20) << "Throughput (ops/s)"
+ << std::setw(15) << "Speedup" << std::endl;
+ std::cout << std::string(60, '-') << std::endl;
+
+ double baselineTime = 0;
+
+ for (int numThreads : {1, 2, 4, 8, 16}) {
+ RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT);
+ std::atomic<uint64_t> timestamp{0};
+
+ int opsPerThread = TOTAL_OPS / numThreads;
+
+ auto start = high_resolution_clock::now();
+
+ std::vector<std::thread> threads;
+ for (int t = 0; t < numThreads; t++) {
+ threads.emplace_back([&]() {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint32_t> dist(0, ROW_COUNT - 1);
+
+ for (int i = 0; i < opsPerThread; i++) {
+ uint64_t ts = timestamp.fetch_add(1) + 1;
+ uint32_t rowId = dist(gen);
+ rgVisibility->deleteRGRecord(rowId, ts);
+ }
+ });
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ auto end = high_resolution_clock::now();
+ double totalTime = duration_cast<microseconds>(end - start).count();
+ double throughput = (numThreads * opsPerThread) / (totalTime / 1000000.0);
+
+ if (numThreads == 1) {
+ baselineTime = totalTime;
+ }
+
+ double speedup = baselineTime / totalTime;
+
+ std::cout << std::setw(10) << numThreads
+ << std::setw(15) << std::fixed << std::setprecision(2) << totalTime / 1000.0
+ << std::setw(20) << std::fixed << std::setprecision(0) << throughput
+ << std::setw(15) << std::fixed << std::setprecision(2) << speedup << "x"
+ << std::endl;
+
+ delete rgVisibility;
+ }
+
+ std::cout << std::string(60, '-') << std::endl;
+}