@@ -178,6 +178,7 @@ class AMQT {
178178 }
179179 if (res) {
180180 std::string data ((char *)res->consumer_tag .bytes , res->consumer_tag .len );
181+ show_connected = true ;
181182 log::info (" Subscribed to Rate Notifications! (Consumer Tag: {})" , data);
182183 }
183184 }
@@ -248,242 +249,8 @@ class AMQT {
248249 }
249250 log::error (" Maximum reconnection attempts reached, cannot reconnect. Restart the game to reconnect to the MQTT server." );
250251 }
251-
252252};
253253
254- /*
255- class AMQT {
256- private:
257- std::unique_ptr<AMQP::Connection> m_connection;
258- std::unique_ptr<AMQP::Channel> m_channel;
259- std::string client_id;
260-
261- std::atomic<int> m_remainingAttempts = {10};
262- std::atomic<bool> m_connected{false};
263-
264- std::mutex m_mutex;
265- std::condition_variable m_cv;
266-
267- class QConnectionHandler : public AMQP::ConnectionHandler {
268- AMQT* m_handler;
269- virtual void onReady(AMQP::Connection *connection) override {
270- std::unique_lock<std::mutex> lock(m_handler->m_mutex);
271- log::info("AMQP Connection successful!");
272- m_handler->m_connected = true;
273- show_connected = true;
274- if (m_handler->m_connection) {
275- //m_handler->setupChannel();
276- }
277- m_handler->m_cv.notify_all();
278- }
279- virtual void onClosed(AMQP::Connection *connection) override {
280- m_handler->m_connected = false;
281- std::lock_guard<std::mutex> lock(m_handler->m_mutex);
282- log::info("Connection closed");
283- m_handler->m_cv.notify_all();
284- }
285- virtual void onError(AMQP::Connection *connection, const char *message) override {
286- m_handler->m_connected = false;
287- std::lock_guard<std::mutex> lock(m_handler->m_mutex);
288- log::info("Connection failed: {}", message);
289- m_handler->m_cv.notify_all();
290- }
291- virtual void onData(AMQP::Connection *connection, const char *data, size_t size) override {
292- log::info("Data {}", data);
293- }
294- public:
295- QConnectionHandler(AMQT* handler) : m_handler(handler) {}
296- };
297- std::unique_ptr<QConnectionHandler> m_handler;
298- public:
299- AMQT(const std::string& client_id) : client_id(client_id) {
300- m_handler = std::make_unique<QConnectionHandler>(this);
301- };
302- bool isConnected() {
303- return m_connected || m_connection.get()->ready();
304- }
305- void setupChannel() {
306- m_channel = std::make_unique<AMQP::Channel>(m_connection.get());
307- int flags = AMQP::durable;
308- bool showPastNotifs = Mod::get()->template getSettingValue<bool>("past-notifications");
309- if (!showPastNotifs) {
310- flags = 0;
311- }
312- m_channel->declareExchange("amq.topic", AMQP::topic, AMQP::durable).onSuccess([this, showPastNotifs, flags]() {
313- log::info("amq.topic exchange declared");
314- std::string queueName = fmt::format("amqp-queue", client_id);
315- AMQP::Table arguments;
316- if (showPastNotifs) {
317- arguments["x-queue-type"] = "quorum";
318- arguments["x-expires"] = 172800000; // 2 days
319- }
320- m_channel->declareQueue(queueName, flags, arguments).onSuccess([this](const std::string &name, uint32_t messageCount, uint32_t consumerCount) {
321- log::info("queue with name {} declared ({} msgs, {} consumers)", name, messageCount, consumerCount);
322- m_channel->bindQueue("amq.topic", name, "rate.#").onSuccess([this, name]() {
323- m_channel->consume(name, AMQP::noack).onReceived([this](const AMQP::Message &msg, uint64_t tag, bool redelivered) {
324- std::string data(msg.body(), msg.bodySize());
325- log::info("call rate event");
326- msgQueue.push(data);
327- }).onSuccess([this](const std::string &consumerTag) {
328- log::info("Subscribed to Rate Notifications (Consumer Tag {})", consumerTag);
329- }).onError([](const char *message) {
330- log::error("Error consuming: {}", message);
331- });
332- }).onError([](const char *message) {
333- log::error("Error binding queue: {}", message);
334- });
335- }).onError([](const char *message) {
336- log::error("Error declaring queue: {}", message);
337- });
338-
339- }).onError([](const char *message) {
340- log::error("Error declaring exchange: {}", message);
341- });
342- }
343- void connect() {
344-
345- // https://github.com/eclipse-paho/paho.mqtt.cpp/blob/master/examples/async_subscribe.cpp
346- while (true) {
347- log::info("Starting AMQP...");
348- int delayMs = 3000 * ((m_remainingAttempts + 1) - m_remainingAttempts);
349- if (m_remainingAttempts != 10) {
350- std::this_thread::sleep_for(std::chrono::seconds(delayMs / 1000));
351- }
352- if (!m_connection) {
353- AMQP::Address address("gdutils.clarifygdps.com", 5672, AMQP::Login("gd", "GeometryDashisahorizontalrunnerstylegamedevelopedandpublishedbyRobTopGames"), "/");
354- m_connection = std::make_unique<AMQP::Connection>(m_handler.get(), address);
355- log::info("s1 {}", m_connection.get()->heartbeat());
356- }
357- log::info("s2");
358- if (m_connection) {
359- log::info("s3");
360- if (m_connection.get()->heartbeat()) {
361- setupChannel();
362- }
363- std::unique_lock<std::mutex> lock(m_mutex);
364- if (!m_cv.wait_for(lock, std::chrono::seconds(10), [this]{
365- return m_connection.get()->initialized();
366- })) {
367- log::error("Connection timeout, assuming I can't connect!");
368- }
369- while (isConnected()) {
370- m_cv.wait_for(lock, std::chrono::seconds(1), [this] { return !isConnected(); });
371- }
372- if (m_remainingAttempts-- > 0) {
373- log::warn("Disconnected from server. Attempting to reconnect... ({} left, reconnecting in {} seconds)", m_remainingAttempts, (float)((delayMs / 1000.F)));
374- if (isConnected()) {
375- m_connection.get()->close();
376- }
377- m_connection.reset();
378- } else {
379- log::error("Maximum reconnection attempts reached, cannot reconnect. Restart the game to reconnect to the MQTT server.");
380- break;
381- }
382- }
383- }
384- }
385-
386- };*/
387-
388- /*
389- class MQTT {
390- private:
391- std::unique_ptr<mqtt::async_client> m_client;
392- mqtt::connect_options m_connOpts;
393- std::string client_id;
394-
395- std::atomic<int> m_remainingAttempts = {10};
396-
397- std::mutex m_mutex;
398- std::condition_variable m_cv;
399-
400- class Callback : public virtual mqtt::callback, public virtual mqtt::iaction_listener {
401- private:
402- MQTT* m_handler;
403-
404- public:
405- Callback(MQTT* handler) : m_handler(handler) {}
406- void connected(const std::string& reason) override {
407- std::unique_lock<std::mutex> lock(m_handler->m_mutex);
408- log::info("MQTT Connection successful!");
409- show_connected = true;
410- if (m_handler->m_client) {
411- log::info("Subscribed to Rate Notifications");
412- m_handler->m_client->subscribe("rate", 1);
413- }
414- m_handler->m_cv.notify_all();
415- }
416- void connection_lost(const std::string& reason) override {
417- std::lock_guard<std::mutex> lock(m_handler->m_mutex);
418- log::info("Connection closed: {}", reason);
419- m_handler->m_cv.notify_all();
420- }
421- void on_success(const mqtt::token& tok) override {}
422- void on_failure(const mqtt::token& tok) override {
423- log::error("Connection failed: {}", tok.get_error_message());
424- connection_lost(fmt::format("{} (Code: {})", tok.get_error_message(), tok.get_return_code()));
425- }
426-
427-
428- void message_arrived(mqtt::const_message_ptr data) override {
429- log::info("call rate event");
430- msgQueue.push(data);
431- }
432-
433- void delivery_complete(mqtt::delivery_token_ptr token) override {}
434- };
435-
436- public:
437- MQTT(const std::string& client_id) : client_id(client_id) {
438- m_connOpts.set_user_name("gd");
439- m_connOpts.set_password("GeometryDashisahorizontalrunnerstylegamedevelopedandpublishedbyRobTopGames");
440- m_connOpts.set_automatic_reconnect(true);
441- // false = retain, true = dies after disconnect
442- //m_connOpts.set_clean_session(true);
443- //m_connOpts.set_clean_session(false);
444- bool showPastNotifs = Mod::get()->template getSettingValue<bool>("past-notifications");
445- m_connOpts.set_clean_session(!showPastNotifs);
446- m_connOpts.set_connect_timeout(10);
447- }
448- void connect() {
449- // https://github.com/eclipse-paho/paho.mqtt.cpp/blob/master/examples/async_subscribe.cpp
450- Callback cb(this);
451- while (true) {
452- log::info("Starting MQTT...");
453- int delayMs = 3000 * ((m_remainingAttempts + 1) - m_remainingAttempts);
454- if (m_remainingAttempts != 10) {
455- std::this_thread::sleep_for(std::chrono::seconds(delayMs / 1000));
456- }
457- if (!m_client) {
458- m_client = std::make_unique<mqtt::async_client>("mqtt://gdutils.clarifygdps.com:1883", client_id);
459- m_client->set_callback(cb);
460- }
461- if (m_client) {
462- std::unique_lock<std::mutex> lock(m_mutex);
463- m_client->connect(m_connOpts, nullptr, cb);
464- if (!m_cv.wait_for(lock, std::chrono::seconds(10), [this]{
465- return m_client->is_connected();
466- })) {
467- log::error("Connection timeout, assuming I can't connect!");
468- }
469- while (m_client->is_connected()) {
470- m_cv.wait_for(lock, std::chrono::seconds(1), [this] { return !m_client->is_connected(); });
471- }
472- if (m_remainingAttempts-- > 0) {
473- log::warn("Disconnected from server. Attempting to reconnect... ({} left, reconnecting in {} seconds)", m_remainingAttempts, (float)((delayMs / 1000.F)));
474- if (m_client->is_connected()) {
475- m_client->disconnect();
476- }
477- m_client.reset();
478- } else {
479- log::error("Maximum reconnection attempts reached, cannot reconnect. Restart the game to reconnect to the MQTT server.");
480- break;
481- }
482- }
483- }
484- }
485- };
486- */
487254static std::unordered_map<std::string, web::WebTask> RUNNING_REQUESTS {};
488255static std::mutex lock_var;
489256
0 commit comments