Commit e8982a4

refactor: Improve thread safety, resource management, and observability
Implement comprehensive improvements based on code quality review:

Thread Safety:
- Add shared_mutex to PostingList for thread-safe concurrent access
- Protect Roaring bitmap operations from data races
- Add dedicated thread safety tests for PostingList

Resource Management:
- Introduce FDGuard and ScopeGuard utilities for RAII patterns
- Fix FD leaks in ConnectionAcceptor and TcpServer
- Ensure proper cleanup on exceptions using scope guards
- Fix active_workers_ race condition in ThreadPool

Performance Optimization:
- Precompute cache keys in QueryParser to avoid redundant normalization
- Use precomputed keys in CacheManager for faster lookups
- Move cache key generation (md5, cache_key, query_normalizer) to query module
- Update module dependencies to reflect new architecture

Rate Limiting Integration:
- Integrate RateLimiter into TcpServer and HttpServer
- Add IP-based token bucket rate limiting
- Return 429 status code for HTTP rate limit violations
- Add configuration support for api.tcp.max_connections

API Completeness:
- Add POST /{table}/count endpoint to HTTP API
- Implement tcp.max_connections configuration support
- Add missing metrics to /health/detail endpoint (current_gtid, processed_events, queue_size)

Testing:
- Replace SUCCEED() with actual assertions in binlog filter validation tests
- Replace trivial EXPECT_TRUE(true) with meaningful assertions
- Add PostingList thread safety tests (5 test cases)
- Add HTTP COUNT endpoint tests (4 test cases)
- All 1,209 tests pass

CI/CD:
- Add GitHub Actions workflow for AddressSanitizer and ThreadSanitizer
- Configure ASAN with leak detection and initialization order checking
- Configure TSAN for data race and deadlock detection
- Add path filtering and caching for efficient CI runs
- Workflow initially disabled (manual trigger only)

This change addresses critical thread safety issues, prevents resource leaks, and improves overall system reliability and observability.
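The PostingList change itself is not among the diffs visible on this page, so the following is only a minimal sketch of the reader/writer locking pattern the message describes. `GuardedPostingList` and `ConcurrentInsertDemo` are hypothetical names, and a `std::set` stands in for the Roaring bitmap; the real class may lock at a different granularity.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for PostingList; std::set replaces the Roaring bitmap.
class GuardedPostingList {
 public:
  void Add(std::uint32_t doc_id) {
    // Writers take the lock exclusively.
    std::unique_lock<std::shared_mutex> lock(mutex_);
    doc_ids_.insert(doc_id);
  }

  bool Contains(std::uint32_t doc_id) const {
    // Readers share the lock, so concurrent lookups do not block each other.
    std::shared_lock<std::shared_mutex> lock(mutex_);
    return doc_ids_.count(doc_id) != 0;
  }

  std::size_t Size() const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    return doc_ids_.size();
  }

 private:
  mutable std::shared_mutex mutex_;  // mutable so const readers can lock it
  std::set<std::uint32_t> doc_ids_;
};

// Runs `writers` threads that each insert `per_writer` distinct IDs while
// also reading, then returns the final number of stored IDs.
std::size_t ConcurrentInsertDemo(unsigned writers, std::uint32_t per_writer) {
  assert(writers > 0);
  GuardedPostingList list;
  std::vector<std::thread> threads;
  for (unsigned w = 0; w < writers; ++w) {
    threads.emplace_back([&list, w, per_writer] {
      for (std::uint32_t i = 0; i < per_writer; ++i) {
        list.Add(w * per_writer + i);
        (void)list.Contains(i);  // interleave reads with writes
      }
    });
  }
  for (auto& thread : threads) {
    thread.join();
  }
  return list.Size();
}
```

Calling `ConcurrentInsertDemo(4, 1000)` from a test built with `-fsanitize=thread` is one way to exercise the locking; removing either lock should make ThreadSanitizer report a data race.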
1 parent 890f287 commit e8982a4

46 files changed: 1,589 additions and 114 deletions

.github/workflows/sanitizers.yml

Lines changed: 161 additions & 0 deletions
@@ -0,0 +1,161 @@
+name: Sanitizers (Disabled)
+
+# Workflow is disabled - only manual trigger available
+# To re-enable, uncomment the push and pull_request triggers below
+on:
+  workflow_dispatch:  # Manual trigger only
+  # push:
+  #   branches: [ main, develop ]
+  # pull_request:
+  #   branches: [ main, develop ]
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  # Detect which files have changed to skip unnecessary jobs
+  changes:
+    name: Detect Changes
+    runs-on: ubuntu-latest
+    permissions:
+      pull-requests: read
+    outputs:
+      src: ${{ steps.filter.outputs.src }}
+      tests: ${{ steps.filter.outputs.tests }}
+    steps:
+      - uses: actions/checkout@v4
+      - uses: dorny/paths-filter@v3
+        id: filter
+        with:
+          filters: |
+            src:
+              - 'src/**'
+              - 'CMakeLists.txt'
+              - 'third_party/**'
+            tests:
+              - 'tests/**'
+
+  # AddressSanitizer - Detects memory errors (use-after-free, buffer overflows, etc.)
+  asan:
+    name: AddressSanitizer
+    needs: changes
+    if: needs.changes.outputs.src == 'true' || needs.changes.outputs.tests == 'true'
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Cache CMake dependencies
+        uses: actions/cache@v4
+        with:
+          path: build/_deps
+          key: ${{ runner.os }}-cmake-deps-asan-${{ hashFiles('CMakeLists.txt', 'third_party/**') }}
+          restore-keys: |
+            ${{ runner.os }}-cmake-deps-
+
+      - name: Install dependencies
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y \
+            cmake \
+            build-essential \
+            libmysqlclient-dev \
+            libicu-dev \
+            libreadline-dev \
+            pkg-config
+
+      - name: Configure CMake with ASAN
+        run: |
+          mkdir -p build
+          cd build
+          cmake -DCMAKE_BUILD_TYPE=Debug \
+            -DBUILD_TESTS=ON \
+            -DENABLE_ASAN=ON \
+            -DUSE_ICU=ON \
+            -DUSE_MYSQL=ON \
+            ..
+
+      - name: Build
+        run: |
+          cd build
+          make -j$(nproc)
+
+      - name: Run tests with ASAN
+        run: |
+          cd build
+          # Set ASAN options for better error reporting
+          export ASAN_OPTIONS=detect_leaks=1:check_initialization_order=1:detect_stack_use_after_return=1
+          ctest --output-on-failure --parallel $(nproc)
+
+      - name: Upload build artifacts on failure
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: asan-build-logs
+          path: |
+            build/CMakeFiles/*.log
+            build/Testing/Temporary/
+
+  # ThreadSanitizer - Detects data races and thread safety issues
+  tsan:
+    name: ThreadSanitizer
+    needs: changes
+    if: needs.changes.outputs.src == 'true' || needs.changes.outputs.tests == 'true'
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Cache CMake dependencies
+        uses: actions/cache@v4
+        with:
+          path: build/_deps
+          key: ${{ runner.os }}-cmake-deps-tsan-${{ hashFiles('CMakeLists.txt', 'third_party/**') }}
+          restore-keys: |
+            ${{ runner.os }}-cmake-deps-
+
+      - name: Install dependencies
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y \
+            cmake \
+            build-essential \
+            libmysqlclient-dev \
+            libicu-dev \
+            libreadline-dev \
+            pkg-config
+
+      - name: Configure CMake with TSAN
+        run: |
+          mkdir -p build
+          cd build
+          cmake -DCMAKE_BUILD_TYPE=Debug \
+            -DBUILD_TESTS=ON \
+            -DENABLE_TSAN=ON \
+            -DUSE_ICU=ON \
+            -DUSE_MYSQL=ON \
+            ..
+
+      - name: Build
+        run: |
+          cd build
+          make -j$(nproc)
+
+      - name: Run tests with TSAN
+        run: |
+          cd build
+          # Set TSAN options for better error reporting
+          export TSAN_OPTIONS=second_deadlock_stack=1:halt_on_error=0
+          ctest --output-on-failure --parallel $(nproc)
+
+      - name: Upload build artifacts on failure
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: tsan-build-logs
+          path: |
+            build/CMakeFiles/*.log
+            build/Testing/Temporary/

src/cache/CMakeLists.txt

Lines changed: 1 addition & 4 deletions
@@ -1,7 +1,4 @@
 add_library(mygramdb_cache STATIC
-  md5.cpp
-  cache_key.cpp
-  query_normalizer.cpp
   result_compressor.cpp
   query_cache.cpp
   invalidation_manager.cpp
@@ -14,7 +11,7 @@ target_include_directories(mygramdb_cache PUBLIC
 )
 
 target_link_libraries(mygramdb_cache PUBLIC
-  mygramdb_query
+  mygramdb_query  # For Query AST types and cache key generation
   mygramdb_utils
   lz4  # For LZ4 compression
   spdlog::spdlog

src/cache/cache_entry.h

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@
 #include <string>
 #include <vector>
 
-#include "cache/cache_key.h"
+#include "query/cache_key.h"
 #include "query/query_parser.h"
 
 namespace mygramdb::cache {

src/cache/cache_manager.cpp

Lines changed: 25 additions & 13 deletions
@@ -5,7 +5,7 @@
 
 #include "cache/cache_manager.h"
 
-#include "cache/cache_key.h"
+#include "query/cache_key.h"
 #include "server/server_types.h"
 
 namespace mygramdb::cache {
@@ -52,14 +52,20 @@ std::optional<std::vector<DocId>> CacheManager::Lookup(const query::Query& query
     return std::nullopt;
   }
 
-  // Normalize query and generate cache key
-  const std::string normalized = QueryNormalizer::Normalize(query);
-  if (normalized.empty()) {
-    return std::nullopt;
+  // Use precomputed cache key if available (performance optimization)
+  CacheKey key;
+  if (query.cache_key.has_value()) {
+    key.hash_high = query.cache_key.value().first;
+    key.hash_low = query.cache_key.value().second;
+  } else {
+    // Fallback: compute cache key on-the-fly (for backwards compatibility)
+    const std::string normalized = QueryNormalizer::Normalize(query);
+    if (normalized.empty()) {
+      return std::nullopt;
+    }
+    key = CacheKeyGenerator::Generate(normalized);
   }
 
-  const CacheKey key = CacheKeyGenerator::Generate(normalized);
-
   // Lookup in cache
   return query_cache_->Lookup(key);
 }
@@ -74,14 +80,20 @@ std::optional<CacheLookupResult> CacheManager::LookupWithMetadata(const query::Q
     return std::nullopt;
   }
 
-  // Normalize query and generate cache key
-  const std::string normalized = QueryNormalizer::Normalize(query);
-  if (normalized.empty()) {
-    return std::nullopt;
+  // Use precomputed cache key if available (performance optimization)
+  CacheKey key;
+  if (query.cache_key.has_value()) {
+    key.hash_high = query.cache_key.value().first;
+    key.hash_low = query.cache_key.value().second;
+  } else {
+    // Fallback: compute cache key on-the-fly (for backwards compatibility)
+    const std::string normalized = QueryNormalizer::Normalize(query);
+    if (normalized.empty()) {
+      return std::nullopt;
+    }
+    key = CacheKeyGenerator::Generate(normalized);
   }
 
-  const CacheKey key = CacheKeyGenerator::Generate(normalized);
-
   // Lookup in cache with metadata
   QueryCache::LookupMetadata metadata;
   auto result = query_cache_->LookupWithMetadata(key, metadata);
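Both hunks above add the same "use the precomputed key, else normalize and hash" block. As a rough sketch of how that logic could be factored into a single helper, the following uses hypothetical stand-ins throughout: `DemoQuery`, `DemoCacheKey`, `NormalizeForDemo`, `GenerateForDemo`, and `ResolveCacheKey` are not part of the commit, and `std::hash` replaces the MD5-based generator purely for illustration.

```cpp
#include <cassert>
#include <cctype>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <utility>

// Hypothetical mirror of the CacheKey fields used in the diff.
struct DemoCacheKey {
  std::uint64_t hash_high = 0;
  std::uint64_t hash_low = 0;
};

// Hypothetical mirror of query::Query: only the fields this sketch needs.
struct DemoQuery {
  std::string text;
  std::optional<std::pair<std::uint64_t, std::uint64_t>> cache_key;
};

// Placeholder normalizer (assumption): lowercases ASCII characters only.
std::string NormalizeForDemo(const DemoQuery& query) {
  std::string out = query.text;
  for (char& c : out) {
    c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
  }
  return out;
}

// Placeholder key generator (assumption): std::hash instead of MD5.
DemoCacheKey GenerateForDemo(const std::string& normalized) {
  const std::uint64_t h = std::hash<std::string>{}(normalized);
  return DemoCacheKey{h, ~h};
}

// Returns the precomputed key when the parser supplied one; otherwise falls
// back to normalizing and hashing. An empty normalized form yields no key.
std::optional<DemoCacheKey> ResolveCacheKey(const DemoQuery& query) {
  if (query.cache_key.has_value()) {
    return DemoCacheKey{query.cache_key->first, query.cache_key->second};
  }
  const std::string normalized = NormalizeForDemo(query);
  if (normalized.empty()) {
    return std::nullopt;
  }
  return GenerateForDemo(normalized);
}
```

With a helper of this shape, each lookup method would reduce to resolving the key, returning `std::nullopt` if none is available, and forwarding to the cache. Whether that refactor suits the real `CacheManager` depends on code not shown in this diff.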

src/cache/cache_manager.h

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@
 #include "cache/invalidation_manager.h"
 #include "cache/invalidation_queue.h"
 #include "cache/query_cache.h"
-#include "cache/query_normalizer.h"
+#include "query/query_normalizer.h"
 #include "config/config.h"
 #include "query/query_parser.h"
 

src/cache/invalidation_manager.h

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@
 #include <unordered_set>
 
 #include "cache/cache_entry.h"
-#include "cache/cache_key.h"
+#include "query/cache_key.h"
 
 namespace mygramdb::cache {
 

src/cache/invalidation_queue.h

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
 #include <unordered_map>
 #include <unordered_set>
 
-#include "cache/cache_key.h"
+#include "query/cache_key.h"
 
 namespace mygramdb::server {
 struct TableContext;

src/cache/query_cache.h

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
 #include <unordered_map>
 
 #include "cache/cache_entry.h"
-#include "cache/cache_key.h"
+#include "query/cache_key.h"
 #include "cache/result_compressor.h"
 
 namespace mygramdb::cache {

src/config/config.h

Lines changed: 2 additions & 0 deletions
@@ -205,10 +205,12 @@ struct ApiConfig {
   static constexpr int kDefaultRateLimitCapacity = 100;      ///< Default burst size
   static constexpr int kDefaultRateLimitRefillRate = 10;     ///< Default tokens per second
   static constexpr int kDefaultRateLimitMaxClients = 10000;  ///< Default max tracked clients
+  static constexpr int kDefaultMaxConnections = 10000;       ///< Default maximum concurrent connections
 
   struct {
     std::string bind = "127.0.0.1";
     int port = defaults::kTcpPort;
+    int max_connections = kDefaultMaxConnections;  ///< Maximum concurrent connections
   } tcp;
 
   struct {
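The rate-limit defaults in this hunk (burst capacity 100, refill rate 10 tokens per second) are the parameters of a token bucket. The RateLimiter source is not visible on this page, so the sketch below only illustrates how such a per-IP bucket could behave. `TokenBucket` and `IpRateLimiter` are hypothetical names; the sketch is not thread-safe and leaves out the max-tracked-clients cap.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <string>
#include <unordered_map>

// Hypothetical token bucket: starts full, refills continuously over time.
class TokenBucket {
 public:
  using Clock = std::chrono::steady_clock;

  TokenBucket(double capacity, double refill_per_second, Clock::time_point now)
      : capacity_(capacity),
        refill_per_second_(refill_per_second),
        tokens_(capacity),
        last_refill_(now) {
    assert(capacity > 0.0 && refill_per_second >= 0.0);
  }

  // Refills according to elapsed time, then tries to spend one token.
  bool TryConsume(Clock::time_point now) {
    const std::chrono::duration<double> elapsed = now - last_refill_;
    const double gained = std::max(0.0, elapsed.count()) * refill_per_second_;
    tokens_ = std::min(capacity_, tokens_ + gained);
    last_refill_ = now;
    if (tokens_ < 1.0) {
      return false;  // caller would reject, e.g. with HTTP 429
    }
    tokens_ -= 1.0;
    return true;
  }

 private:
  double capacity_;
  double refill_per_second_;
  double tokens_;
  Clock::time_point last_refill_;
};

// Hypothetical per-IP limiter: one bucket per client address.
class IpRateLimiter {
 public:
  IpRateLimiter(double capacity, double refill_per_second)
      : capacity_(capacity), refill_per_second_(refill_per_second) {}

  bool Allow(const std::string& ip, TokenBucket::Clock::time_point now) {
    auto it = buckets_.find(ip);
    if (it == buckets_.end()) {
      it = buckets_.emplace(ip, TokenBucket(capacity_, refill_per_second_, now)).first;
    }
    return it->second.TryConsume(now);
  }

 private:
  double capacity_;
  double refill_per_second_;
  std::unordered_map<std::string, TokenBucket> buckets_;
};
```

Passing the timestamp in explicitly keeps the sketch deterministic under test. A production limiter would more likely read the clock internally and guard the map with a mutex.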

src/index/index.cpp

Lines changed: 1 addition & 0 deletions
@@ -723,6 +723,7 @@ PostingList* Index::GetOrCreatePostingList(std::string_view term) {
 const PostingList* Index::GetPostingList(std::string_view term) const {
   // NOTE: This method assumes postings_mutex_ is already locked by the caller
   // C++17: unordered_map doesn't support heterogeneous lookup, so we convert to std::string
+  // TODO(performance): Consider upgrading to C++20 or using absl::flat_hash_map for heterogeneous lookup
   auto iterator = term_postings_.find(std::string(term));
   return iterator != term_postings_.end() ? iterator->second.get() : nullptr;
 }
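For the TODO above, here is a minimal sketch of what the C++20 route could look like. It assumes a map keyed by `std::string`; `TransparentStringHash`, `TermMap`, and `FindTerm` are hypothetical names, and an `int` value stands in for the posting list pointer. The feature-test macro keeps the sketch compiling under C++17 by falling back to the temporary `std::string` the current code uses.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <string_view>
#include <unordered_map>

// Transparent hash: the is_transparent tag plus std::equal_to<> enables
// heterogeneous find() on std::unordered_map in C++20.
struct TransparentStringHash {
  using is_transparent = void;
  std::size_t operator()(std::string_view text) const noexcept {
    return std::hash<std::string_view>{}(text);
  }
};

// Hypothetical stand-in for term_postings_ with an int payload.
using TermMap =
    std::unordered_map<std::string, int, TransparentStringHash, std::equal_to<>>;

// Looks up a term without allocating a std::string when the standard library
// supports generic unordered lookup; otherwise converts, as the C++17 code does.
const int* FindTerm(const TermMap& terms, std::string_view term) {
#if defined(__cpp_lib_generic_unordered_lookup)
  const auto it = terms.find(term);
#else
  const auto it = terms.find(std::string(term));
#endif
  return it != terms.end() ? &it->second : nullptr;
}
```

The saving is one temporary `std::string` per lookup, which mainly matters for terms too long for the small-string optimization. Whether that is measurable for this index would need profiling.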
