Skip to content

Commit 5138f37

Browse files
cl-mentClément GérouvilleethourisMikolaj Malecki
authored
[core] Cleanup SRT state after a fork() (issue #3177) (#3179)
* [core] Cleanup SRT state after a fork() (issue #3177) * Free socket memory without calling the destructor. * Remove srt_cleanupAtFork() from the API. * Make it compile on systems that don't support pthread_atfork() * Remove a typo * Avoid to send shutdown packet when cleaning up after a fork. * Close the dangling UDP sockets, Free memory. * Add TODO for freeing the Send Queue after refacttoring it. * Ensure that CThread is joinable before join() * Try fix iOS-cxxsyncOFF * Replace the mutex pointer by a mutex reference. * Refactor the Multiplexer initialization. * Fix SIGSEGV * Fix Compilation error on a Debug Log * Rollback to cleaner code for the multiplexer initialization. * Add compatibility with C++11 Sync. * Apply code review changes * Replace the resetThread() macro by an inline function. Co-authored-by: Sektor van Skijlen <ethouris@gmail.com> * Reset m_CGStopCond in CUDTUnited * Rework of resetThread() * Ensure Garbage collector is in the right state after fork. * Protect ~CMultiplexer() against NULL pointers. * Protect resetAtFork() and stop() against NULL pointers. * Added fork example to the repository * Remove french comments. --------- Co-authored-by: Clément Gérouville <cgerouville@haivision.com> Co-authored-by: Sektor van Skijlen <ethouris@gmail.com> Co-authored-by: Mikolaj Malecki <mmalecki@haivision.com>
1 parent 265d0e4 commit 5138f37

File tree

14 files changed

+426
-36
lines changed

14 files changed

+426
-36
lines changed

CMakeLists.txt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -966,10 +966,16 @@ else ()
966966
message(STATUS "Pthread library: ${PTHREAD_LIBRARY}")
967967
endif()
968968

969+
list(PREPEND CMAKE_REQUIRED_LIBRARIES "${PTHREAD_LIBRARY}")
970+
unset(CMAKE_REQUIRED_QUIET)
971+
972+
check_symbol_exists(pthread_atfork "pthread.h" HAVE_PTHREAD_ATFORK)
973+
if ("${HAVE_PTHREAD_ATFORK}" STREQUAL "1")
974+
add_definitions(-DHAVE_PTHREAD_ATFORK=1)
975+
endif ()
976+
969977
# To avoid the need for other judgments when ENABLE_STDCXX_SYNC is OFF in the future, this is a separate conditional statement.
970978
if (NOT ENABLE_STDCXX_SYNC AND ENABLE_MONOTONIC_CLOCK)
971-
list(PREPEND CMAKE_REQUIRED_LIBRARIES "${PTHREAD_LIBRARY}")
972-
unset(CMAKE_REQUIRED_QUIET)
973979
check_symbol_exists(pthread_condattr_setclock "pthread.h" HAVE_PTHREAD_CONDATTR_SETCLOCK)
974980
message(STATUS "Checking pthread_condattr_setclock: '${HAVE_PTHREAD_CONDATTR_SETCLOCK}'")
975981
if ("${HAVE_PTHREAD_CONDATTR_SETCLOCK}" STREQUAL "1")

examples/fork-test/Makefile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
TARGETS=srt_server srt_client
2+
3+
all: $(TARGETS)
4+
5+
%: %.c
6+
$(CC) $< `pkg-config --cflags --libs srt` -o `basename $< .c`
7+
8+
clean:
9+
rm -f $(TARGETS)

examples/fork-test/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
The `srt_server` and `srt_client` apps should be compiled using
2+
external installation of SRT (e.g. in the local directory), each
3+
one as a single program. This is not compiled as a part of SRT.
4+
5+
If you want to use a local installation, simply set `PKG_CONFIG_PATH`
6+
environment variable to point to the local installation directory with
7+
"lib/pkgconfig" or "lib64/pkgconfig" suffix.

examples/fork-test/srt_client.c

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#include <srt/srt.h>
2+
#include <stdio.h>
3+
#include <stdlib.h>
4+
#include <string.h>
5+
#include <arpa/inet.h>
6+
#include <unistd.h>
7+
#include <sys/types.h>
8+
#include <sys/wait.h>
9+
#define SERVER_IP "127.0.0.1"
10+
#define SERVER_PORT 9000
11+
int main() {
12+
if (srt_startup() != 0) {
13+
fprintf(stderr, "Error initializing SRT.\n");
14+
return 1;
15+
}
16+
SRTSOCKET client_sock = srt_create_socket();
17+
if (client_sock == SRT_INVALID_SOCK) {
18+
fprintf(stderr, "Error creating a socket: %s\n", srt_getlasterror_str());
19+
return 1;
20+
}
21+
struct sockaddr_in sa;
22+
memset(&sa, 0, sizeof sa);
23+
sa.sin_family = AF_INET;
24+
sa.sin_port = htons(SERVER_PORT);
25+
inet_pton(AF_INET, SERVER_IP, &sa.sin_addr);
26+
if (srt_connect(client_sock, (struct sockaddr*)&sa, sizeof(sa)) == SRT_ERROR) {
27+
fprintf(stderr, "Error: srt_connect: %s\n", srt_getlasterror_str());
28+
return 1;
29+
}
30+
printf("Connected to SRT server %s:%d\n", SERVER_IP, SERVER_PORT);
31+
const char* message = "Hello from SRT client!";
32+
int bytes = srt_send(client_sock, message, strlen(message));
33+
if (bytes == SRT_ERROR) {
34+
fprintf(stderr, "Sending error: %s\n", srt_getlasterror_str());
35+
} else {
36+
printf("Message sent: %s\n", message);
37+
}
38+
39+
while (1)
40+
{
41+
char buffer[1500];
42+
int nb = srt_recv(client_sock, buffer, sizeof(buffer));
43+
if (nb <= 0)
44+
{
45+
printf("Closed from the server !\n");
46+
srt_close(client_sock);
47+
break;
48+
}
49+
buffer[nb] = 0;
50+
printf("Server has sent: %s\n", buffer);
51+
}
52+
srt_cleanup();
53+
return 0;
54+
}
55+

examples/fork-test/srt_server.c

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#include <srt/srt.h>
2+
#include <stdio.h>
3+
#include <stdlib.h>
4+
#include <string.h>
5+
#include <unistd.h>
6+
#include <sys/types.h>
7+
#include <sys/wait.h>
8+
#include <pthread.h>
9+
#define PORT 9000
10+
11+
int run(char *command) {
12+
pid_t pid = fork();
13+
if (pid < 0) {
14+
perror("fork (intermediate)");
15+
exit(EXIT_FAILURE);
16+
}
17+
if (pid > 0) {
18+
// Parent process
19+
printf("[GRANDPARENT %d] waiting for grand-child process pid=%d to finish...\n",
20+
getpid(), pid);
21+
waitpid(pid, NULL, 0); // Wait for intermediate child
22+
printf("[GRANDPARENT] returning\n");
23+
return 0;
24+
}
25+
// Intermediate process
26+
//srt_cleanup();
27+
if (setsid() < 0) {
28+
perror("setsid");
29+
exit(EXIT_FAILURE);
30+
}
31+
pid_t grandchild_pid = fork();
32+
if (grandchild_pid < 0) {
33+
perror("fork (grandchild)");
34+
exit(EXIT_FAILURE);
35+
}
36+
if (grandchild_pid > 0) {
37+
printf("[PARENT %d] waiting for 10s with child process pid=%d ...\n",
38+
getpid(), grandchild_pid);
39+
// Intermediate process exits immediately
40+
sleep(10);
41+
printf("[PARENT] exitting\n");
42+
exit(0);
43+
}
44+
// Grandchild process
45+
// Redirect stdin to /dev/null
46+
printf("[CHILD %d] Preparing descriptors...\n", getpid());
47+
int devnull = open("/dev/null", O_RDONLY);
48+
if (devnull >= 0) {
49+
dup2(devnull, STDIN_FILENO);
50+
close(devnull);
51+
} else {
52+
perror("open /dev/null");
53+
}
54+
// Redirect stdout to stderr
55+
dup2(STDERR_FILENO, STDOUT_FILENO);
56+
// Execute the command
57+
printf("[CHILD] Executing process '%s'...\n", command);
58+
execl("/bin/sh", "sh", "-c", command, (char *)NULL);
59+
// If execl fails
60+
perror("execl");
61+
exit(EXIT_FAILURE);
62+
}
63+
64+
int main() {
65+
if (srt_startup() != 0) {
66+
fprintf(stderr, "Error initializing SRT.\n");
67+
return 1;
68+
}
69+
70+
SRTSOCKET serv_sock = srt_create_socket();
71+
if (serv_sock == SRT_INVALID_SOCK) {
72+
fprintf(stderr, "Error creating SRT socket: %s\n", srt_getlasterror_str());
73+
return 1;
74+
}
75+
struct sockaddr_in sa;
76+
memset(&sa, 0, sizeof sa);
77+
sa.sin_family = AF_INET;
78+
sa.sin_port = htons(PORT);
79+
sa.sin_addr.s_addr = INADDR_ANY;
80+
if (srt_bind(serv_sock, (struct sockaddr*)&sa, sizeof sa) == SRT_ERROR) {
81+
fprintf(stderr, "Error: srt_bind: %s\n", srt_getlasterror_str());
82+
return 1;
83+
}
84+
if (srt_listen(serv_sock, 5) == SRT_ERROR) {
85+
fprintf(stderr, "Error: srt_listen: %s\n", srt_getlasterror_str());
86+
return 1;
87+
}
88+
printf("SRT server is listening on port %d...\n", PORT);
89+
struct sockaddr_in client_addr;
90+
int addr_len = sizeof(client_addr);
91+
SRTSOCKET client_sock = srt_accept(serv_sock, (struct sockaddr*)&client_addr, &addr_len);
92+
if (client_sock == SRT_INVALID_SOCK) {
93+
fprintf(stderr, "Error: srt_accept: %s\n", srt_getlasterror_str());
94+
return 1;
95+
}
96+
printf("Client connected via SRT !\n");
97+
char buffer[1500];
98+
int bytes = srt_recv(client_sock, buffer, sizeof(buffer));
99+
if (bytes > 0) {
100+
buffer[bytes] = '\0';
101+
printf("Message received: %s\n", buffer);
102+
const char resp [] = "We read you!";
103+
srt_send(client_sock, resp, (sizeof resp)-1);
104+
} else {
105+
printf("Error: reading from srt_recv: %s.\n", srt_getlasterror_str());
106+
}
107+
run("date > /tmp/res");
108+
printf("Server: sleep(1)\n");
109+
sleep(1);
110+
// Nettoyage
111+
printf("Server: closing SRT sockets\n");
112+
srt_close(client_sock);
113+
srt_close(serv_sock);
114+
printf("Server: cleanup\n");
115+
srt_cleanup();
116+
printf("Server: exit\n");
117+
return 0;
118+
}

srtcore/api.cpp

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ srt::CUDTSocket::~CUDTSocket()
9999
releaseMutex(m_ControlLock);
100100
}
101101

102+
void srt::CUDTSocket::resetAtFork()
103+
{
104+
m_UDT.resetAtFork();
105+
resetCond(m_AcceptCond);
106+
}
107+
102108
SRT_SOCKSTATUS srt::CUDTSocket::getStatus()
103109
{
104110
// TTL in CRendezvousQueue::updateConnStatus() will set m_bConnecting to false.
@@ -275,6 +281,51 @@ void srt::CUDTUnited::stopGarbageCollector()
275281
}
276282
}
277283

284+
void srt::CUDTUnited::cleanupAllSockets()
285+
{
286+
for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i)
287+
{
288+
CUDTSocket* s = i->second;
289+
290+
#if ENABLE_BONDING
291+
if (s->m_GroupOf)
292+
{
293+
s->removeFromGroup(false);
294+
}
295+
#endif
296+
297+
// remove from listener's queue
298+
sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
299+
if (ls == m_Sockets.end())
300+
{
301+
ls = m_ClosedSockets.find(s->m_ListenSocket);
302+
}
303+
if (ls != m_ClosedSockets.end())
304+
{
305+
ls->second->m_QueuedSockets.erase(s->m_SocketID);
306+
}
307+
s->core().closeAtFork();
308+
s->resetAtFork();
309+
delete(s);
310+
}
311+
m_Sockets.clear();
312+
313+
#if ENABLE_BONDING
314+
for (groups_t::iterator j = m_Groups.begin(); j != m_Groups.end(); ++j)
315+
{
316+
delete j->second;
317+
}
318+
m_Groups.clear();
319+
#endif
320+
for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i)
321+
{
322+
CMultiplexer &multiplexer = i->second;
323+
multiplexer.resetAtFork();
324+
}
325+
m_mMultiplexer.clear();
326+
}
327+
328+
278329
void srt::CUDTUnited::closeAllSockets()
279330
{
280331
// remove all sockets and multiplexers
@@ -370,6 +421,18 @@ int srt::CUDTUnited::startup()
370421
return startGarbageCollector() ? 0 : -1;
371422
}
372423

424+
int srt::CUDTUnited::cleanupAtFork()
425+
{
426+
cleanupAllSockets();
427+
resetThread(&m_GCThread);
428+
resetCond(m_GCStopCond);
429+
m_GCStopLock.unlock();
430+
setupCond(m_GCStopCond, "GCStop");
431+
m_iInstanceCount=0;
432+
m_bGCStatus = false;
433+
return 0;
434+
}
435+
373436
int srt::CUDTUnited::cleanup()
374437
{
375438
// IMPORTANT!!!
@@ -3076,8 +3139,9 @@ bool srt::CUDTUnited::channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, cons
30763139

30773140
void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, const UDPSOCKET* udpsock /*[[nullable]]*/)
30783141
{
3142+
const int port = reqaddr.hport();
30793143
ExclusiveLock cg(m_GlobControlLock);
3080-
3144+
30813145
// If udpsock is provided, then this socket will be simply
30823146
// taken for binding as a good deal. It would be nice to make
30833147
// a sanity check to see if this UDP socket isn't already installed
@@ -3087,7 +3151,6 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons
30873151
{
30883152
// If not, we need to see if there exist already a multiplexer bound
30893153
// to the same endpoint.
3090-
const int port = reqaddr.hport();
30913154
const CSrtConfig& cfgSocket = s->core().m_config;
30923155

30933156
// This loop is going to check the attempted binding of
@@ -3305,6 +3368,8 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons
33053368
}
33063369
}
33073370

3371+
3372+
33083373
// a new multiplexer is needed
33093374
CMultiplexer m;
33103375
configureMuxer((m), s, reqaddr.family());
@@ -3360,7 +3425,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons
33603425
// Rewrite the port here, as it might be only known upon return
33613426
// from CChannel::open.
33623427
m.m_iPort = installMuxer((s), m);
3363-
m_mMultiplexer[m.m_iID] = m;
3428+
swap(m_mMultiplexer[m.m_iID],m);
33643429
}
33653430
catch (const CUDTException&)
33663431
{
@@ -3373,7 +3438,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons
33733438
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
33743439
}
33753440

3376-
HLOGC(smlog.Debug, log << "bind: creating new multiplexer for port " << m.m_iPort);
3441+
HLOGC(smlog.Debug, log << "bind: creating new multiplexer for port " << port);
33773442
}
33783443

33793444
// This function is going to find a multiplexer for the port contained
@@ -3492,6 +3557,14 @@ void* srt::CUDTUnited::garbageCollect(void* p)
34923557

34933558
int srt::CUDT::startup()
34943559
{
3560+
#if HAVE_PTHREAD_ATFORK
3561+
static bool registered = false;
3562+
if (!registered)
3563+
{
3564+
pthread_atfork(NULL, NULL, (void (*)()) srt::CUDT::cleanupAtFork);
3565+
registered = true;
3566+
}
3567+
#endif
34953568
return uglobal().startup();
34963569
}
34973570

@@ -3500,6 +3573,15 @@ int srt::CUDT::cleanup()
35003573
return uglobal().cleanup();
35013574
}
35023575

3576+
int srt::CUDT::cleanupAtFork()
3577+
{
3578+
CUDTUnited &context = uglobal();
3579+
context.cleanupAtFork();
3580+
new (&context) CUDTUnited();
3581+
3582+
return context.startup();
3583+
}
3584+
35033585
SRTSOCKET srt::CUDT::socket()
35043586
{
35053587
try

srtcore/api.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ class CUDTSocket
120120
}
121121

122122
~CUDTSocket();
123+
void resetAtFork();
123124

124125
void construct();
125126

@@ -263,6 +264,7 @@ class CUDTUnited
263264
/// release the UDT library.
264265
/// @return 0 if success, otherwise -1 is returned.
265266
int cleanup();
267+
int cleanupAtFork();
266268

267269
/// Create a new UDT socket.
268270
/// @param [out] pps Variable (optional) to which the new socket will be written, if succeeded
@@ -464,6 +466,7 @@ class CUDTUnited
464466
bool acquireSocket(CUDTSocket* s);
465467
bool startGarbageCollector();
466468
void stopGarbageCollector();
469+
void cleanupAllSockets();
467470
void closeAllSockets();
468471

469472
public:

0 commit comments

Comments
 (0)