Skip to content

Commit 374c824

Browse files
committed
orte/iof: Generalize the fix related to always-ready fds
Reference: https://bugzilla.kernel.org/show_bug.cgi?id=15272. Work with both stdin/stdout fds that are known to be always ready using libevent timers. Such fds can not be effectively used with non-blocking I/O functions like epoll, poll, select: - for poll/select the event will be triggered immediately; - for epoll `epoll_ctl` will reject an attempt to add this fd to the working set. Reference: http://www.wangafu.net/~nickm/libevent-book/Ref4_event.html Libevent suggests to use timers over event_active for the reasons provided by the link above. Signed-off-by: Artem Polyakov <[email protected]>
1 parent d9ad918 commit 374c824

File tree

8 files changed

+174
-147
lines changed

8 files changed

+174
-147
lines changed

orte/mca/iof/base/base.h

Lines changed: 107 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
#include "orte/runtime/orte_globals.h"
5656
#include "orte/mca/rml/rml_types.h"
5757
#include "orte/util/threads.h"
58+
#include "orte/mca/errmgr/errmgr.h"
5859

5960
BEGIN_C_DECLS
6061

@@ -88,6 +89,7 @@ typedef struct {
8889
bool pending;
8990
bool always_writable;
9091
opal_event_t *ev;
92+
struct timeval tv;
9193
int fd;
9294
opal_list_t outputs;
9395
} orte_iof_write_event_t;
@@ -109,9 +111,11 @@ typedef struct {
109111
opal_object_t super;
110112
struct orte_iof_proc_t *proc;
111113
opal_event_t *ev;
114+
struct timeval tv;
112115
int fd;
113116
orte_iof_tag_t tag;
114117
bool active;
118+
bool always_readable;
115119
orte_iof_sink_t *sink;
116120
} orte_iof_read_event_t;
117121
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
@@ -145,64 +149,120 @@ struct orte_iof_base_t {
145149
};
146150
typedef struct orte_iof_base_t orte_iof_base_t;
147151

152+
/* Write event macro's */
153+
154+
static inline bool
155+
orte_iof_base_fd_always_ready(int fd)
156+
{
157+
return opal_fd_is_regular(fd) ||
158+
(opal_fd_is_chardev(fd) && !isatty(fd)) ||
159+
opal_fd_is_blkdev(fd);
160+
}
161+
162+
#define ORTE_IOF_SINK_BLOCKSIZE (1024)
163+
164+
#define ORTE_IOF_SINK_ACTIVATE(wev) \
165+
do { \
166+
struct timeval *tv = NULL; \
167+
wev->pending = true; \
168+
ORTE_POST_OBJECT(wev); \
169+
if (wev->always_writable) { \
170+
/* Regular is always write ready. Use timer to activate */ \
171+
tv = &wev->tv; \
172+
} \
173+
if (opal_event_add(wev->ev, tv)) { \
174+
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); \
175+
} \
176+
} while(0);
177+
148178

149179
/* define an output "sink", adding it to the provided
150180
* endpoint list for this proc */
151-
#define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr) \
152-
do { \
153-
orte_iof_sink_t *ep; \
154-
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, \
155-
"defining endpt: file %s line %d fd %d",\
156-
__FILE__, __LINE__, (fid))); \
157-
ep = OBJ_NEW(orte_iof_sink_t); \
158-
ep->name.jobid = (nm)->jobid; \
159-
ep->name.vpid = (nm)->vpid; \
160-
ep->tag = (tg); \
161-
if (0 <= (fid)) { \
162-
ep->wev->fd = (fid); \
163-
ep->wev->always_writable = opal_fd_is_regular(fid) || \
164-
opal_fd_is_chardev(fid) || \
165-
opal_fd_is_blkdev(fid); \
166-
opal_event_set(orte_event_base, \
167-
ep->wev->ev, ep->wev->fd, \
168-
OPAL_EV_WRITE, \
169-
wrthndlr, ep); \
170-
opal_event_set_priority(ep->wev->ev, ORTE_MSG_PRI); \
171-
} \
172-
*(snk) = ep; \
173-
ORTE_POST_OBJECT(ep); \
181+
#define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr) \
182+
do { \
183+
orte_iof_sink_t *ep; \
184+
OPAL_OUTPUT_VERBOSE((1, \
185+
orte_iof_base_framework.framework_output, \
186+
"defining endpt: file %s line %d fd %d", \
187+
__FILE__, __LINE__, (fid))); \
188+
ep = OBJ_NEW(orte_iof_sink_t); \
189+
ep->name.jobid = (nm)->jobid; \
190+
ep->name.vpid = (nm)->vpid; \
191+
ep->tag = (tg); \
192+
if (0 <= (fid)) { \
193+
ep->wev->fd = (fid); \
194+
ep->wev->always_writable = \
195+
orte_iof_base_fd_always_ready(fid); \
196+
if(ep->wev->always_writable) { \
197+
opal_event_evtimer_set(orte_event_base, \
198+
ep->wev->ev, wrthndlr, ep); \
199+
} else { \
200+
opal_event_set(orte_event_base, \
201+
ep->wev->ev, ep->wev->fd, \
202+
OPAL_EV_WRITE, \
203+
wrthndlr, ep); \
204+
} \
205+
opal_event_set_priority(ep->wev->ev, ORTE_MSG_PRI); \
206+
} \
207+
*(snk) = ep; \
208+
ORTE_POST_OBJECT(ep); \
174209
} while(0);
175210

211+
/* Read event macro's */
212+
#define ORTE_IOF_READ_ADDEV(rev) \
213+
do { \
214+
struct timeval *tv = NULL; \
215+
if (rev->always_readable) { \
216+
tv = &rev->tv; \
217+
} \
218+
if (opal_event_add(rev->ev, tv)) { \
219+
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); \
220+
} \
221+
} while(0);
222+
223+
#define ORTE_IOF_READ_ACTIVATE(rev) \
224+
do { \
225+
rev->active = true; \
226+
ORTE_POST_OBJECT(rev); \
227+
ORTE_IOF_READ_ADDEV(rev); \
228+
} while(0);
229+
230+
176231
/* add list of structs that has name of proc + orte_iof_tag_t - when
177232
* defining a read event, search list for proc, add flag to the tag.
178233
* when closing a read fd, find proc on list and zero out that flag
179234
* when all flags = 0, then iof is complete - set message event to
180235
* daemon processor indicating proc iof is terminated
181236
*/
182-
#define ORTE_IOF_READ_EVENT(rv, p, fid, tg, cbfunc, actv) \
183-
do { \
184-
orte_iof_read_event_t *rev; \
185-
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, \
186-
"%s defining read event for %s: %s %d", \
187-
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
188-
ORTE_NAME_PRINT(&(p)->name), \
189-
__FILE__, __LINE__)); \
190-
rev = OBJ_NEW(orte_iof_read_event_t); \
191-
OBJ_RETAIN((p)); \
192-
rev->proc = (struct orte_iof_proc_t*)(p); \
193-
rev->tag = (tg); \
194-
rev->fd = (fid); \
195-
*(rv) = rev; \
196-
opal_event_set(orte_event_base, \
197-
rev->ev, (fid), \
198-
OPAL_EV_READ, \
199-
(cbfunc), rev); \
200-
opal_event_set_priority(rev->ev, ORTE_MSG_PRI); \
201-
if ((actv)) { \
202-
rev->active = true; \
203-
ORTE_POST_OBJECT(rev); \
204-
opal_event_add(rev->ev, 0); \
205-
} \
237+
#define ORTE_IOF_READ_EVENT(rv, p, fid, tg, cbfunc, actv) \
238+
do { \
239+
orte_iof_read_event_t *rev; \
240+
OPAL_OUTPUT_VERBOSE((1, \
241+
orte_iof_base_framework.framework_output, \
242+
"%s defining read event for %s: %s %d", \
243+
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
244+
ORTE_NAME_PRINT(&(p)->name), \
245+
__FILE__, __LINE__)); \
246+
rev = OBJ_NEW(orte_iof_read_event_t); \
247+
OBJ_RETAIN((p)); \
248+
rev->proc = (struct orte_iof_proc_t*)(p); \
249+
rev->tag = (tg); \
250+
rev->fd = (fid); \
251+
rev->always_readable = orte_iof_base_fd_always_ready(fid); \
252+
*(rv) = rev; \
253+
if(rev->always_readable) { \
254+
opal_event_evtimer_set(orte_event_base, \
255+
rev->ev, (cbfunc), rev); \
256+
} else { \
257+
opal_event_set(orte_event_base, \
258+
rev->ev, (fid), \
259+
OPAL_EV_READ, \
260+
(cbfunc), rev); \
261+
} \
262+
opal_event_set_priority(rev->ev, ORTE_MSG_PRI); \
263+
if ((actv)) { \
264+
ORTE_IOF_READ_ACTIVATE(rev) \
265+
} \
206266
} while(0);
207267

208268

orte/mca/iof/base/iof_base_frame.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
270270
rev->active = false;
271271
rev->ev = opal_event_alloc();
272272
rev->sink = NULL;
273+
rev->tv.tv_sec = 0;
274+
rev->tv.tv_usec = 0;
273275
}
274276
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
275277
{
@@ -303,6 +305,8 @@ static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
303305
wev->fd = -1;
304306
OBJ_CONSTRUCT(&wev->outputs, opal_list_t);
305307
wev->ev = opal_event_alloc();
308+
wev->tv.tv_sec = 0;
309+
wev->tv.tv_usec = 0;
306310
}
307311
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
308312
{

orte/mca/iof/base/iof_base_output.c

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -260,22 +260,11 @@ int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t s
260260

261261
/* is the write event issued? */
262262
if (!channel->pending) {
263-
int rc = -1;
264263
/* issue it */
265264
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
266265
"%s write:output adding write event",
267266
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
268-
channel->pending = true;
269-
ORTE_POST_OBJECT(channel);
270-
if (channel->always_writable) {
271-
/* Regular is always write ready. Activate the handler. */
272-
opal_event_active (channel->ev, OPAL_EV_WRITE, 1);
273-
} else {
274-
rc = opal_event_add(channel->ev, 0);
275-
if (rc) {
276-
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
277-
}
278-
}
267+
ORTE_IOF_SINK_ACTIVATE(channel);
279268
}
280269

281270
return num_buffered;
@@ -307,8 +296,7 @@ void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev)
307296
}
308297
}
309298

310-
#define ORTE_IOF_REGULARF_BLOCK (1024)
311-
void orte_iof_base_write_handler(int fd, short event, void *cbdata)
299+
void orte_iof_base_write_handler(int _fd, short event, void *cbdata)
312300
{
313301
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
314302
orte_iof_write_event_t *wev = sink->wev;
@@ -344,11 +332,7 @@ void orte_iof_base_write_handler(int fd, short event, void *cbdata)
344332
/* leave the write event running so it will call us again
345333
* when the fd is ready.
346334
*/
347-
if(wev->always_writable){
348-
/* Schedule another event */
349-
opal_event_active (wev->ev, OPAL_EV_WRITE, 1);
350-
}
351-
return;
335+
goto NEXT_CALL;
352336
}
353337
/* otherwise, something bad happened so all we can do is abort
354338
* this attempt
@@ -371,29 +355,23 @@ void orte_iof_base_write_handler(int fd, short event, void *cbdata)
371355
/* leave the write event running so it will call us again
372356
* when the fd is ready
373357
*/
374-
if(wev->always_writable){
375-
/* Schedule another event */
376-
opal_event_active (wev->ev, OPAL_EV_WRITE, 1);
377-
378-
}
379-
return;
358+
goto NEXT_CALL;
380359
}
381360
OBJ_RELEASE(output);
382361

383362
total_written += num_written;
384-
if(wev->always_writable && (ORTE_IOF_REGULARF_BLOCK <= total_written)){
363+
if(wev->always_writable && (ORTE_IOF_SINK_BLOCKSIZE <= total_written)){
385364
/* If this is a regular file it will never tell us it will block
386365
* Write no more than ORTE_IOF_REGULARF_BLOCK at a time allowing
387366
* other fds to progress
388367
*/
389-
opal_event_active (wev->ev, OPAL_EV_WRITE, 1);
390-
return;
368+
goto NEXT_CALL;
391369
}
392370
}
393371
ABORT:
394-
if (!wev->always_writable){
395-
opal_event_del(wev->ev);
396-
}
397372
wev->pending = false;
398373
ORTE_POST_OBJECT(wev);
374+
return;
375+
NEXT_CALL:
376+
ORTE_IOF_SINK_ACTIVATE(wev);
399377
}

0 commit comments

Comments
 (0)