82
82
#include "rtpengine.h"
83
83
#include "rtpengine_funcs.h"
84
84
#include "bencode.h"
85
+ #include "../../reactor_defs.h"
86
+ #include "../../reactor_proc.h"
87
+ #include "../../io_wait.h"
85
88
86
89
#if !defined(AF_LOCAL )
87
90
#define AF_LOCAL AF_UNIX
@@ -376,6 +379,7 @@ struct rtpe_set_head **rtpe_set_list =0;
376
379
struct rtpe_set * * default_rtpe_set = 0 ;
377
380
378
381
static str rtpengine_notify_sock ;
382
+ static short rtpengine_notify_port ;
379
383
static str rtpengine_notify_event_name = str_init ("E_RTPENGINE_NOTIFICATION" );
380
384
static event_id_t rtpengine_notify_event = EVI_ERROR ;
381
385
@@ -738,7 +742,7 @@ static const dep_export_t deps = {
738
742
739
743
static const proc_export_t procs [] = {
740
744
{"RTPEngine notification receiver" , 0 , 0 , rtpengine_notify_process , 1 ,
741
- PROC_FLAG_INITCHILD },
745
+ PROC_FLAG_INITCHILD | PROC_FLAG_HAS_IPC | PROC_FLAG_NEEDS_SCRIPT },
742
746
{0 ,0 ,0 ,0 ,0 ,0 }
743
747
};
744
748
@@ -4118,6 +4122,43 @@ static void rtpengine_raise_event(int sender, void *p)
4118
4122
4119
4123
#define RTPENGINE_DGRAM_BUF 35536
4120
4124
4125
+ static int rtpengine_io_callback (int fd , void * fs , int was_timeout )
4126
+ {
4127
+ int ret ;
4128
+ char * p ;
4129
+ char buffer [RTPENGINE_DGRAM_BUF ];
4130
+
4131
+ do
4132
+ ret = read (fd , buffer , RTPENGINE_DGRAM_BUF );
4133
+ while (ret == -1 && errno == EINTR );
4134
+ if (ret < 0 ) {
4135
+ LM_ERR ("problem reading on socket %s:%u (%s:%d)\n" ,
4136
+ rtpengine_notify_sock .s , rtpengine_notify_port , strerror (errno ), errno );
4137
+ return -1 ;
4138
+ }
4139
+
4140
+ if (!evi_probe_event (rtpengine_notify_event )) {
4141
+ LM_DBG ("nothing to do - nobody is listening!\n" );
4142
+ return 0 ;
4143
+ }
4144
+
4145
+ p = shm_malloc (ret + 1 );
4146
+ if (!p ) {
4147
+ /* coverity[string_null] - false positive CID #211356 */
4148
+ LM_ERR ("could not allocate %d for buffer %.*s\n" , ret , ret , buffer );
4149
+ return -1 ;
4150
+ }
4151
+ memcpy (p , buffer , ret );
4152
+ p [ret ] = '\0' ;
4153
+
4154
+ LM_INFO ("dispatching buffer: %s\n" , p );
4155
+ if (ipc_dispatch_rpc (rtpengine_raise_event , p ) < 0 ) {
4156
+ LM_ERR ("could not dispatch notification job!\n" );
4157
+ shm_free (p );
4158
+ }
4159
+ return 0 ;
4160
+ }
4161
+
4121
4162
static void rtpengine_notify_process (int rank )
4122
4163
{
4123
4164
int ret ;
@@ -4126,7 +4167,6 @@ static void rtpengine_notify_process(int rank)
4126
4167
unsigned int port ;
4127
4168
static int rtpengine_notify_fd ;
4128
4169
union sockaddr_union ss ;
4129
- char buffer [RTPENGINE_DGRAM_BUF ];
4130
4170
4131
4171
p = q_memchr (rtpengine_notify_sock .s , ':' , rtpengine_notify_sock .len );
4132
4172
if (!p ) {
@@ -4173,37 +4213,17 @@ static void rtpengine_notify_process(int rank)
4173
4213
goto end ;
4174
4214
}
4175
4215
4176
- for (;;) {
4177
- do
4178
- ret = read (rtpengine_notify_fd , buffer , RTPENGINE_DGRAM_BUF );
4179
- while (ret == -1 && errno == EINTR );
4180
- if (ret < 0 ) {
4181
- LM_ERR ("problem reading on socket %s:%u (%s:%d)\n" ,
4182
- rtpengine_notify_sock .s , port , strerror (errno ), errno );
4183
- goto end ;
4184
- }
4185
-
4186
- if (!evi_probe_event (rtpengine_notify_event )) {
4187
- LM_DBG ("nothing to do - nobody is listening!\n" );
4188
- continue ;
4189
- }
4190
-
4191
- p = shm_malloc (ret + 1 );
4192
- if (!p ) {
4193
- /* coverity[string_null] - false positive CID #211356 */
4194
- LM_ERR ("could not allocate %d for buffer %.*s\n" , ret , ret , buffer );
4195
- continue ;
4196
- }
4197
- memcpy (p , buffer , ret );
4198
- p [ret ] = '\0' ;
4216
+ if (reactor_proc_init ("RTPengine events" ) < 0 ) {
4217
+ LM_ERR ("failed to init the RTPengine events\n" );
4218
+ goto end ;
4219
+ }
4199
4220
4200
- LM_INFO ("dispatching buffer: %s\n" , p );
4201
- if (ipc_dispatch_rpc (rtpengine_raise_event , p ) < 0 ) {
4202
- LM_ERR ("could not dispatch notification job!\n" );
4203
- shm_free (p );
4204
- }
4221
+ if (reactor_proc_add_fd (rtpengine_notify_fd , rtpengine_io_callback , NULL ) < 0 ) {
4222
+ LM_CRIT ("failed to add RTPengine listen socket to reactor\n" );
4223
+ goto end ;
4205
4224
}
4206
4225
4226
+ reactor_proc_loop ();
4207
4227
end :
4208
4228
close (rtpengine_notify_fd );
4209
4229
}
0 commit comments