@@ -77,21 +77,57 @@ std::string AMQErrorToString(amqp_rpc_reply_t reply) {
7777 return fmt::format (" {} ({})" , errorStr, AMQReplyToString (reply.reply_type ));
7878}
7979
80+ #define MAX_RECONNECT_ATTEMPTS 20
81+
8082class AMQT {
8183 private:
8284 amqp_connection_state_t m_connection;
8385 // std::unique_ptr<amqp_socket_t> m_socket;
8486 amqp_socket_t * m_socket;
8587 std::string client_id;
8688
87- std::atomic<int > m_remainingAttempts = {10 };
89+ std::atomic<int > m_remainingAttempts = {MAX_RECONNECT_ATTEMPTS };
8890 std::atomic<bool > m_connected{false };
8991
9092 std::mutex m_mutex;
9193 std::condition_variable m_cv;
9294
95+ std::chrono::steady_clock::time_point m_lastActivityTime;
96+ std::chrono::steady_clock::time_point m_lastHeartbeatTime;
97+
9398 public:
94- AMQT (const std::string& client_id) : client_id(client_id) {};
99+ AMQT (const std::string& client_id) : client_id(client_id) {
100+ m_lastActivityTime = std::chrono::steady_clock::now ();
101+ m_lastHeartbeatTime = m_lastActivityTime;
102+ };
103+ void updateActivity () {
104+ m_lastActivityTime = std::chrono::steady_clock::now ();
105+ }
106+ // since the current heartbeat does NOT work!!
107+ bool sendHeartbeat () {
108+ auto now = std::chrono::steady_clock::now ();
109+ auto lastDuration = std::chrono::duration_cast<std::chrono::seconds>(now - m_lastHeartbeatTime).count ();
110+ // every 10 seconds
111+ if (lastDuration >= 10 ) {
112+ std::lock_guard<std::mutex> lock (m_mutex);
113+ if (isConnected ()) {
114+ // because WHY CANT I USE amqp_heartbeat_send!?!? we will decide to manually construct it...
115+ amqp_frame_t frame;
116+ frame.channel = 0 ; // or 1? idk... amqp_socket.c says 0
117+ frame.frame_type = AMQP_FRAME_HEARTBEAT;
118+ auto res = amqp_send_frame (m_connection, &frame);
119+ if (AMQP_STATUS_OK != res) {
120+ log::warn (" Failed to send heartbeat: {}" , amqp_error_string2 (res));
121+ return false ;
122+ } else {
123+ // log::debug("Sent heartbeat!");
124+ m_lastHeartbeatTime = now;
125+ return true ;
126+ }
127+ }
128+ }
129+ return true ;
130+ }
95131 bool isConnected () {
96132 return m_connected.load ();
97133 }
@@ -186,13 +222,32 @@ class AMQT {
186222 }
187223 }
188224
189- for (;;) {
225+ bool forceReconnection = false ;
226+ auto lastReconnectTime = std::chrono::steady_clock::now ();
227+
228+ struct timeval timeout = {10 , 0 };
229+
230+ while (isConnected ()) {
231+ // this is just an extra so we're sure that we're always connected
232+ if (!sendHeartbeat ()) {
233+ log::error (" Couldn't send heartbeat, disconnecting..." );
234+ break ;
235+ }
236+
237+ if (forceReconnection) {
238+ auto now = std::chrono::steady_clock::now ();
239+ auto lastDuration = std::chrono::duration_cast<std::chrono::seconds>(now - lastReconnectTime).count ();
240+ if (lastDuration >= 600 ) {
241+ log::info (" Forcing a reconnection..." );
242+ break ;
243+ }
244+ }
245+
190246 amqp_envelope_t envelope;
191247 amqp_maybe_release_buffers (m_connection);
192- // struct timeval timeout = {1, 0};
193- auto ret = amqp_consume_message (m_connection, &envelope, NULL , 0 );
248+
249+ auto ret = amqp_consume_message (m_connection, &envelope, &timeout , 0 );
194250 if (AMQP_RESPONSE_NORMAL != ret.reply_type ) {
195- log::error (" Response wasn't normal! {}" , AMQErrorToString (ret));
196251 if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type && AMQP_STATUS_HEARTBEAT_TIMEOUT == ret.library_error ) {
197252 log::error (" Heartbeat failed, initiating a reconnection!" );
198253 break ;
@@ -222,7 +277,12 @@ class AMQT {
222277 log::error (" (Frame ID: {}) Error consuming message: {}" , frame.payload .method .id , AMQErrorToString (ret));
223278 break ;
224279 }
280+ } else {
281+ log::error (" Response wasn't normal! {}" , AMQErrorToString (ret));
225282 }
283+ } else if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type && AMQP_STATUS_TIMEOUT == ret.library_error ) {
284+ // honestly how has no one gotten this issue!?
285+ continue ;
226286 } else {
227287 log::error (" Error consuming message: {}" , AMQErrorToString (ret));
228288 break ;
@@ -241,12 +301,9 @@ class AMQT {
241301 m_remainingAttempts++;
242302 while (m_remainingAttempts-- > 0 ) {
243303 log::info (" Starting AMQP..." );
244- if (m_remainingAttempts != 10 ) {
245- if (m_connection) {
246- amqp_destroy_connection (m_connection);
247- m_connection = nullptr ;
248- }
249- int delayMs = 3000 * ((m_remainingAttempts + 1 ) - m_remainingAttempts);
304+ if (m_remainingAttempts != MAX_RECONNECT_ATTEMPTS) {
305+ int currentAttempt = MAX_RECONNECT_ATTEMPTS - m_remainingAttempts + 1 ;
306+ int delayMs = 3000 * std::pow (2 , currentAttempt - 1 );
250307 // TODO: fix it being stuck on 3 seconds
251308 log::warn (" Disconnected from server. Attempting to reconnect... ({} left, reconnecting in {} seconds)" , m_remainingAttempts, (float )((delayMs / 1000 .F )));
252309 std::this_thread::sleep_for (std::chrono::milliseconds (delayMs));
@@ -273,6 +330,7 @@ class AMQT {
273330 }
274331 m_connected = true ;
275332 setupChannel ();
333+ /*
276334 std::unique_lock<std::mutex> lock(m_mutex);
277335 if (!m_cv.wait_for(lock, std::chrono::seconds(10), [this]{
278336 return m_connection; // change thispls
@@ -281,12 +339,20 @@ class AMQT {
281339 }
282340 while (isConnected()) {
283341 m_cv.wait_for(lock, std::chrono::seconds(1), [this] { return !isConnected(); });
284- }
285-
342+ }*/
343+ // amqp_connection_close(m_connection, AMQP_REPLY_SUCCESS);
344+ disconnect ();
286345 }
287346 }
288347 log::error (" Maximum reconnection attempts reached, cannot reconnect. Restart the game to reconnect to the MQTT server." );
289348 }
349+ void disconnect () {
350+ if (m_connection) {
351+ m_connected = false ;
352+ amqp_destroy_connection (m_connection);
353+ m_connection = nullptr ;
354+ }
355+ }
290356};
291357
292358static std::unordered_map<std::string, web::WebTask> RUNNING_REQUESTS {};
@@ -518,7 +584,7 @@ class $modify(CCSprite) {
518584};
519585
520586// Child background
521- class $modify(CCScale9Sprite) {
587+ class $modify(cocos2d::extension:: CCScale9Sprite) {
522588 static cocos2d::extension::CCScale9Sprite* create (char const * name, CCRect rect) {
523589 auto ret = cocos2d::extension::CCScale9Sprite::create (name, rect);
524590 if (ret == nullptr ) return ret;
@@ -601,7 +667,6 @@ class $modify(CCScale9Sprite) {
601667 return ret;
602668 }
603669};
604-
605670// bool is_dailychest_ready = false;
606671bool is_socketserver_started = false ;
607672class $modify(MenuLayer) {
0 commit comments