diff --git a/io_wait.h b/io_wait.h index 47255440998..37d56a9a83f 100644 --- a/io_wait.h +++ b/io_wait.h @@ -347,7 +347,8 @@ inline static int io_watch_add( io_wait_h* h, // lgtm [cpp/use-of-goto] void* data, int prio, unsigned int timeout, - int flags) + int flags, + int exclusive) { /* helper macros */ @@ -506,16 +507,9 @@ inline static int io_watch_add( io_wait_h* h, // lgtm [cpp/use-of-goto] ep_event.events|=EPOLLOUT; if (!already) { again1: -#if 0 -/* This is currently broken, because when using EPOLLEXCLUSIVE, the OS will - * send sequential events to the same process - thus our pseudo-dispatcher - * will no longer work, since events on a pipe will be queued by a single - * process. - razvanc - */ -#if (defined __OS_linux) && (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 24) - if (e->flags & IO_WATCH_READ) +#ifdef EPOLLEXCLUSIVE + if (e->flags & IO_WATCH_READ && exclusive == 1) ep_event.events|=EPOLLEXCLUSIVE; -#endif #endif n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event); if (n==-1){ diff --git a/io_wait_loop.h b/io_wait_loop.h index d07a3be5163..cd6e296a438 100644 --- a/io_wait_loop.h +++ b/io_wait_loop.h @@ -175,7 +175,14 @@ inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat) unsigned int curr_time; again: +#ifdef EPOLLEXCLUSIVE + /* When using EPOLLEXCLUSIVE we don't want a single wakeup to handle multiple fds at once + as it could introduce latency in handling requests. 
+ Limit each wakeup to handling events from a single fd */ + ret=n=epoll_wait(h->epfd, h->ep_array, 1, t*1000); +#else ret=n=epoll_wait(h->epfd, h->ep_array, h->fd_no, t*1000); +#endif if (n==-1){ if (errno == EINTR) { goto again; /* signal, ignore it */ diff --git a/ipc.c b/ipc.c index 05869da40ef..601b1225711 100644 --- a/ipc.c +++ b/ipc.c @@ -278,26 +278,29 @@ void ipc_handle_job(int fd) ipc_job job; int n; - /* read one IPC job from the pipe; even if the read is blocking, - * we are here triggered from the reactor, on a READ event, so - * we shouldn;t ever block */ - n = read(fd, &job, sizeof(job) ); - if (n==-1) { - if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK ) + //Process all jobs until handle is drained + while (1) { + /* read one IPC job from the pipe; even if the read is blocking, + * we are here triggered from the reactor, on a READ event, so + * we shouldn;t ever block */ + n = read(fd, &job, sizeof(job) ); + if (n==-1) { + if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK ) + return; + LM_ERR("read failed:[%d] %s\n", errno, strerror(errno)); return; - LM_ERR("read failed:[%d] %s\n", errno, strerror(errno)); - return; - } + } - LM_DBG("received job type %d[%s] from process %d\n", - job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc); + LM_DBG("received job type %d[%s] from process %d\n", + job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc); - /* custom handling for RPC type */ - if (job.handler_type==ipc_rpc_type) { - ((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2); - } else { - /* generic registered type */ - ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1); + /* custom handling for RPC type */ + if (job.handler_type==ipc_rpc_type) { + ((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2); + } else { + /* generic registered type */ + ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1); + } } return; diff --git a/net/net_udp.c b/net/net_udp.c index 
b9194a6b79f..50b7eb17b05 100644 --- a/net/net_udp.c +++ b/net/net_udp.c @@ -292,8 +292,11 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type) switch(fm->type){ case F_UDP_READ: - n = protos[((struct socket_info*)fm->data)->proto].net. - read( fm->data /*si*/, &read); + do { + n = protos[((struct socket_info*)fm->data)->proto].net. + read( fm->data /*si*/, &read); + //Continue reading packets until we get an error + } while (n == 0); break; case F_TIMER_JOB: handle_timer_job(); @@ -327,11 +330,15 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type) post_run_handle_script_reload(); pt_become_idle(); + + if (n == 1) { + n = 0; + } return n; } -int udp_proc_reactor_init( struct socket_info *si ) +int udp_proc_reactor_init( struct socket_info *si, int si_rank ) { /* create the reactor for UDP proc */ @@ -359,9 +366,18 @@ int udp_proc_reactor_init( struct socket_info *si ) } /* init: start watching the SIP UDP fd */ - if (reactor_add_reader( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) { - LM_CRIT("failed to add UDP listen socket to reactor\n"); - goto error; + //First child per socket becomes 'Master', will wake on every event + if (si_rank == 0) { + if (reactor_add_reader( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) { + LM_CRIT("failed to add UDP listen socket to reactor\n"); + goto error; + } + } else { + //Subsequent processes are helpers, only one should be woken to help at a time + if (reactor_add_reader_exclusive( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) { + LM_CRIT("failed to add UDP listen socket to reactor\n"); + goto error; + } } return 0; @@ -389,7 +405,9 @@ static int fork_dynamic_udp_process(void *si_filter) bind_address=si; /* shortcut */ /* we first need to init the reactor to be able to add fd * into it in child_init routines */ - if (udp_proc_reactor_init(si) < 0 || + /* Since this is in addition to the master process, si_rank should be > 0 to enable + * exclusive polling with EPOLL */ + if 
(udp_proc_reactor_init(si, 1) < 0 || init_child(10000/*FIXME*/) < 0) { goto error; } @@ -486,7 +504,7 @@ int udp_start_processes(int *chd_rank, int *startup_done) bind_address=si; /* shortcut */ /* we first need to init the reactor to be able to add fd * into it in child_init routines */ - if (udp_proc_reactor_init(si) < 0 || + if (udp_proc_reactor_init(si, i) < 0 || init_child(*chd_rank) < 0) { report_failure_status(); if (*chd_rank == 1 && startup_done) diff --git a/net/proto_udp/proto_udp.c b/net/proto_udp/proto_udp.c index 8f2c4cabdd7..56ad3f3b62b 100644 --- a/net/proto_udp/proto_udp.c +++ b/net/proto_udp/proto_udp.c @@ -135,9 +135,9 @@ static int udp_read_req(struct socket_info *si, int* bytes_read) /* coverity[overrun-buffer-arg: FALSE] - union has 28 bytes, CID #200029 */ len=recvfrom(bind_address->socket, buf, BUF_SIZE,0,&ri.src_su.s,&fromlen); if (len==-1){ - if (errno==EAGAIN) - return 0; - if ((errno==EINTR)||(errno==EWOULDBLOCK)|| (errno==ECONNREFUSED)) + if (errno==EAGAIN || errno==EWOULDBLOCK || errno==EINTR) + return 1; + if (errno==ECONNREFUSED) return -1; LM_ERR("recvfrom:[%d] %s\n", errno, strerror(errno)); return -2; diff --git a/reactor_defs.h b/reactor_defs.h index 6c167fa56fa..ef92fac8796 100644 --- a/reactor_defs.h +++ b/reactor_defs.h @@ -78,13 +78,16 @@ int init_reactor_size(void); init_io_wait(&_worker_io, _name, reactor_size, io_poll_method, _prio_max) #define reactor_add_reader( _fd, _type, _prio, _data) \ - io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ) + io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ, 0) + +#define reactor_add_reader_exclusive( _fd, _type, _prio, _data) \ + io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ, 1) #define reactor_add_reader_with_timeout( _fd, _type, _prio, _t, _data) \ - io_watch_add(&_worker_io, _fd, _type, _data, _prio, _t, IO_WATCH_READ) + io_watch_add(&_worker_io, _fd, _type, _data, _prio, _t, IO_WATCH_READ, 0) #define reactor_add_writer( 
_fd, _type, _prio, _data) \ - io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_WRITE) + io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_WRITE, 0) #define reactor_del_reader( _fd, _idx, _io_flags) \ io_watch_del(&_worker_io, _fd, _idx, _io_flags, IO_WATCH_READ) diff --git a/timer.c b/timer.c index d3970ffd535..32c223421bd 100644 --- a/timer.c +++ b/timer.c @@ -840,32 +840,35 @@ void handle_timer_job(void) struct os_timer *t; ssize_t l; - /* read one "os_timer" pointer from the pipe (non-blocking) */ - l = read( timer_fd_out, &t, sizeof(t) ); - if (l==-1) { - if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK ) + /* Read events until the pipe is empty */ + while(1) { + /* read one "os_timer" pointer from the pipe (non-blocking) */ + l = read( timer_fd_out, &t, sizeof(t) ); + if (l==-1) { + if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK ) + return; + LM_ERR("read failed:[%d] %s\n", errno, strerror(errno)); return; - LM_ERR("read failed:[%d] %s\n", errno, strerror(errno)); - return; - } + } - /* run the handler */ - if (t->flags&TIMER_FLAG_IS_UTIMER) { + /* run the handler */ + if (t->flags&TIMER_FLAG_IS_UTIMER) { - if (t->trigger_time<(*ijiffies-ITIMER_TICK) ) - LM_WARN("utimer job <%s> has a %lld us delay in execution\n", - t->label, *ijiffies-t->trigger_time); - t->u.utimer_f( t->time , t->t_param); - t->trigger_time = 0; + if (t->trigger_time<(*ijiffies-ITIMER_TICK) ) + LM_WARN("utimer job <%s> has a %lld us delay in execution\n", + t->label, *ijiffies-t->trigger_time); + t->u.utimer_f( t->time , t->t_param); + t->trigger_time = 0; - } else { + } else { - if (t->trigger_time<(*ijiffies-ITIMER_TICK) ) - LM_WARN("timer job <%s> has a %lld us delay in execution\n", - t->label, *ijiffies-t->trigger_time); - t->u.timer_f( (unsigned int)t->time , t->t_param); - t->trigger_time = 0; + if (t->trigger_time<(*ijiffies-ITIMER_TICK) ) + LM_WARN("timer job <%s> has a %lld us delay in execution\n", + t->label, *ijiffies-t->trigger_time); 
+ t->u.timer_f( (unsigned int)t->time , t->t_param); + t->trigger_time = 0; + } } return;