2424 * function should be used instead of join_mcast_group() directly in all
2525 * stream-related code to ensure proper timeout handling.
2626 */
27- int stream_join_mcast_group (stream_context_t * ctx ) {
27+ void stream_join_mcast_group (stream_context_t * ctx ) {
28+ if (ctx -> mcast_sock >= 0 )
29+ return ;
2830 int sock = join_mcast_group (ctx -> service );
29- if (sock > 0 ) {
31+ if (sock >= 0 ) {
3032 /* Register socket with epoll immediately after creation */
3133 struct epoll_event ev ;
3234 ev .events = EPOLLIN ; /* Level-triggered mode for read events */
@@ -44,8 +46,9 @@ int stream_join_mcast_group(stream_context_t *ctx) {
4446 int64_t now = get_time_ms ();
4547 ctx -> last_mcast_data_time = now ;
4648 ctx -> last_mcast_rejoin_time = now ;
49+
50+ ctx -> mcast_sock = sock ;
4751 }
48- return sock ;
4952}
5053
5154/*
@@ -78,7 +81,7 @@ int stream_handle_fd_event(stream_context_t *ctx, int fd, uint32_t events,
7881 socklen_t slen = sizeof (peer_addr );
7982
8083 /* Process FCC socket events */
81- if (ctx -> fcc .fcc_sock > 0 && fd == ctx -> fcc .fcc_sock ) {
84+ if (ctx -> fcc .fcc_sock >= 0 && fd == ctx -> fcc .fcc_sock ) {
8285 /* Allocate a fresh buffer from pool for this receive operation */
8386 buffer_ref_t * recv_buf = buffer_pool_alloc ();
8487 if (!recv_buf ) {
@@ -143,7 +146,7 @@ int stream_handle_fd_event(stream_context_t *ctx, int fd, uint32_t events,
143146 }
144147
145148 /* Process multicast socket events */
146- if (ctx -> mcast_sock > 0 && fd == ctx -> mcast_sock ) {
149+ if (ctx -> mcast_sock >= 0 && fd == ctx -> mcast_sock ) {
147150 /* Allocate a fresh buffer from pool for this receive operation */
148151 buffer_ref_t * recv_buf = buffer_pool_alloc ();
149152 if (!recv_buf ) {
@@ -198,7 +201,7 @@ int stream_handle_fd_event(stream_context_t *ctx, int fd, uint32_t events,
198201 }
199202
200203 /* Process RTSP socket events */
201- if (ctx -> rtsp .socket > 0 && fd == ctx -> rtsp .socket ) {
204+ if (ctx -> rtsp .socket >= 0 && fd == ctx -> rtsp .socket ) {
202205 /* Handle RTSP socket events (handshake and RTP data in PLAYING state) */
203206 int result = rtsp_handle_socket_event (& ctx -> rtsp , events );
204207 if (result < 0 ) {
@@ -218,7 +221,7 @@ int stream_handle_fd_event(stream_context_t *ctx, int fd, uint32_t events,
218221 }
219222
220223 /* Process RTSP RTP socket events (UDP mode) */
221- if (ctx -> rtsp .rtp_socket > 0 && fd == ctx -> rtsp .rtp_socket ) {
224+ if (ctx -> rtsp .rtp_socket >= 0 && fd == ctx -> rtsp .rtp_socket ) {
222225 int result = rtsp_handle_udp_rtp_data (& ctx -> rtsp , ctx -> conn );
223226 if (result < 0 ) {
224227 return -1 ; /* Error */
@@ -230,7 +233,7 @@ int stream_handle_fd_event(stream_context_t *ctx, int fd, uint32_t events,
230233 }
231234
232235 /* Handle UDP RTCP socket (for future RTCP processing) */
233- if (ctx -> rtsp .rtcp_socket > 0 && fd == ctx -> rtsp .rtcp_socket ) {
236+ if (ctx -> rtsp .rtcp_socket >= 0 && fd == ctx -> rtsp .rtcp_socket ) {
234237 /* RTCP data processing could be added here in the future */
235238 /* For now, just consume the data to prevent buffer overflow */
236239 uint8_t rtcp_buffer [RTCP_BUFFER_SIZE ];
@@ -252,6 +255,7 @@ int stream_context_init_for_worker(stream_context_t *ctx, connection_t *conn,
252255 ctx -> service = service ;
253256 ctx -> epoll_fd = epoll_fd ;
254257 ctx -> status_index = status_index ;
258+ ctx -> mcast_sock = -1 ;
255259 fcc_session_init (& ctx -> fcc );
256260 ctx -> fcc .status_index = status_index ;
257261 rtsp_session_init (& ctx -> rtsp );
@@ -319,7 +323,7 @@ int stream_context_init_for_worker(stream_context_t *ctx, connection_t *conn,
319323 /* Direct multicast join */
320324 /* Note: Both /rtp/ and /udp/ endpoints now use unified packet detection */
321325 /* Packets are automatically detected as RTP or raw UDP at receive time */
322- ctx -> mcast_sock = stream_join_mcast_group (ctx );
326+ stream_join_mcast_group (ctx );
323327 fcc_session_set_state (& ctx -> fcc , FCC_STATE_MCAST_ACTIVE ,
324328 "Direct multicast" );
325329 }
@@ -332,7 +336,7 @@ int stream_tick(stream_context_t *ctx, int64_t now) {
332336 return 0 ;
333337
334338 /* Periodic multicast rejoin (if enabled) */
335- if (config .mcast_rejoin_interval > 0 && ctx -> mcast_sock > 0 ) {
339+ if (config .mcast_rejoin_interval > 0 && ctx -> mcast_sock >= 0 ) {
336340 int64_t elapsed_ms = now - ctx -> last_mcast_rejoin_time ;
337341 if (elapsed_ms >= config .mcast_rejoin_interval * 1000 ) {
338342 logger (LOG_DEBUG , "Multicast: Periodic rejoin (interval: %d seconds)" ,
@@ -350,7 +354,7 @@ int stream_tick(stream_context_t *ctx, int64_t now) {
350354 }
351355
352356 /* Check for multicast stream timeout */
353- if (ctx -> mcast_sock > 0 ) {
357+ if (ctx -> mcast_sock >= 0 ) {
354358 int64_t elapsed_ms = now - ctx -> last_mcast_data_time ;
355359 if (elapsed_ms >= MCAST_TIMEOUT_SEC * 1000 ) {
356360 logger (LOG_ERROR ,
@@ -361,7 +365,7 @@ int stream_tick(stream_context_t *ctx, int64_t now) {
361365 }
362366
363367 /* Check for FCC timeouts */
364- if (ctx -> fcc .fcc_sock > 0 ) {
368+ if (ctx -> fcc .fcc_sock >= 0 ) {
365369 int64_t elapsed_ms = now - ctx -> last_fcc_data_time ;
366370 int timeout_ms = 0 ;
367371
@@ -383,7 +387,7 @@ int stream_tick(stream_context_t *ctx, int64_t now) {
383387 fcc_session_set_state (& ctx -> fcc , FCC_STATE_MCAST_ACTIVE ,
384388 "First unicast packet timeout" );
385389 }
386- ctx -> mcast_sock = stream_join_mcast_group (ctx );
390+ stream_join_mcast_group (ctx );
387391 }
388392 } else if (ctx -> fcc .state == FCC_STATE_UNICAST_ACTIVE ||
389393 ctx -> fcc .state == FCC_STATE_MCAST_REQUESTED ) {
@@ -397,9 +401,7 @@ int stream_tick(stream_context_t *ctx, int64_t now) {
397401 FCC_TIMEOUT_UNICAST_SEC );
398402 fcc_session_set_state (& ctx -> fcc , FCC_STATE_MCAST_ACTIVE ,
399403 "Unicast interrupted" );
400- if (!ctx -> mcast_sock ) {
401- ctx -> mcast_sock = stream_join_mcast_group (ctx );
402- }
404+ stream_join_mcast_group (ctx );
403405 }
404406
405407 /* Check if we've been waiting too long for sync notification */
@@ -487,9 +489,9 @@ int stream_context_cleanup(stream_context_t *ctx) {
487489 int rtsp_async = rtsp_session_cleanup (& ctx -> rtsp );
488490
489491 /* Close multicast socket if active (always safe to cleanup immediately) */
490- if (ctx -> mcast_sock ) {
492+ if (ctx -> mcast_sock >= 0 ) {
491493 worker_cleanup_socket_from_epoll (ctx -> epoll_fd , ctx -> mcast_sock );
492- ctx -> mcast_sock = 0 ;
494+ ctx -> mcast_sock = -1 ;
493495 logger (LOG_DEBUG , "Multicast socket closed" );
494496 }
495497
0 commit comments