Skip to content

Commit 6e5e108

Browse files
authored
Add support for pinging (#77)
* Add keepalive interval and the server timeout.
1 parent 52ba93a commit 6e5e108

File tree

7 files changed

+324
-7
lines changed

7 files changed

+324
-7
lines changed

include/signalrclient/signalr_client_config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ namespace signalr
4747
SIGNALRCLIENT_API const std::shared_ptr<scheduler>& __cdecl get_scheduler() const noexcept;
4848
SIGNALRCLIENT_API void set_handshake_timeout(std::chrono::milliseconds);
4949
SIGNALRCLIENT_API std::chrono::milliseconds get_handshake_timeout() const noexcept;
50+
SIGNALRCLIENT_API void set_server_timeout(std::chrono::milliseconds);
51+
SIGNALRCLIENT_API std::chrono::milliseconds get_server_timeout() const noexcept;
52+
SIGNALRCLIENT_API void set_keepalive_interval(std::chrono::milliseconds);
53+
SIGNALRCLIENT_API std::chrono::milliseconds get_keepalive_interval() const noexcept;
5054

5155
private:
5256
#ifdef USE_CPPRESTSDK
@@ -56,5 +60,7 @@ namespace signalr
5660
std::map<std::string, std::string> m_http_headers;
5761
std::shared_ptr<scheduler> m_scheduler;
5862
std::chrono::milliseconds m_handshake_timeout;
63+
std::chrono::milliseconds m_server_timeout;
64+
std::chrono::milliseconds m_keepalive_interval;
5965
};
6066
}

src/signalrclient/hub_connection_impl.cpp

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ namespace signalr
185185
callback(exception);
186186
}, exception);
187187
}
188+
else
189+
{
190+
connection->start_keepalive();
191+
}
188192
};
189193

190194
auto handshake_request = handshake::write_handshake(connection->m_protocol);
@@ -348,6 +352,7 @@ namespace signalr
348352
}
349353
}
350354

355+
reset_server_timeout();
351356
auto messages = m_protocol->parse_messages(response);
352357

353358
for (const auto& val : messages)
@@ -385,7 +390,10 @@ namespace signalr
385390
// Sent to server only, should not be received by client
386391
throw std::runtime_error("Received unexpected message type 'CancelInvocation'.");
387392
case message_type::ping:
388-
// TODO
393+
if (m_logger.is_enabled(trace_level::debug))
394+
{
395+
m_logger.log(trace_level::debug, "ping message received.");
396+
}
389397
break;
390398
case message_type::close:
391399
// TODO
@@ -477,6 +485,8 @@ namespace signalr
477485
}
478486
}
479487
});
488+
489+
reset_send_ping();
480490
}
481491
catch (const std::exception& e)
482492
{
@@ -510,6 +520,126 @@ namespace signalr
510520
m_disconnected = disconnected;
511521
}
512522

523+
void hub_connection_impl::reset_send_ping()
524+
{
525+
auto timeMs = (std::chrono::steady_clock::now() + m_signalr_client_config.get_keepalive_interval()).time_since_epoch();
526+
m_nextActivationSendPing.store(std::chrono::duration_cast<std::chrono::milliseconds>(timeMs).count());
527+
}
528+
529+
void hub_connection_impl::reset_server_timeout()
530+
{
531+
auto timeMs = (std::chrono::steady_clock::now() + m_signalr_client_config.get_server_timeout()).time_since_epoch();
532+
m_nextActivationServerTimeout.store(std::chrono::duration_cast<std::chrono::milliseconds>(timeMs).count());
533+
}
534+
535+
void hub_connection_impl::start_keepalive()
536+
{
537+
if (m_logger.is_enabled(trace_level::debug))
538+
{
539+
m_logger.log(trace_level::debug, "starting keep alive timer.");
540+
}
541+
542+
auto send_ping = [](std::shared_ptr<hub_connection_impl> connection)
543+
{
544+
if (!connection)
545+
{
546+
return;
547+
}
548+
549+
if (connection->get_connection_state() != connection_state::connected)
550+
{
551+
return;
552+
}
553+
554+
try
555+
{
556+
hub_message ping_msg(signalr::message_type::ping);
557+
auto message = connection->m_protocol->write_message(&ping_msg);
558+
559+
std::weak_ptr<hub_connection_impl> weak_connection = connection;
560+
connection->m_connection->send(
561+
message,
562+
connection->m_protocol->transfer_format(), [weak_connection](std::exception_ptr exception)
563+
{
564+
auto connection = weak_connection.lock();
565+
if (connection)
566+
{
567+
if (exception)
568+
{
569+
if (connection->m_logger.is_enabled(trace_level::warning))
570+
{
571+
connection->m_logger.log(trace_level::warning, "failed to send ping!");
572+
}
573+
}
574+
else
575+
{
576+
connection->reset_send_ping();
577+
}
578+
}
579+
});
580+
}
581+
catch (const std::exception& e)
582+
{
583+
if (connection->m_logger.is_enabled(trace_level::warning))
584+
{
585+
connection->m_logger.log(trace_level::warning, std::string("failed to send ping: ").append(e.what()));
586+
}
587+
}
588+
};
589+
590+
send_ping(shared_from_this());
591+
reset_server_timeout();
592+
593+
std::weak_ptr<hub_connection_impl> weak_connection = shared_from_this();
594+
timer(m_signalr_client_config.get_scheduler(),
595+
[send_ping, weak_connection](std::chrono::milliseconds)
596+
{
597+
auto connection = weak_connection.lock();
598+
599+
if (!connection)
600+
{
601+
return true;
602+
}
603+
604+
if (connection->get_connection_state() != connection_state::connected)
605+
{
606+
return true;
607+
}
608+
609+
auto timeNowmSeconds =
610+
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
611+
612+
if (timeNowmSeconds > connection->m_nextActivationServerTimeout.load())
613+
{
614+
if (connection->get_connection_state() == connection_state::connected)
615+
{
616+
auto error_msg = std::string("server timeout (")
617+
.append(std::to_string(connection->m_signalr_client_config.get_server_timeout().count()))
618+
.append(" ms) elapsed without receiving a message from the server.");
619+
if (connection->m_logger.is_enabled(trace_level::warning))
620+
{
621+
connection->m_logger.log(trace_level::warning, error_msg);
622+
}
623+
624+
connection->m_connection->stop([](std::exception_ptr)
625+
{
626+
}, std::make_exception_ptr(signalr_exception(error_msg)));
627+
}
628+
}
629+
630+
if (timeNowmSeconds > connection->m_nextActivationSendPing.load())
631+
{
632+
if (connection->m_logger.is_enabled(trace_level::debug))
633+
{
634+
connection->m_logger.log(trace_level::debug, "sending ping to server.");
635+
}
636+
send_ping(connection);
637+
}
638+
639+
return false;
640+
});
641+
}
642+
513643
// unnamed namespace makes it invisble outside this translation unit
514644
namespace
515645
{

src/signalrclient/hub_connection_impl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ namespace signalr
6565
signalr_client_config m_signalr_client_config;
6666
std::unique_ptr<hub_protocol> m_protocol;
6767

68+
std::atomic<int64_t> m_nextActivationServerTimeout;
69+
std::atomic<int64_t> m_nextActivationSendPing;
70+
6871
std::mutex m_stop_callback_lock;
6972
std::vector<std::function<void(std::exception_ptr)>> m_stop_callbacks;
7073

@@ -75,5 +78,10 @@ namespace signalr
7578
void invoke_hub_method(const std::string& method_name, const std::vector<signalr::value>& arguments, const std::string& callback_id,
7679
std::function<void()> set_completion, std::function<void(const std::exception_ptr)> set_exception) noexcept;
7780
bool invoke_callback(completion_message* completion);
81+
82+
void reset_send_ping();
83+
void reset_server_timeout();
84+
85+
void start_keepalive();
7886
};
7987
}

src/signalrclient/signalr_client_config.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ namespace signalr
4444

4545
signalr_client_config::signalr_client_config()
4646
: m_handshake_timeout(std::chrono::seconds(15))
47+
, m_server_timeout(std::chrono::seconds(30))
48+
, m_keepalive_interval(std::chrono::seconds(15))
4749
{
4850
m_scheduler = std::make_shared<signalr_default_scheduler>();
4951
}
@@ -92,4 +94,34 @@ namespace signalr
9294
{
9395
return m_handshake_timeout;
9496
}
97+
98+
void signalr_client_config::set_server_timeout(std::chrono::milliseconds timeout)
99+
{
100+
if (timeout <= std::chrono::seconds(0))
101+
{
102+
throw std::runtime_error("timeout must be greater than 0.");
103+
}
104+
105+
m_server_timeout = timeout;
106+
}
107+
108+
std::chrono::milliseconds signalr_client_config::get_server_timeout() const noexcept
109+
{
110+
return m_server_timeout;
111+
}
112+
113+
void signalr_client_config::set_keepalive_interval(std::chrono::milliseconds interval)
114+
{
115+
if (interval <= std::chrono::seconds(0))
116+
{
117+
throw std::runtime_error("interval must be greater than 0.");
118+
}
119+
120+
m_keepalive_interval = interval;
121+
}
122+
123+
std::chrono::milliseconds signalr_client_config::get_keepalive_interval() const noexcept
124+
{
125+
return m_keepalive_interval;
126+
}
95127
}

test/signalrclienttests/hub_connection_tests.cpp

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1738,9 +1738,9 @@ TEST(config, can_replace_scheduler)
17381738

17391739
mre.get();
17401740

1741-
// http_client->send (negotiate), websocket_client->start, handshake timeout timer, websocket_client->send, websocket_client->send, websocket_client->stop
1741+
// http_client->send (negotiate), websocket_client->start, handshake timeout timer, websocket_client->send, websocket_client->send, keep alive timer, websocket_client->send ping, websocket_client->stop
17421742
// handshake timeout timer can trigger more than once if test takes more than 1 second
1743-
ASSERT_GE(6, scheduler->schedule_count);
1743+
ASSERT_GE(scheduler->schedule_count, 8);
17441744
}
17451745

17461746
class throw_hub_protocol : public hub_protocol
@@ -1814,3 +1814,135 @@ TEST(send, throws_if_protocol_fails)
18141814

18151815
ASSERT_EQ(connection_state::connected, hub_connection->get_connection_state());
18161816
}
1817+
1818+
TEST(keepalive, sends_ping_messages)
1819+
{
1820+
signalr_client_config config;
1821+
config.set_keepalive_interval(std::chrono::seconds(1));
1822+
config.set_server_timeout(std::chrono::seconds(3));
1823+
auto ping_mre = manual_reset_event<void>();
1824+
auto messages = std::make_shared<std::deque<std::string>>();
1825+
auto websocket_client = create_test_websocket_client(
1826+
/* send function */ [messages, &ping_mre](const std::string& msg, std::function<void(std::exception_ptr)> callback)
1827+
{
1828+
if (messages->size() < 3)
1829+
{
1830+
messages->push_back(msg);
1831+
}
1832+
if (messages->size() == 3)
1833+
{
1834+
ping_mre.set();
1835+
}
1836+
callback(nullptr);
1837+
},
1838+
[](const std::string&, std::function<void(std::exception_ptr)> callback) { callback(nullptr); },
1839+
[](std::function<void(std::exception_ptr)> callback) { callback(nullptr); },
1840+
false);
1841+
auto hub_connection = create_hub_connection(websocket_client);
1842+
hub_connection.set_client_config(config);
1843+
1844+
auto mre = manual_reset_event<void>();
1845+
hub_connection.start([&mre](std::exception_ptr exception)
1846+
{
1847+
mre.set(exception);
1848+
});
1849+
1850+
ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
1851+
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
1852+
websocket_client->receive_message("{}\x1e");
1853+
1854+
mre.get();
1855+
1856+
ping_mre.get();
1857+
1858+
ASSERT_EQ(3, messages->size());
1859+
ASSERT_EQ("{\"protocol\":\"json\",\"version\":1}\x1e", (*messages)[0]);
1860+
ASSERT_EQ("{\"type\":6}\x1e", (*messages)[1]);
1861+
ASSERT_EQ("{\"type\":6}\x1e", (*messages)[2]);
1862+
ASSERT_EQ(connection_state::connected, hub_connection.get_connection_state());
1863+
}
1864+
1865+
TEST(keepalive, server_timeout_on_no_ping_from_server)
1866+
{
1867+
signalr_client_config config;
1868+
config.set_keepalive_interval(std::chrono::seconds(1));
1869+
config.set_server_timeout(std::chrono::seconds(1));
1870+
auto websocket_client = create_test_websocket_client();
1871+
auto hub_connection = create_hub_connection(websocket_client);
1872+
hub_connection.set_client_config(config);
1873+
1874+
auto disconnected_called = false;
1875+
1876+
auto disconnect_mre = manual_reset_event<void>();
1877+
hub_connection.set_disconnected([&disconnected_called, &disconnect_mre](std::exception_ptr ex)
1878+
{
1879+
disconnect_mre.set(ex);
1880+
});
1881+
1882+
auto mre = manual_reset_event<void>();
1883+
hub_connection.start([&mre](std::exception_ptr exception)
1884+
{
1885+
mre.set(exception);
1886+
});
1887+
1888+
ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
1889+
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
1890+
websocket_client->receive_message("{}\x1e");
1891+
1892+
mre.get();
1893+
1894+
try
1895+
{
1896+
disconnect_mre.get();
1897+
ASSERT_TRUE(false);
1898+
}
1899+
catch (const std::exception& ex)
1900+
{
1901+
ASSERT_STREQ("server timeout (1000 ms) elapsed without receiving a message from the server.", ex.what());
1902+
}
1903+
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
1904+
}
1905+
1906+
TEST(keepalive, resets_server_timeout_timer_on_any_message_from_server)
1907+
{
1908+
signalr_client_config config;
1909+
config.set_keepalive_interval(std::chrono::seconds(1));
1910+
config.set_server_timeout(std::chrono::seconds(1));
1911+
auto websocket_client = create_test_websocket_client();
1912+
auto hub_connection = create_hub_connection(websocket_client);
1913+
hub_connection.set_client_config(config);
1914+
1915+
auto disconnect_mre = manual_reset_event<void>();
1916+
hub_connection.set_disconnected([&disconnect_mre](std::exception_ptr ex)
1917+
{
1918+
disconnect_mre.set(ex);
1919+
});
1920+
1921+
auto mre = manual_reset_event<void>();
1922+
hub_connection.start([&mre](std::exception_ptr exception)
1923+
{
1924+
mre.set(exception);
1925+
});
1926+
1927+
ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
1928+
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
1929+
websocket_client->receive_message("{}\x1e");
1930+
1931+
mre.get();
1932+
1933+
std::this_thread::sleep_for(config.get_server_timeout() - std::chrono::milliseconds(500));
1934+
websocket_client->receive_message("{\"type\":6}\x1e");
1935+
std::this_thread::sleep_for(std::chrono::seconds(1));
1936+
ASSERT_EQ(connection_state::connected, hub_connection.get_connection_state());
1937+
1938+
try
1939+
{
1940+
disconnect_mre.get();
1941+
ASSERT_TRUE(false);
1942+
}
1943+
catch (const std::exception& ex)
1944+
{
1945+
ASSERT_STREQ("server timeout (1000 ms) elapsed without receiving a message from the server.", ex.what());
1946+
}
1947+
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
1948+
}

0 commit comments

Comments
 (0)