Skip to content

Commit 42d44a6

Browse files
committed
Optimize IO handling by draining FDs on every wake-up.
A single IO wake-up can correspond to multiple actual IO events/waiting IO. Currently, after handling a single event we go back to waiting on the FD, where we will be immediately woken again because of the already-waiting IO. This increases context switches and can increase latency. By handling all the available IO on every wake-up before waiting again, we can reduce both of these.
1 parent 5a8a915 commit 42d44a6

File tree

4 files changed

+55
-42
lines changed

4 files changed

+55
-42
lines changed

ipc.c

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -238,26 +238,29 @@ void ipc_handle_job(int fd)
238238
ipc_job job;
239239
int n;
240240

241-
/* read one IPC job from the pipe; even if the read is blocking,
242-
* we are here triggered from the reactor, on a READ event, so
243-
* we shouldn;t ever block */
244-
n = read(fd, &job, sizeof(job) );
245-
if (n==-1) {
246-
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
241+
//Process all jobs until handle is drained
242+
while (1) {
243+
/* read one IPC job from the pipe; even if the read is blocking,
244+
* we are here triggered from the reactor, on a READ event, so
245+
* we shouldn;t ever block */
246+
n = read(fd, &job, sizeof(job) );
247+
if (n==-1) {
248+
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
249+
return;
250+
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
247251
return;
248-
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
249-
return;
250-
}
252+
}
251253

252-
LM_DBG("received job type %d[%s] from process %d\n",
253-
job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);
254+
LM_DBG("received job type %d[%s] from process %d\n",
255+
job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);
254256

255-
/* custom handling for RPC type */
256-
if (job.handler_type==ipc_rpc_type) {
257-
((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
258-
} else {
259-
/* generic registered type */
260-
ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
257+
/* custom handling for RPC type */
258+
if (job.handler_type==ipc_rpc_type) {
259+
((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
260+
} else {
261+
/* generic registered type */
262+
ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
263+
}
261264
}
262265

263266
return;

net/net_udp.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,11 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
269269

270270
switch(fm->type){
271271
case F_UDP_READ:
272-
n = protos[((struct socket_info*)fm->data)->proto].net.
273-
read( fm->data /*si*/, &read);
272+
do {
273+
n = protos[((struct socket_info*)fm->data)->proto].net.
274+
read( fm->data /*si*/, &read);
275+
//Continue reading packets until we get an error
276+
} while (n == 0);
274277
break;
275278
case F_TIMER_JOB:
276279
handle_timer_job();
@@ -304,6 +307,10 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
304307
post_run_handle_script_reload();
305308

306309
pt_become_idle();
310+
311+
if (n == 1) {
312+
n = 0;
313+
}
307314
return n;
308315
}
309316

net/proto_udp/proto_udp.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,9 @@ static int udp_read_req(struct socket_info *si, int* bytes_read)
133133
fromlen=sockaddru_len(si->su);
134134
len=recvfrom(bind_address->socket, buf, BUF_SIZE,0,&ri.src_su.s,&fromlen);
135135
if (len==-1){
136-
if (errno==EAGAIN)
137-
return 0;
138-
if ((errno==EINTR)||(errno==EWOULDBLOCK)|| (errno==ECONNREFUSED))
136+
if (errno==EAGAIN || errno==EWOULDBLOCK || errno==EINTR)
137+
return 1;
138+
if (errno==ECONNREFUSED)
139139
return -1;
140140
LM_ERR("recvfrom:[%d] %s\n", errno, strerror(errno));
141141
return -2;

timer.c

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -842,32 +842,35 @@ void handle_timer_job(void)
842842
struct os_timer *t;
843843
ssize_t l;
844844

845-
/* read one "os_timer" pointer from the pipe (non-blocking) */
846-
l = read( timer_fd_out, &t, sizeof(t) );
847-
if (l==-1) {
848-
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
845+
/* Read events until epipe is empty */
846+
while(1) {
847+
/* read one "os_timer" pointer from the pipe (non-blocking) */
848+
l = read( timer_fd_out, &t, sizeof(t) );
849+
if (l==-1) {
850+
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
851+
return;
852+
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
849853
return;
850-
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
851-
return;
852-
}
854+
}
853855

854-
/* run the handler */
855-
if (t->flags&TIMER_FLAG_IS_UTIMER) {
856+
/* run the handler */
857+
if (t->flags&TIMER_FLAG_IS_UTIMER) {
856858

857-
if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
858-
LM_WARN("utimer job <%s> has a %lld us delay in execution\n",
859-
t->label, *ijiffies-t->trigger_time);
860-
t->u.utimer_f( t->time , t->t_param);
861-
t->trigger_time = 0;
859+
if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
860+
LM_WARN("utimer job <%s> has a %lld us delay in execution\n",
861+
t->label, *ijiffies-t->trigger_time);
862+
t->u.utimer_f( t->time , t->t_param);
863+
t->trigger_time = 0;
862864

863-
} else {
865+
} else {
864866

865-
if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
866-
LM_WARN("timer job <%s> has a %lld us delay in execution\n",
867-
t->label, *ijiffies-t->trigger_time);
868-
t->u.timer_f( (unsigned int)t->time , t->t_param);
869-
t->trigger_time = 0;
867+
if (t->trigger_time<(*ijiffies-ITIMER_TICK) )
868+
LM_WARN("timer job <%s> has a %lld us delay in execution\n",
869+
t->label, *ijiffies-t->trigger_time);
870+
t->u.timer_f( (unsigned int)t->time , t->t_param);
871+
t->trigger_time = 0;
870872

873+
}
871874
}
872875

873876
return;

0 commit comments

Comments
 (0)