Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions io_wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ inline static int io_watch_add( io_wait_h* h, // lgtm [cpp/use-of-goto]
void* data,
int prio,
unsigned int timeout,
int flags)
int flags,
int exclusive)
{

/* helper macros */
Expand Down Expand Up @@ -506,16 +507,9 @@ inline static int io_watch_add( io_wait_h* h, // lgtm [cpp/use-of-goto]
ep_event.events|=EPOLLOUT;
if (!already) {
again1:
#if 0
/* This is currently broken, because when using EPOLLEXCLUSIVE, the OS will
* send sequential events to the same process - thus our pseudo-dispatcher
* will no longer work, since events on a pipe will be queued by a single
* process. - razvanc
*/
#if (defined __OS_linux) && (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 24)
if (e->flags & IO_WATCH_READ)
#ifdef EPOLLEXCLUSIVE
if (e->flags & IO_WATCH_READ && exclusive == 1)
ep_event.events|=EPOLLEXCLUSIVE;
#endif
#endif
n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
if (n==-1){
Expand Down
7 changes: 7 additions & 0 deletions io_wait_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,14 @@ inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
unsigned int curr_time;

again:
#ifdef EPOLLEXCLUSIVE
/* When using EPOLLEXCLUSIVE we don't want a single wakeup to handle multiple fds at once
as it could introduce latency in handling requests.
Limit each wakeup to handling events from a single fd */
ret=n=epoll_wait(h->epfd, h->ep_array, 1, t*1000);
#else
ret=n=epoll_wait(h->epfd, h->ep_array, h->fd_no, t*1000);
#endif
if (n==-1){
if (errno == EINTR) {
goto again; /* signal, ignore it */
Expand Down
37 changes: 20 additions & 17 deletions ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,26 +278,29 @@ void ipc_handle_job(int fd)
ipc_job job;
int n;

/* read one IPC job from the pipe; even if the read is blocking,
* we are here triggered from the reactor, on a READ event, so
* we shouldn;t ever block */
n = read(fd, &job, sizeof(job) );
if (n==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
//Process all jobs until handle is drained
while (1) {
/* read one IPC job from the pipe; even if the read is blocking,
* we are here triggered from the reactor, on a READ event, so
* we shouldn;t ever block */
n = read(fd, &job, sizeof(job) );
if (n==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
}
}

LM_DBG("received job type %d[%s] from process %d\n",
job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);
LM_DBG("received job type %d[%s] from process %d\n",
job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);

/* custom handling for RPC type */
if (job.handler_type==ipc_rpc_type) {
((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
} else {
/* generic registered type */
ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
/* custom handling for RPC type */
if (job.handler_type==ipc_rpc_type) {
((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
} else {
/* generic registered type */
ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
}
}

return;
Expand Down
34 changes: 26 additions & 8 deletions net/net_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,11 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)

switch(fm->type){
case F_UDP_READ:
n = protos[((struct socket_info*)fm->data)->proto].net.
read( fm->data /*si*/, &read);
do {
n = protos[((struct socket_info*)fm->data)->proto].net.
read( fm->data /*si*/, &read);
//Continue reading packets until we get an error
} while (n == 0);
break;
case F_TIMER_JOB:
handle_timer_job();
Expand Down Expand Up @@ -327,11 +330,15 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
post_run_handle_script_reload();

pt_become_idle();

if (n == 1) {
n = 0;
}
return n;
}


int udp_proc_reactor_init( struct socket_info *si )
int udp_proc_reactor_init( struct socket_info *si, int si_rank )
{

/* create the reactor for UDP proc */
Expand Down Expand Up @@ -359,9 +366,18 @@ int udp_proc_reactor_init( struct socket_info *si )
}

/* init: start watching the SIP UDP fd */
if (reactor_add_reader( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) {
LM_CRIT("failed to add UDP listen socket to reactor\n");
goto error;
//First child per socket becomes 'Master', will wake on every event
if (si_rank == 0) {
if (reactor_add_reader( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) {
LM_CRIT("failed to add UDP listen socket to reactor\n");
goto error;
}
} else {
//Subsequent processes are helpers, only one should be woken to help at a time
if (reactor_add_reader_exclusive( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) {
LM_CRIT("failed to add UDP listen socket to reactor\n");
goto error;
}
}

return 0;
Expand Down Expand Up @@ -389,7 +405,9 @@ static int fork_dynamic_udp_process(void *si_filter)
bind_address=si; /* shortcut */
/* we first need to init the reactor to be able to add fd
* into it in child_init routines */
if (udp_proc_reactor_init(si) < 0 ||
/* Since this is in addition to the master process, si_rank should be > 0 to enable
* exlusive polling with EPOLL */
if (udp_proc_reactor_init(si, 1) < 0 ||
init_child(10000/*FIXME*/) < 0) {
goto error;
}
Expand Down Expand Up @@ -486,7 +504,7 @@ int udp_start_processes(int *chd_rank, int *startup_done)
bind_address=si; /* shortcut */
/* we first need to init the reactor to be able to add fd
* into it in child_init routines */
if (udp_proc_reactor_init(si) < 0 ||
if (udp_proc_reactor_init(si, i) < 0 ||
init_child(*chd_rank) < 0) {
report_failure_status();
if (*chd_rank == 1 && startup_done)
Expand Down
6 changes: 3 additions & 3 deletions net/proto_udp/proto_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ static int udp_read_req(struct socket_info *si, int* bytes_read)
/* coverity[overrun-buffer-arg: FALSE] - union has 28 bytes, CID #200029 */
len=recvfrom(bind_address->socket, buf, BUF_SIZE,0,&ri.src_su.s,&fromlen);
if (len==-1){
if (errno==EAGAIN)
return 0;
if ((errno==EINTR)||(errno==EWOULDBLOCK)|| (errno==ECONNREFUSED))
if (errno==EAGAIN || errno==EWOULDBLOCK || errno==EINTR)
return 1;
if (errno==ECONNREFUSED)
return -1;
LM_ERR("recvfrom:[%d] %s\n", errno, strerror(errno));
return -2;
Expand Down
9 changes: 6 additions & 3 deletions reactor_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ int init_reactor_size(void);
init_io_wait(&_worker_io, _name, reactor_size, io_poll_method, _prio_max)

#define reactor_add_reader( _fd, _type, _prio, _data) \
io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ)
io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ, 0)

#define reactor_add_reader_exclusive( _fd, _type, _prio, _data) \
io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ, 1)

#define reactor_add_reader_with_timeout( _fd, _type, _prio, _t, _data) \
io_watch_add(&_worker_io, _fd, _type, _data, _prio, _t, IO_WATCH_READ)
io_watch_add(&_worker_io, _fd, _type, _data, _prio, _t, IO_WATCH_READ, 0)

#define reactor_add_writer( _fd, _type, _prio, _data) \
io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_WRITE)
io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_WRITE, 0)

#define reactor_del_reader( _fd, _idx, _io_flags) \
io_watch_del(&_worker_io, _fd, _idx, _io_flags, IO_WATCH_READ)
Expand Down
43 changes: 23 additions & 20 deletions timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -840,32 +840,35 @@ void handle_timer_job(void)
struct os_timer *t;
ssize_t l;

/* read one "os_timer" pointer from the pipe (non-blocking) */
l = read( timer_fd_out, &t, sizeof(t) );
if (l==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
/* Read events until epipe is empty */
while(1) {
/* read one "os_timer" pointer from the pipe (non-blocking) */
l = read( timer_fd_out, &t, sizeof(t) );
if (l==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
}
}

/* run the handler */
if (t->flags&TIMER_FLAG_IS_UTIMER) {
/* run the handler */
if (t->flags&TIMER_FLAG_IS_UTIMER) {

if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
LM_WARN("utimer job <%s> has a %lld us delay in execution\n",
t->label, *ijiffies-t->trigger_time);
t->u.utimer_f( t->time , t->t_param);
t->trigger_time = 0;
if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
LM_WARN("utimer job <%s> has a %lld us delay in execution\n",
t->label, *ijiffies-t->trigger_time);
t->u.utimer_f( t->time , t->t_param);
t->trigger_time = 0;

} else {
} else {

if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
LM_WARN("timer job <%s> has a %lld us delay in execution\n",
t->label, *ijiffies-t->trigger_time);
t->u.timer_f( (unsigned int)t->time , t->t_param);
t->trigger_time = 0;
if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
LM_WARN("timer job <%s> has a %lld us delay in execution\n",
t->label, *ijiffies-t->trigger_time);
t->u.timer_f( (unsigned int)t->time , t->t_param);
t->trigger_time = 0;

}
}

return;
Expand Down