Skip to content

Commit 032b055

Browse files
committed
Add multicast stress test
1 parent 529c17b commit 032b055

File tree

5 files changed

+282
-0
lines changed

5 files changed

+282
-0
lines changed

tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ include(CTest)
44
enable_testing()
55

66
add_subdirectory(bit_stream)
7+
add_subdirectory(socket_extensions)

tests/bit_stream/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ add_executable(bit_stream_stress stress.cpp)
22
target_link_libraries(bit_stream_stress PRIVATE nalchi)
33
target_compile_options(bit_stream_stress PRIVATE ${nalchi_compile_options})
44
target_link_options(bit_stream_stress PRIVATE ${nalchi_link_options})
5+
nalchi_copy_runtime_dependencies(bit_stream_stress)
56

67
add_test(test_bit_stream_stress bit_stream_stress)
78
set_tests_properties(test_bit_stream_stress PROPERTIES TIMEOUT 0)

tests/init_and_kill.hpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#pragma once
2+
3+
#include <steam/steamnetworkingtypes.h>
4+
5+
#ifdef STEAMNETWORKINGSOCKETS_OPENSOURCE
6+
#include <steam/steamnetworkingsockets.h>
7+
#else // Steamworks SDK
8+
#include <steam/steam_api.h>
9+
#endif
10+
11+
#include <stdlib.h>
12+
#if defined(_WIN32)
13+
#define NALCHI_TESTS_PUTENV _putenv
14+
#else
15+
#define NALCHI_TESTS_PUTENV putenv
16+
#endif
17+
18+
namespace nalchi::tests
19+
{
20+
21+
inline bool gns_init()
22+
{
23+
#ifdef STEAMNETWORKINGSOCKETS_OPENSOURCE
24+
SteamNetworkingErrMsg err_msg;
25+
return GameNetworkingSockets_Init(nullptr, err_msg);
26+
#else // Steamworks SDK
27+
char app_id[] = "SteamAppId=480";
28+
char game_id[] = "SteamGameId=480";
29+
NALCHI_TESTS_PUTENV(app_id);
30+
NALCHI_TESTS_PUTENV(game_id);
31+
32+
return SteamAPI_Init();
33+
#endif
34+
}
35+
36+
inline void gns_kill()
37+
{
38+
#ifdef STEAMNETWORKINGSOCKETS_OPENSOURCE
39+
GameNetworkingSockets_Kill();
40+
#else // Steamworks SDK
41+
SteamAPI_Shutdown();
42+
#endif
43+
}
44+
45+
} // namespace nalchi::tests
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
add_executable(multicast_stress multicast_stress.cpp)
2+
target_link_libraries(multicast_stress PRIVATE nalchi)
3+
target_compile_options(multicast_stress PRIVATE ${nalchi_compile_options})
4+
target_link_options(multicast_stress PRIVATE ${nalchi_link_options})
5+
nalchi_copy_runtime_dependencies(multicast_stress)
6+
7+
add_test(test_multicast_stress multicast_stress)
8+
set_tests_properties(test_multicast_stress PROPERTIES TIMEOUT 0)
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
#include "../assert.hpp"
2+
#include "../init_and_kill.hpp"
3+
4+
#include <nalchi/socket_extensions.hpp>
5+
6+
#include <steam/isteamnetworkingsockets.h>
7+
#include <steam/isteamnetworkingutils.h>
8+
9+
#include <algorithm>
10+
#include <array>
11+
#include <atomic>
12+
#include <chrono>
13+
#include <cstddef>
14+
#include <cstdlib>
15+
#include <iostream>
16+
#include <thread>
17+
#include <vector>
18+
19+
#ifndef MC_MESSAGES
20+
#define MC_MESSAGES 5000
21+
#endif
22+
23+
namespace nalchi::tests
24+
{
25+
26+
using msg_t = int;
27+
28+
constexpr auto sleep_duration = std::chrono::milliseconds(16);
29+
30+
int g_clients;
31+
int g_messages;
32+
33+
void do_server(const std::vector<HSteamNetConnection>& conns)
34+
{
35+
std::vector<std::int64_t> out_message_number_or_result(conns.size());
36+
37+
for (int i = 0; i < g_messages; ++i)
38+
{
39+
// Prepare a payload to send
40+
auto payload = shared_payload::allocate(sizeof(msg_t));
41+
NALCHI_TESTS_ASSERT(payload.ptr, "Payload allocation failed");
42+
43+
int& data = *static_cast<int*>(payload.ptr);
44+
data = i;
45+
46+
// Multicast the payload
47+
socket_extensions::multicast(SteamNetworkingSockets(), conns, payload, sizeof(msg_t),
48+
k_nSteamNetworkingSend_Reliable, out_message_number_or_result);
49+
50+
// Check the out message numbers
51+
for (const auto number : out_message_number_or_result)
52+
NALCHI_TESTS_ASSERT(number >= 0, "Multicast failed with ", -number);
53+
}
54+
}
55+
56+
void do_client(const HSteamNetConnection conn)
57+
{
58+
static constexpr std::size_t BATCH_MSGS = 256;
59+
std::array<SteamNetworkingMessage_t*, BATCH_MSGS> msgs;
60+
61+
for (int msg_idx = 0; msg_idx < g_messages;)
62+
{
63+
const int recv_cnt = SteamNetworkingSockets()->ReceiveMessagesOnConnection(conn, msgs.data(), BATCH_MSGS);
64+
NALCHI_TESTS_ASSERT(recv_cnt != -1);
65+
66+
for (int i = 0; i < recv_cnt; ++i)
67+
{
68+
NALCHI_TESTS_ASSERT(msgs[i]->m_cbSize == sizeof(msg_t));
69+
const int data = *((int*)msgs[i]->m_pData);
70+
NALCHI_TESTS_ASSERT(data == msg_idx + i, "Received ", data, ", expected ", msg_idx + i);
71+
72+
msgs[i]->Release();
73+
}
74+
75+
msg_idx += recv_cnt;
76+
}
77+
}
78+
79+
void configure_buffer_sizes()
80+
{
81+
auto& net_utils = *SteamNetworkingUtils();
82+
std::size_t config_value_size = sizeof(int);
83+
84+
// Configure send buffer size
85+
int send_buffer_size;
86+
auto get_config_result =
87+
net_utils.GetConfigValue(k_ESteamNetworkingConfig_SendBufferSize, k_ESteamNetworkingConfig_Global, 0, nullptr,
88+
&send_buffer_size, &config_value_size);
89+
NALCHI_TESTS_ASSERT(get_config_result == k_ESteamNetworkingGetConfigValue_OK ||
90+
get_config_result == k_ESteamNetworkingGetConfigValue_OKInherited,
91+
"Get send buffer size failed: ", (int)get_config_result);
92+
NALCHI_TESTS_ASSERT(config_value_size == sizeof(int));
93+
94+
if (static_cast<std::size_t>(send_buffer_size) < sizeof(msg_t) * g_messages)
95+
{
96+
const bool set_config_result =
97+
net_utils.SetGlobalConfigValueInt32(k_ESteamNetworkingConfig_SendBufferSize, sizeof(msg_t) * g_messages);
98+
NALCHI_TESTS_ASSERT(set_config_result, "Set send buffer size failed");
99+
std::cout << "Send buffer size changed: " << sizeof(msg_t) * g_messages << " (was " << send_buffer_size
100+
<< ")\n";
101+
}
102+
else
103+
std::cout << "Send buffer size: " << send_buffer_size << '\n';
104+
105+
// Configure recv buffer size
106+
int recv_buffer_size;
107+
get_config_result =
108+
net_utils.GetConfigValue(k_ESteamNetworkingConfig_RecvBufferSize, k_ESteamNetworkingConfig_Global, 0, nullptr,
109+
&recv_buffer_size, &config_value_size);
110+
NALCHI_TESTS_ASSERT(get_config_result == k_ESteamNetworkingGetConfigValue_OK ||
111+
get_config_result == k_ESteamNetworkingGetConfigValue_OKInherited,
112+
"Get recv buffer size failed: ", (int)get_config_result);
113+
NALCHI_TESTS_ASSERT(config_value_size == sizeof(int));
114+
115+
if (static_cast<std::size_t>(recv_buffer_size) < sizeof(msg_t) * g_messages)
116+
{
117+
const bool set_config_result =
118+
net_utils.SetGlobalConfigValueInt32(k_ESteamNetworkingConfig_RecvBufferSize, sizeof(msg_t) * g_messages);
119+
NALCHI_TESTS_ASSERT(set_config_result, "Set recv buffer size failed");
120+
std::cout << "Recv buffer size changed: " << sizeof(msg_t) * g_messages << " (was " << recv_buffer_size
121+
<< ")\n";
122+
}
123+
else
124+
std::cout << "Recv buffer size: " << recv_buffer_size << '\n';
125+
126+
// Configure recv messages count
127+
int recv_msg_count;
128+
get_config_result =
129+
net_utils.GetConfigValue(k_ESteamNetworkingConfig_RecvBufferMessages, k_ESteamNetworkingConfig_Global, 0,
130+
nullptr, &recv_msg_count, &config_value_size);
131+
NALCHI_TESTS_ASSERT(get_config_result == k_ESteamNetworkingGetConfigValue_OK ||
132+
get_config_result == k_ESteamNetworkingGetConfigValue_OKInherited,
133+
"Get recv messages count failed: ", (int)get_config_result);
134+
NALCHI_TESTS_ASSERT(config_value_size == sizeof(int));
135+
136+
if (recv_msg_count < g_messages)
137+
{
138+
const bool set_config_result =
139+
net_utils.SetGlobalConfigValueInt32(k_ESteamNetworkingConfig_RecvBufferMessages, g_messages);
140+
NALCHI_TESTS_ASSERT(set_config_result, "Set recv msg count failed");
141+
std::cout << "Max recv msg count changed: " << g_messages << " (was " << recv_msg_count << ")\n";
142+
}
143+
else
144+
std::cout << "Max recv msg count: " << recv_msg_count << '\n';
145+
}
146+
147+
} // namespace nalchi::tests
148+
149+
int main(int argc, char** argv)
150+
{
151+
if (argc > 2)
152+
{
153+
std::cout << "=== Usage ===\n";
154+
std::cout << "`./test_multicast_stress`\n";
155+
std::cout << '\t' << "Test multicasting " << MC_MESSAGES << " messages.\n";
156+
std::cout << "`./test_multicast_stress` <count>\n";
157+
std::cout << '\t' << "Test multicasting <count> messages.\n";
158+
return 2;
159+
}
160+
161+
using namespace nalchi;
162+
using namespace nalchi::tests;
163+
164+
std::cout << "=== multicast stress test ===\n";
165+
166+
g_messages = (argc == 2) ? std::atoi(argv[1]) : MC_MESSAGES;
167+
g_clients = std::max(2, static_cast<int>(std::thread::hardware_concurrency()) - 2);
168+
169+
std::cout << "Prepare to send " << g_messages << " messages to " << g_clients << " client threads...\n";
170+
171+
NALCHI_TESTS_ASSERT(gns_init(), "GNS init failed");
172+
173+
std::atomic<bool> stop_callback;
174+
175+
std::thread callback_worker([&stop_callback]() {
176+
while (!stop_callback.load(std::memory_order_acquire))
177+
{
178+
#ifdef STEAMNETWORKINGSOCKETS_OPENSOURCE
179+
SteamNetworkingSockets()->RunCallbacks();
180+
#else // Steamworks SDK
181+
SteamAPI_RunCallbacks();
182+
#endif
183+
std::this_thread::sleep_for(sleep_duration);
184+
}
185+
});
186+
187+
configure_buffer_sizes();
188+
189+
// Create connections
190+
std::vector<HSteamNetConnection> server_side_connections(g_clients);
191+
std::vector<HSteamNetConnection> client_side_connections(g_clients);
192+
193+
for (int i = 0; i < g_clients; ++i)
194+
{
195+
auto& server = server_side_connections[i];
196+
auto& client = client_side_connections[i];
197+
198+
const bool created = SteamNetworkingSockets()->CreateSocketPair(&server, &client, false, nullptr, nullptr);
199+
NALCHI_TESTS_ASSERT(created, "Connection creation failed");
200+
}
201+
202+
// Create worker threads
203+
std::vector<std::thread> workers;
204+
workers.reserve(1 + g_clients);
205+
206+
workers.emplace_back(do_server, server_side_connections);
207+
for (int i = 0; i < g_clients; ++i)
208+
workers.emplace_back(do_client, client_side_connections[i]);
209+
210+
// Wait for the jobs to complete
211+
for (auto& worker : workers)
212+
worker.join();
213+
214+
// Clean up
215+
for (int i = 0; i < g_clients; ++i)
216+
{
217+
SteamNetworkingSockets()->CloseConnection(client_side_connections[i], 0, nullptr, false);
218+
SteamNetworkingSockets()->CloseConnection(server_side_connections[i], 0, nullptr, false);
219+
}
220+
221+
stop_callback.store(true, std::memory_order_release);
222+
callback_worker.join();
223+
224+
gns_kill();
225+
226+
std::cout << "multicast stress test succeeded" << std::endl;
227+
}

0 commit comments

Comments
 (0)