77#include " shim.h"
88#include < sstream>
99
10+ static constexpr int MAX_MESSAGES_PER_SECOND = 20 ;
11+ static constexpr int MAX_RATE_LIMIT_STRIKES = 3 ;
12+
13+ // Resolve the real client IP from proxy headers, falling back to peer address
14+ static std::string resolveClientIp (const drogon::HttpRequestPtr& req) {
15+ // X-Forwarded-For: client, proxy1, proxy2 - take the leftmost
16+ auto xff = req->getHeader (" X-Forwarded-For" );
17+ if (!xff.empty ()) {
18+ auto comma = xff.find (' ,' );
19+ std::string ip = (comma != std::string::npos) ? xff.substr (0 , comma) : xff;
20+ while (!ip.empty () && ip.front () == ' ' ) ip.erase (ip.begin ());
21+ while (!ip.empty () && ip.back () == ' ' ) ip.pop_back ();
22+ if (!ip.empty ()) return ip;
23+ }
24+ // X-Real-IP: single IP set by nginx/proxy
25+ auto xri = req->getHeader (" X-Real-IP" );
26+ if (!xri.empty ()) return xri;
27+ // Direct connection
28+ return req->getPeerAddr ().toIp ();
29+ }
30+
31+ static WsConnectionContext* getWsContext (const drogon::WebSocketConnectionPtr& conn) {
32+ auto ctx = conn->getContext <WsConnectionContext>();
33+ return ctx.get ();
34+ }
35+
1036void QubicRpcWebSocket::handleNewConnection (
1137 const drogon::HttpRequestPtr& req,
1238 const drogon::WebSocketConnectionPtr& wsConnPtr)
1339{
40+ std::string peerIp = req->getPeerAddr ().toIpPort ();
41+ std::string clientIp = resolveClientIp (req);
42+
43+ // Attach context to connection
44+ auto ctx = std::make_shared<WsConnectionContext>();
45+ ctx->peerIp = peerIp;
46+ ctx->clientIp = clientIp;
47+ wsConnPtr->setContext (ctx);
48+
1449 // Reject connections until bootstrap is complete
1550 if (gCurrentVerifyLoggingTick .load () <= gInitialTick .load ()) {
16- Logger::get ()->debug ( " Qubic JSON-RPC WebSocket connection rejected (bootstrap in progress) from {}" ,
17- req-> getPeerAddr (). toIpPort () );
51+ Logger::get ()->info ( " WS connection rejected (bootstrap in progress) client={} peer= {}" ,
52+ clientIp, peerIp );
1853 Json::Value error;
1954 error[" jsonrpc" ] = " 2.0" ;
2055 error[" error" ][" code" ] = -32000 ;
@@ -26,8 +61,7 @@ void QubicRpcWebSocket::handleNewConnection(
2661 return ;
2762 }
2863
29- Logger::get ()->debug (" Qubic JSON-RPC WebSocket connection from {}" ,
30- req->getPeerAddr ().toIpPort ());
64+ Logger::get ()->info (" WS connection opened client={} peer={}" , clientIp, peerIp);
3165
3266 // Register with subscription manager
3367 QubicSubscriptionManager::instance ().addClient (wsConnPtr);
@@ -36,7 +70,12 @@ void QubicRpcWebSocket::handleNewConnection(
3670void QubicRpcWebSocket::handleConnectionClosed (
3771 const drogon::WebSocketConnectionPtr& wsConnPtr)
3872{
39- Logger::get ()->debug (" Qubic JSON-RPC WebSocket connection closed" );
73+ auto * ctx = getWsContext (wsConnPtr);
74+ if (ctx) {
75+ Logger::get ()->info (" WS connection closed client={} peer={}" , ctx->clientIp , ctx->peerIp );
76+ } else {
77+ Logger::get ()->info (" WS connection closed" );
78+ }
4079
4180 // Cleanup subscriptions
4281 QubicSubscriptionManager::instance ().removeClient (wsConnPtr);
@@ -60,6 +99,36 @@ void QubicRpcWebSocket::handleNewMessage(
6099 return ;
61100 }
62101
102+ // Rate limiting
103+ auto * ctx = getWsContext (wsConnPtr);
104+ if (ctx) {
105+ auto now = std::chrono::steady_clock::now ();
106+ auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - ctx->windowStart );
107+ if (elapsed.count () >= 1 ) {
108+ // Reset window
109+ ctx->windowStart = now;
110+ ctx->messageCount = 1 ;
111+ ctx->rateLimitStrikes = 0 ;
112+ } else {
113+ ctx->messageCount ++;
114+ if (ctx->messageCount > MAX_MESSAGES_PER_SECOND) {
115+ ctx->rateLimitStrikes ++;
116+ if (ctx->rateLimitStrikes >= MAX_RATE_LIMIT_STRIKES) {
117+ Logger::get ()->warn (" WS rate limit exceeded, disconnecting client={} peer={}" ,
118+ ctx->clientIp , ctx->peerIp );
119+ sendResponse (wsConnPtr, QubicRpcHandler::makeError (Json::Value::null,
120+ QubicRpcError::LIMIT_EXCEEDED, " Rate limit exceeded, disconnecting" ));
121+ wsConnPtr->shutdown ();
122+ return ;
123+ }
124+ sendResponse (wsConnPtr, QubicRpcHandler::makeError (Json::Value::null,
125+ QubicRpcError::LIMIT_EXCEEDED, " Rate limit exceeded: max " +
126+ std::to_string (MAX_MESSAGES_PER_SECOND) + " messages/second" ));
127+ return ;
128+ }
129+ }
130+ }
131+
63132 // Parse JSON
64133 Json::Value root;
65134 Json::CharReaderBuilder builder;
@@ -162,10 +231,26 @@ Json::Value QubicRpcWebSocket::dispatchMethod(
162231 Json::Value filterParams = params.size () > 1 ? params[1 ] : Json::Value ();
163232 std::string subId = QubicRpcMethods::subscribe (conn, subType, filterParams);
164233 if (subId.empty ()) {
234+ // Check if it was a valid type but limit exceeded
235+ if (subType == " newTicks" || subType == " logs" || subType == " transfers" || subType == " tickStream" ) {
236+ auto * ctx = getWsContext (conn);
237+ Logger::get ()->warn (" WS subscription limit reached type={} client={} peer={}" ,
238+ subType,
239+ ctx ? ctx->clientIp : " unknown" ,
240+ ctx ? ctx->peerIp : " unknown" );
241+ return QubicRpcHandler::makeError (id, QubicRpcError::LIMIT_EXCEEDED,
242+ " Maximum subscriptions per connection reached" );
243+ }
165244 return QubicRpcHandler::makeError (id, QubicRpcError::INVALID_PARAMS,
166245 " Invalid subscription type: " + subType +
167246 " . Valid types: newTicks, logs, transfers, tickStream" );
168247 }
248+ // Log successful subscription with IP info
249+ auto * ctx = getWsContext (conn);
250+ Logger::get ()->info (" WS subscribe type={} id={} client={} peer={}" ,
251+ subType, subId,
252+ ctx ? ctx->clientIp : " unknown" ,
253+ ctx ? ctx->peerIp : " unknown" );
169254 return QubicRpcHandler::makeResult (id, subId);
170255 }
171256 if (method == " qubic_unsubscribe" ) {
0 commit comments