Skip to content

Commit b6744e3

Browse files
committed
fix: Resolve critical bugs in cache, MySQL, and concurrency handling
## Cache Configuration

- Change CacheConfig from `int max_memory_mb` to `size_t max_memory_bytes` for precise byte-level memory control and overflow prevention
- Convert MB to bytes at config load time, bytes to MB at display time
- Fix LRUEvictionCleansUpMetadata test with byte-precision cache sizing

## Resource Management

- Add RAII pattern for MySQL result sets to prevent memory leaks (MysqlResultGuard in connection.cpp)

## Thread Safety & Concurrency

- Fix InvalidationManager::ClearTable deadlock by using lock_guard
- Add comprehensive test coverage for concurrent scenarios

## Test Reliability

- Add RESOURCE_LOCK to HTTP server tests to prevent port conflicts during parallel execution

This commit resolves critical issues including:

- Deadlock risks in cache invalidation
- Memory leaks in MySQL connection handling
- Race conditions in concurrent operations
- Test flakiness in parallel execution
1 parent 070c890 commit b6744e3

29 files changed

+1210
-118
lines changed

src/cache/cache_manager.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@ CacheManager::CacheManager(const config::CacheConfig& cache_config,
1515
: enabled_(cache_config.enabled) {
1616
if (enabled_) {
1717
// Create query cache
18-
const size_t max_memory_bytes = static_cast<size_t>(cache_config.max_memory_mb) * 1024 * 1024;
19-
query_cache_ = std::make_unique<QueryCache>(max_memory_bytes, cache_config.min_query_cost_ms);
18+
query_cache_ = std::make_unique<QueryCache>(cache_config.max_memory_bytes, cache_config.min_query_cost_ms);
2019

2120
// Create invalidation manager
2221
invalidation_mgr_ = std::make_unique<InvalidationManager>(query_cache_.get());
2322

23+
// Set eviction callback to clean up invalidation metadata
24+
query_cache_->SetEvictionCallback([this](const CacheKey& key) {
25+
if (invalidation_mgr_) {
26+
invalidation_mgr_->UnregisterCacheEntry(key);
27+
}
28+
});
29+
2430
// Create invalidation queue with table_contexts for per-table ngram settings
2531
invalidation_queue_ =
2632
std::make_unique<InvalidationQueue>(query_cache_.get(), invalidation_mgr_.get(), table_contexts);

src/cache/invalidation_manager.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,8 @@ std::unordered_set<CacheKey> InvalidationManager::InvalidateAffectedEntries(cons
7171
return affected_keys;
7272
}
7373

74-
void InvalidationManager::UnregisterCacheEntry(const CacheKey& key) {
75-
std::unique_lock lock(mutex_);
76-
74+
// Internal helper: unregister cache entry without locking (assumes mutex is already held)
75+
void InvalidationManager::UnregisterCacheEntryUnlocked(const CacheKey& key) {
7776
// Find metadata
7877
auto metadata_it = cache_metadata_.find(key);
7978
if (metadata_it == cache_metadata_.end()) {
@@ -107,6 +106,11 @@ void InvalidationManager::UnregisterCacheEntry(const CacheKey& key) {
107106
cache_metadata_.erase(metadata_it);
108107
}
109108

109+
void InvalidationManager::UnregisterCacheEntry(const CacheKey& key) {
110+
std::unique_lock lock(mutex_);
111+
UnregisterCacheEntryUnlocked(key);
112+
}
113+
110114
void InvalidationManager::ClearTable(const std::string& table_name) {
111115
std::unique_lock lock(mutex_);
112116

@@ -118,14 +122,12 @@ void InvalidationManager::ClearTable(const std::string& table_name) {
118122
}
119123
}
120124

121-
// Remove entries
122-
lock.unlock();
125+
// Remove entries while holding lock (use unlocked version to avoid deadlock)
123126
for (const auto& key : to_remove) {
124-
UnregisterCacheEntry(key);
127+
UnregisterCacheEntryUnlocked(key);
125128
}
126129

127-
// Remove table from reverse index
128-
lock.lock();
130+
// Remove table from reverse index (already holding lock)
129131
ngram_to_cache_keys_.erase(table_name);
130132
}
131133

src/cache/invalidation_manager.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,13 @@ class InvalidationManager {
120120
// Thread safety
121121
mutable std::shared_mutex mutex_;
122122

123+
/**
124+
* @brief Internal helper: unregister cache entry without locking
125+
* @param key Cache key to unregister
126+
* @note Assumes mutex_ is already held by caller
127+
*/
128+
void UnregisterCacheEntryUnlocked(const CacheKey& key);
129+
123130
/**
124131
* @brief Extract ngrams from text
125132
* @param text Text to extract ngrams from

src/cache/invalidation_queue.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ void InvalidationQueue::Enqueue(const std::string& table_name, const std::string
3838

3939
// Phase 2: Erase from cache immediately (no queuing)
4040
for (const auto& key : affected_keys) {
41+
// Unregister metadata first to prevent memory leak even if Erase throws
42+
invalidation_mgr_->UnregisterCacheEntry(key);
43+
4144
if (cache_ != nullptr) {
4245
cache_->Erase(key);
4346
}
@@ -119,6 +122,11 @@ void InvalidationQueue::WorkerLoop() {
119122
const auto time_since_oldest = now - oldest_timestamp;
120123

121124
if (pending_ngrams_.size() >= batch_size_ || time_since_oldest >= max_delay_) {
125+
// Check running_ before processing to handle spurious wakeup and shutdown
126+
if (!running_.load()) {
127+
break;
128+
}
129+
122130
// Process batch
123131
lock.unlock();
124132
ProcessBatch();
@@ -127,6 +135,11 @@ void InvalidationQueue::WorkerLoop() {
127135
const auto remaining_delay = max_delay_ - time_since_oldest;
128136
queue_cv_.wait_for(lock, remaining_delay,
129137
[this] { return !running_.load() || pending_ngrams_.size() >= batch_size_; });
138+
139+
// After wakeup, check running_ before continuing
140+
if (!running_.load()) {
141+
break;
142+
}
130143
}
131144
}
132145
}

src/cache/query_cache.cpp

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,17 @@ std::optional<std::vector<DocId>> QueryCache::Lookup(const CacheKey& key) {
7777
return std::nullopt;
7878
}
7979

80-
// Copy query_cost_ms before releasing lock to avoid use-after-free
80+
// Copy query_cost_ms and created_at before releasing lock to avoid use-after-free
8181
const double query_cost_ms = entry.query_cost_ms;
82+
const auto created_at = entry.metadata.created_at;
8283

8384
// Update access time (need to upgrade to unique lock)
8485
lock.unlock();
8586
std::unique_lock write_lock(mutex_);
8687

87-
// Re-check existence (might have been evicted)
88+
// Re-check existence and verify it's the same entry (not a new entry with same key)
8889
iter = cache_map_.find(key);
89-
if (iter != cache_map_.end()) {
90+
if (iter != cache_map_.end() && iter->second.first.metadata.created_at == created_at) {
9091
Touch(key);
9192
iter->second.first.metadata.last_accessed = std::chrono::steady_clock::now();
9293
iter->second.first.metadata.access_count++;
@@ -119,14 +120,14 @@ bool QueryCache::Insert(const CacheKey& key, const std::vector<DocId>& result, c
119120
return false;
120121
}
121122

122-
// Calculate memory usage
123+
// Create cache entry to calculate accurate memory usage
124+
CacheEntry temp_entry;
125+
temp_entry.compressed = std::move(compressed);
126+
temp_entry.metadata = metadata;
127+
123128
const size_t original_count = result.size(); // Number of DocId elements, not bytes
124-
const size_t compressed_size = compressed.size();
125-
size_t ngrams_size = 0;
126-
for (const auto& ngram : metadata.ngrams) {
127-
ngrams_size += ngram.capacity();
128-
}
129-
const size_t entry_memory = sizeof(CacheEntry) + compressed_size + ngrams_size;
129+
const size_t compressed_size = temp_entry.compressed.size();
130+
const size_t entry_memory = temp_entry.MemoryUsage();
130131

131132
// Don't cache if entry is too large
132133
if (entry_memory > max_memory_bytes_) {
@@ -148,24 +149,21 @@ bool QueryCache::Insert(const CacheKey& key, const std::vector<DocId>& result, c
148149
}
149150
}
150151

151-
// Create cache entry
152-
CacheEntry entry;
153-
entry.key = key;
154-
entry.compressed = std::move(compressed);
155-
entry.original_size = original_count; // Store count, not bytes
156-
entry.compressed_size = compressed_size;
157-
entry.query_cost_ms = query_cost_ms;
158-
entry.metadata = metadata;
159-
entry.metadata.created_at = std::chrono::steady_clock::now();
160-
entry.metadata.last_accessed = entry.metadata.created_at;
161-
entry.invalidated.store(false);
152+
// Complete cache entry (reuse temp_entry to maintain consistent memory calculation)
153+
temp_entry.key = key;
154+
temp_entry.original_size = original_count; // Store count, not bytes
155+
temp_entry.compressed_size = compressed_size;
156+
temp_entry.query_cost_ms = query_cost_ms;
157+
temp_entry.metadata.created_at = std::chrono::steady_clock::now();
158+
temp_entry.metadata.last_accessed = temp_entry.metadata.created_at;
159+
temp_entry.invalidated.store(false);
162160

163161
// Insert into LRU list (front = most recent)
164162
lru_list_.push_front(key);
165163
auto lru_it = lru_list_.begin();
166164

167165
// Insert into cache map using emplace to avoid copy
168-
cache_map_.emplace(key, std::make_pair(std::move(entry), lru_it));
166+
cache_map_.emplace(key, std::make_pair(std::move(temp_entry), lru_it));
169167

170168
// Update memory tracking
171169
total_memory_bytes_ += entry_memory;
@@ -279,6 +277,11 @@ bool QueryCache::EvictForSpace(size_t required_bytes) {
279277
lru_list_.pop_back();
280278
cache_map_.erase(iter);
281279

280+
// Notify eviction callback (for InvalidationManager cleanup)
281+
if (eviction_callback_) {
282+
eviction_callback_(lru_key);
283+
}
284+
282285
// Update memory tracking
283286
total_memory_bytes_ -= entry_memory;
284287
stats_.current_entries--;

src/cache/query_cache.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#pragma once
77

88
#include <atomic>
9+
#include <functional>
910
#include <list>
1011
#include <memory>
1112
#include <mutex>
@@ -113,6 +114,12 @@ struct CacheStatistics {
113114
*/
114115
class QueryCache {
115116
public:
117+
/**
118+
* @brief Callback type for eviction notifications
119+
* @param key The cache key being evicted
120+
*/
121+
using EvictionCallback = std::function<void(const CacheKey&)>;
122+
116123
/**
117124
* @brief Constructor
118125
* @param max_memory_bytes Maximum memory usage in bytes
@@ -213,6 +220,12 @@ class QueryCache {
213220
*/
214221
void IncrementInvalidationBatches() { stats_.invalidations_batches++; }
215222

223+
/**
224+
* @brief Set callback to be notified when entries are evicted
225+
* @param callback Function to call when an entry is evicted via LRU
226+
*/
227+
void SetEvictionCallback(EvictionCallback callback) { eviction_callback_ = std::move(callback); }
228+
216229
private:
217230
// LRU list: most recently used at front
218231
std::list<CacheKey> lru_list_;
@@ -233,6 +246,9 @@ class QueryCache {
233246
// Statistics
234247
CacheStatistics stats_;
235248

249+
// Eviction callback
250+
EvictionCallback eviction_callback_;
251+
236252
/**
237253
* @brief Evict entries to make room for new entry
238254
* @param required_bytes Bytes needed for new entry

src/config/config.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,8 @@ Config ParseConfigFromJson(const json& root) {
569569
config.cache.enabled = cache["enabled"].get<bool>();
570570
}
571571
if (cache.contains("max_memory_mb")) {
572-
config.cache.max_memory_mb = cache["max_memory_mb"].get<int>();
572+
int max_memory_mb = cache["max_memory_mb"].get<int>();
573+
config.cache.max_memory_bytes = static_cast<size_t>(max_memory_mb) * 1024 * 1024;
573574
}
574575
if (cache.contains("min_query_cost_ms")) {
575576
config.cache.min_query_cost_ms = cache["min_query_cost_ms"].get<double>();

src/config/config.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ struct LoggingConfig {
243243
*/
244244
struct CacheConfig {
245245
bool enabled = true; ///< Enable/disable cache (default: true)
246-
int max_memory_mb = 32; ///< Maximum cache memory in MB (default: 32) // NOLINT
246+
size_t max_memory_bytes = 32 * 1024 * 1024; ///< Maximum cache memory in bytes (default: 32MB) // NOLINT
247247
double min_query_cost_ms = 10.0; ///< Minimum query cost to cache (default: 10ms) // NOLINT
248248
int ttl_seconds = 3600; ///< Cache entry TTL (default: 1 hour, 0 = no TTL) // NOLINT
249249
std::string invalidation_strategy = "ngram"; ///< Invalidation strategy: "ngram", "table"

src/config/config_help.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ nlohmann::json ConfigToJson(const Config& config) {
210210
// Cache configuration
211211
json["cache"] = {
212212
{"enabled", config.cache.enabled},
213-
{"max_memory_mb", config.cache.max_memory_mb},
213+
{"max_memory_mb", config.cache.max_memory_bytes / (1024 * 1024)}, // Convert bytes to MB for display
214214
{"min_query_cost_ms", config.cache.min_query_cost_ms},
215215
{"ttl_seconds", config.cache.ttl_seconds},
216216
{"invalidation_strategy", config.cache.invalidation_strategy},

src/index/index.cpp

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,12 +445,41 @@ Index::IndexStatistics Index::GetStatistics() const {
445445
}
446446

447447
void Index::Optimize(uint64_t total_docs) {
448-
for (auto& [term, posting] : term_postings_) {
448+
// Check if already optimizing (prevent concurrent Optimize() calls)
449+
bool expected = false;
450+
if (!is_optimizing_.compare_exchange_strong(expected, true)) {
451+
spdlog::warn("Optimization already in progress, ignoring Optimize() request");
452+
return;
453+
}
454+
455+
// RAII guard to ensure flag is cleared
456+
struct OptimizationGuard {
457+
std::atomic<bool>& flag;
458+
explicit OptimizationGuard(std::atomic<bool>& f) : flag(f) {}
459+
~OptimizationGuard() { flag = false; }
460+
OptimizationGuard(const OptimizationGuard&) = delete;
461+
OptimizationGuard& operator=(const OptimizationGuard&) = delete;
462+
};
463+
OptimizationGuard guard(is_optimizing_);
464+
465+
// Take snapshot to prevent iterator invalidation while allowing concurrent searches
466+
std::vector<std::pair<std::string, PostingList*>> postings_snapshot;
467+
{
468+
std::shared_lock<std::shared_mutex> lock(postings_mutex_);
469+
postings_snapshot.reserve(term_postings_.size());
470+
for (const auto& [term, posting] : term_postings_) {
471+
postings_snapshot.emplace_back(term, posting.get());
472+
}
473+
}
474+
475+
// Optimize each posting
476+
for (const auto& [term, posting] : postings_snapshot) {
449477
posting->Optimize(total_docs);
450478
}
479+
451480
// NOLINTBEGIN(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)
452481
// 1024: Standard conversion factor for bytes to KB to MB
453-
spdlog::info("Optimized index: {} terms, {} MB", term_postings_.size(), MemoryUsage() / (1024 * 1024));
482+
spdlog::info("Optimized index: {} terms, {} MB", postings_snapshot.size(), MemoryUsage() / (1024 * 1024));
454483
// NOLINTEND(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)
455484
}
456485

0 commit comments

Comments
 (0)