Skip to content

Commit dc11a67

Browse files
committed
refactor: Restructure server architecture with isolated components
Fixes #3 This commit refactors the TCP server architecture to improve modularity, testability, and maintainability by separating concerns into isolated components. ## New Components - **ConnectionAcceptor**: Network I/O layer (socket creation, accept loop) - **RequestDispatcher**: Application logic (query parsing, routing) - **TableCatalog**: Centralized table resource management - **SnapshotScheduler**: Background snapshot scheduling - **StatisticsService**: Metrics aggregation with separated side effects ## Key Improvements - **Separation of concerns**: Network, application logic, and domain layers - **Testability**: Each component can be tested independently - **Code clarity**: Explicit side effects, clear data flow - **Maintainability**: Reduced coupling, single responsibility principle ## API Changes - Removed deprecated `ResponseFormatter::FormatInfoResponse(table_contexts, ServerStats&, ...)` - Removed deprecated `ResponseFormatter::FormatPrometheusMetrics(table_contexts, ServerStats&, ...)` - New pattern: Aggregate metrics → Update stats → Format response - Applied modernize-pass-by-value for ServerConfig, DumpConfig ## Code Quality - Fixed short variable names (it→iter, fd→client_fd/socket_fd) - Applied clang-tidy suggestions (modernize-pass-by-value) - Fixed unused variable warnings in tests - Improved test file path uniqueness for parallel execution ## Testing - All existing tests pass (tcp_server_test: 34/34, http_server_test: 21/21) - Test modifications only for infrastructure improvements - No changes to test logic or coverage
1 parent 950e38e commit dc11a67

26 files changed

+1433
-660
lines changed

src/server/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ add_library(mygramdb_server STATIC
44
thread_pool.cpp
55
server_stats.cpp
66
response_formatter.cpp
7+
statistics_service.cpp
8+
table_catalog.cpp
9+
connection_acceptor.cpp
10+
request_dispatcher.cpp
11+
snapshot_scheduler.cpp
712
handlers/command_handler.cpp
813
handlers/search_handler.cpp
914
handlers/document_handler.cpp

src/server/connection_acceptor.cpp

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/**
2+
* @file connection_acceptor.cpp
3+
* @brief Implementation of ConnectionAcceptor
4+
*/
5+
6+
#include "server/connection_acceptor.h"
7+
8+
#include <arpa/inet.h>
9+
#include <netinet/in.h>
10+
#include <spdlog/spdlog.h>
11+
#include <sys/socket.h>
12+
#include <unistd.h>
13+
14+
#include <cstring>
15+
16+
#include "server/server_types.h"
17+
#include "server/thread_pool.h"
18+
19+
namespace mygramdb::server {
20+
21+
namespace {
22+
/**
23+
* @brief Helper to safely cast sockaddr_in* to sockaddr* for socket API
24+
*
25+
* POSIX socket API requires sockaddr* but we use sockaddr_in for IPv4.
26+
* This helper centralizes the required reinterpret_cast to a single location.
27+
*/
28+
inline struct sockaddr* ToSockaddr(struct sockaddr_in* addr) {
29+
return reinterpret_cast<struct sockaddr*>(addr); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
30+
}
31+
} // namespace
32+
33+
ConnectionAcceptor::ConnectionAcceptor(ServerConfig config, ThreadPool* thread_pool)
34+
: config_(std::move(config)), thread_pool_(thread_pool) {
35+
if (thread_pool_ == nullptr) {
36+
spdlog::error("ConnectionAcceptor: thread_pool cannot be null");
37+
}
38+
}
39+
40+
ConnectionAcceptor::~ConnectionAcceptor() {
41+
Stop();
42+
}
43+
44+
bool ConnectionAcceptor::Start() {
45+
if (running_) {
46+
last_error_ = "ConnectionAcceptor already running";
47+
return false;
48+
}
49+
50+
// Create socket
51+
server_fd_ = socket(AF_INET, SOCK_STREAM, 0);
52+
if (server_fd_ < 0) {
53+
last_error_ = "Failed to create socket: " + std::string(strerror(errno));
54+
spdlog::error("{}", last_error_);
55+
return false;
56+
}
57+
58+
// Set socket options
59+
if (!SetSocketOptions(server_fd_)) {
60+
close(server_fd_);
61+
server_fd_ = -1;
62+
return false;
63+
}
64+
65+
// Bind
66+
struct sockaddr_in address = {};
67+
std::memset(&address, 0, sizeof(address));
68+
address.sin_family = AF_INET;
69+
address.sin_addr.s_addr = INADDR_ANY;
70+
address.sin_port = htons(config_.port);
71+
72+
if (bind(server_fd_, ToSockaddr(&address), sizeof(address)) < 0) {
73+
last_error_ = "Failed to bind to port " + std::to_string(config_.port) + ": " + std::string(strerror(errno));
74+
spdlog::error("{}", last_error_);
75+
close(server_fd_);
76+
server_fd_ = -1;
77+
return false;
78+
}
79+
80+
// Get actual port if port 0 was specified
81+
if (config_.port == 0) {
82+
socklen_t addr_len = sizeof(address);
83+
if (getsockname(server_fd_, ToSockaddr(&address), &addr_len) == 0) {
84+
actual_port_ = ntohs(address.sin_port);
85+
}
86+
} else {
87+
actual_port_ = config_.port;
88+
}
89+
90+
// Listen
91+
if (listen(server_fd_, config_.max_connections) < 0) {
92+
last_error_ = "Failed to listen: " + std::string(strerror(errno));
93+
spdlog::error("{}", last_error_);
94+
close(server_fd_);
95+
server_fd_ = -1;
96+
return false;
97+
}
98+
99+
should_stop_ = false;
100+
running_ = true;
101+
102+
// Start accept thread
103+
accept_thread_ = std::make_unique<std::thread>(&ConnectionAcceptor::AcceptLoop, this);
104+
105+
spdlog::info("ConnectionAcceptor listening on {}:{}", config_.host, actual_port_);
106+
return true;
107+
}
108+
109+
void ConnectionAcceptor::Stop() {
110+
if (!running_) {
111+
return;
112+
}
113+
114+
spdlog::info("Stopping ConnectionAcceptor...");
115+
should_stop_ = true;
116+
running_ = false;
117+
118+
// Close server socket to unblock accept()
119+
if (server_fd_ >= 0) {
120+
close(server_fd_);
121+
server_fd_ = -1;
122+
}
123+
124+
// Wait for accept thread to finish
125+
if (accept_thread_ && accept_thread_->joinable()) {
126+
accept_thread_->join();
127+
}
128+
129+
// Close all active connections
130+
{
131+
std::lock_guard<std::mutex> lock(fds_mutex_);
132+
for (int socket_fd : active_fds_) {
133+
close(socket_fd);
134+
}
135+
active_fds_.clear();
136+
}
137+
138+
spdlog::info("ConnectionAcceptor stopped");
139+
}
140+
141+
void ConnectionAcceptor::SetConnectionHandler(ConnectionHandler handler) {
142+
connection_handler_ = std::move(handler);
143+
}
144+
145+
void ConnectionAcceptor::AcceptLoop() {
146+
spdlog::info("Accept loop started");
147+
148+
while (!should_stop_) {
149+
struct sockaddr_in client_addr = {};
150+
socklen_t client_len = sizeof(client_addr);
151+
152+
int client_fd = accept(server_fd_, ToSockaddr(&client_addr), &client_len);
153+
if (client_fd < 0) {
154+
if (!should_stop_) {
155+
spdlog::error("Accept failed: {}", strerror(errno));
156+
}
157+
continue;
158+
}
159+
160+
// Track connection
161+
{
162+
std::lock_guard<std::mutex> lock(fds_mutex_);
163+
active_fds_.insert(client_fd);
164+
}
165+
166+
// Submit to thread pool
167+
if (thread_pool_ != nullptr && connection_handler_) {
168+
thread_pool_->Submit([this, client_fd]() {
169+
connection_handler_(client_fd);
170+
RemoveConnection(client_fd);
171+
});
172+
} else {
173+
spdlog::error("No connection handler or thread pool configured");
174+
close(client_fd);
175+
RemoveConnection(client_fd);
176+
}
177+
}
178+
179+
spdlog::info("Accept loop exited");
180+
}
181+
182+
bool ConnectionAcceptor::SetSocketOptions(int socket_fd) {
183+
// SO_REUSEADDR: Allow reuse of local addresses
184+
int opt = 1;
185+
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
186+
last_error_ = "Failed to set SO_REUSEADDR: " + std::string(strerror(errno));
187+
spdlog::error("{}", last_error_);
188+
return false;
189+
}
190+
191+
// SO_KEEPALIVE: Enable TCP keepalive
192+
if (setsockopt(socket_fd, SOL_SOCKET, SO_KEEPALIVE, &opt, sizeof(opt)) < 0) {
193+
last_error_ = "Failed to set SO_KEEPALIVE: " + std::string(strerror(errno));
194+
spdlog::error("{}", last_error_);
195+
return false;
196+
}
197+
198+
// SO_RCVBUF: Set receive buffer size
199+
int rcvbuf = config_.recv_buffer_size;
200+
if (setsockopt(socket_fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) < 0) {
201+
spdlog::warn("Failed to set SO_RCVBUF: {}", strerror(errno));
202+
// Non-fatal, continue
203+
}
204+
205+
// SO_SNDBUF: Set send buffer size
206+
int sndbuf = config_.send_buffer_size;
207+
if (setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) < 0) {
208+
spdlog::warn("Failed to set SO_SNDBUF: {}", strerror(errno));
209+
// Non-fatal, continue
210+
}
211+
212+
return true;
213+
}
214+
215+
void ConnectionAcceptor::RemoveConnection(int socket_fd) {
216+
std::lock_guard<std::mutex> lock(fds_mutex_);
217+
active_fds_.erase(socket_fd);
218+
}
219+
220+
} // namespace mygramdb::server

src/server/connection_acceptor.h

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/**
2+
* @file connection_acceptor.h
3+
* @brief Network connection acceptor
4+
*/
5+
6+
#pragma once
7+
8+
#include <atomic>
9+
#include <functional>
10+
#include <memory>
11+
#include <mutex>
12+
#include <set>
13+
#include <string>
14+
#include <thread>
15+
16+
#include "server/server_types.h"
17+
18+
namespace mygramdb::server {
19+
20+
// Forward declarations
21+
class ThreadPool;
22+
23+
/**
24+
* @brief Network connection acceptor
25+
*
26+
* This class handles socket creation, accept loop, and connection dispatch.
27+
* It is isolated from application logic for independent testing.
28+
*
29+
* Key responsibilities:
30+
* - Create and configure server socket
31+
* - Accept incoming connections
32+
* - Dispatch connections to thread pool
33+
* - Track active connections
34+
* - Handle graceful shutdown
35+
*
36+
* Design principles:
37+
* - Single responsibility: network I/O only
38+
* - Testable without real network (can mock handler)
39+
* - Reusable for HTTP server, gRPC, etc.
40+
* - Thread-safe connection tracking
41+
*/
42+
class ConnectionAcceptor {
43+
public:
44+
/**
45+
* @brief Connection handler callback type
46+
*
47+
* This callback is invoked for each accepted connection.
48+
* The handler should process the connection and close the file descriptor.
49+
*/
50+
using ConnectionHandler = std::function<void(int client_fd)>;
51+
52+
/**
53+
* @brief Construct a ConnectionAcceptor
54+
* @param config Server configuration
55+
* @param thread_pool Thread pool for connection handling
56+
*/
57+
ConnectionAcceptor(ServerConfig config, ThreadPool* thread_pool);
58+
59+
// Disable copy and move
60+
ConnectionAcceptor(const ConnectionAcceptor&) = delete;
61+
ConnectionAcceptor& operator=(const ConnectionAcceptor&) = delete;
62+
ConnectionAcceptor(ConnectionAcceptor&&) = delete;
63+
ConnectionAcceptor& operator=(ConnectionAcceptor&&) = delete;
64+
65+
~ConnectionAcceptor();
66+
67+
/**
68+
* @brief Start accepting connections
69+
* @return true if started successfully
70+
*/
71+
bool Start();
72+
73+
/**
74+
* @brief Stop accepting connections
75+
*
76+
* Stops the accept loop and closes all active connections.
77+
*/
78+
void Stop();
79+
80+
/**
81+
* @brief Set connection handler callback
82+
* @param handler Callback to handle accepted connections
83+
*/
84+
void SetConnectionHandler(ConnectionHandler handler);
85+
86+
/**
87+
* @brief Get actual port being listened on
88+
* @return Port number (useful when config.port = 0)
89+
*/
90+
uint16_t GetPort() const { return actual_port_; }
91+
92+
/**
93+
* @brief Check if acceptor is running
94+
* @return true if accepting connections
95+
*/
96+
bool IsRunning() const { return running_; }
97+
98+
/**
99+
* @brief Get last error message
100+
* @return Error message
101+
*/
102+
const std::string& GetLastError() const { return last_error_; }
103+
104+
private:
105+
/**
106+
* @brief Accept loop (runs in separate thread)
107+
*/
108+
void AcceptLoop();
109+
110+
/**
111+
* @brief Set socket options (SO_REUSEADDR, SO_KEEPALIVE, etc.)
112+
* @param socket_fd Socket file descriptor
113+
* @return true if successful
114+
*/
115+
bool SetSocketOptions(int socket_fd);
116+
117+
/**
118+
* @brief Remove connection from active list
119+
* @param socket_fd Socket file descriptor
120+
*/
121+
void RemoveConnection(int socket_fd);
122+
123+
ServerConfig config_;
124+
ThreadPool* thread_pool_;
125+
ConnectionHandler connection_handler_;
126+
127+
int server_fd_ = -1;
128+
uint16_t actual_port_ = 0;
129+
std::atomic<bool> running_{false};
130+
std::atomic<bool> should_stop_{false};
131+
std::unique_ptr<std::thread> accept_thread_;
132+
133+
std::set<int> active_fds_;
134+
std::mutex fds_mutex_;
135+
136+
std::string last_error_;
137+
};
138+
139+
} // namespace mygramdb::server

0 commit comments

Comments
 (0)