14
14
* All rights reserved.
15
15
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
16
16
* Copyright (c) 2017 IBM Corporation. All rights reserved.
17
+ * Copyright (c) 2017 Mellanox Technologies. All rights reserved.
17
18
* $COPYRIGHT$
18
19
*
19
20
* Additional copyrights may follow
48
49
#include "opal/class/opal_bitmap.h"
49
50
#include "orte/mca/mca.h"
50
51
#include "opal/mca/event/event.h"
52
+ #include "opal/util/fd.h"
51
53
52
54
#include "orte/mca/iof/iof.h"
53
55
#include "orte/runtime/orte_globals.h"
54
56
#include "orte/mca/rml/rml_types.h"
55
57
#include "orte/util/threads.h"
58
+ #include "orte/mca/errmgr/errmgr.h"
56
59
57
60
BEGIN_C_DECLS
58
61
@@ -84,7 +87,9 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_job_t);
84
87
typedef struct {
85
88
opal_list_item_t super ;
86
89
bool pending ;
90
+ bool always_writable ;
87
91
opal_event_t * ev ;
92
+ struct timeval tv ;
88
93
int fd ;
89
94
opal_list_t outputs ;
90
95
} orte_iof_write_event_t ;
@@ -106,9 +111,11 @@ typedef struct {
106
111
opal_object_t super ;
107
112
struct orte_iof_proc_t * proc ;
108
113
opal_event_t * ev ;
114
+ struct timeval tv ;
109
115
int fd ;
110
116
orte_iof_tag_t tag ;
111
117
bool active ;
118
+ bool always_readable ;
112
119
orte_iof_sink_t * sink ;
113
120
} orte_iof_read_event_t ;
114
121
ORTE_DECLSPEC OBJ_CLASS_DECLARATION (orte_iof_read_event_t );
@@ -142,61 +149,120 @@ struct orte_iof_base_t {
142
149
};
143
150
typedef struct orte_iof_base_t orte_iof_base_t ;
144
151
152
+ /* Write event macro's */
153
+
154
+ static inline bool
155
+ orte_iof_base_fd_always_ready (int fd )
156
+ {
157
+ return opal_fd_is_regular (fd ) ||
158
+ (opal_fd_is_chardev (fd ) && !isatty (fd )) ||
159
+ opal_fd_is_blkdev (fd );
160
+ }
161
+
162
+ #define ORTE_IOF_SINK_BLOCKSIZE (1024)
163
+
164
+ #define ORTE_IOF_SINK_ACTIVATE (wev ) \
165
+ do { \
166
+ struct timeval *tv = NULL; \
167
+ wev->pending = true; \
168
+ ORTE_POST_OBJECT(wev); \
169
+ if (wev->always_writable) { \
170
+ /* Regular is always write ready. Use timer to activate */ \
171
+ tv = & wev -> tv ; \
172
+ } \
173
+ if (opal_event_add (wev -> ev , tv )) { \
174
+ ORTE_ERROR_LOG (ORTE_ERR_BAD_PARAM ); \
175
+ } \
176
+ } while (0 );
177
+
145
178
146
179
/* define an output "sink", adding it to the provided
147
180
* endpoint list for this proc */
148
- #define ORTE_IOF_SINK_DEFINE (snk , nm , fid , tg , wrthndlr ) \
149
- do { \
150
- orte_iof_sink_t *ep; \
151
- OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, \
152
- "defining endpt: file %s line %d fd %d",\
153
- __FILE__, __LINE__, (fid))); \
154
- ep = OBJ_NEW(orte_iof_sink_t); \
155
- ep->name.jobid = (nm)->jobid; \
156
- ep->name.vpid = (nm)->vpid; \
157
- ep->tag = (tg); \
158
- if (0 <= (fid)) { \
159
- ep->wev->fd = (fid); \
160
- opal_event_set(orte_event_base, \
161
- ep->wev->ev, ep->wev->fd, \
162
- OPAL_EV_WRITE, \
163
- wrthndlr, ep); \
164
- opal_event_set_priority(ep->wev->ev, ORTE_MSG_PRI); \
165
- } \
166
- *(snk) = ep; \
167
- ORTE_POST_OBJECT(ep); \
181
+ #define ORTE_IOF_SINK_DEFINE (snk , nm , fid , tg , wrthndlr ) \
182
+ do { \
183
+ orte_iof_sink_t *ep; \
184
+ OPAL_OUTPUT_VERBOSE((1, \
185
+ orte_iof_base_framework.framework_output, \
186
+ "defining endpt: file %s line %d fd %d", \
187
+ __FILE__, __LINE__, (fid))); \
188
+ ep = OBJ_NEW(orte_iof_sink_t); \
189
+ ep->name.jobid = (nm)->jobid; \
190
+ ep->name.vpid = (nm)->vpid; \
191
+ ep->tag = (tg); \
192
+ if (0 <= (fid)) { \
193
+ ep->wev->fd = (fid); \
194
+ ep->wev->always_writable = \
195
+ orte_iof_base_fd_always_ready(fid); \
196
+ if(ep->wev->always_writable) { \
197
+ opal_event_evtimer_set(orte_event_base, \
198
+ ep->wev->ev, wrthndlr, ep); \
199
+ } else { \
200
+ opal_event_set(orte_event_base, \
201
+ ep->wev->ev, ep->wev->fd, \
202
+ OPAL_EV_WRITE, \
203
+ wrthndlr, ep); \
204
+ } \
205
+ opal_event_set_priority(ep->wev->ev, ORTE_MSG_PRI); \
206
+ } \
207
+ *(snk) = ep; \
208
+ ORTE_POST_OBJECT(ep); \
168
209
} while(0);
169
210
211
+ /* Read event macro's */
212
+ #define ORTE_IOF_READ_ADDEV (rev ) \
213
+ do { \
214
+ struct timeval *tv = NULL; \
215
+ if (rev->always_readable) { \
216
+ tv = &rev->tv; \
217
+ } \
218
+ if (opal_event_add(rev->ev, tv)) { \
219
+ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); \
220
+ } \
221
+ } while(0);
222
+
223
+ #define ORTE_IOF_READ_ACTIVATE (rev ) \
224
+ do { \
225
+ rev->active = true; \
226
+ ORTE_POST_OBJECT(rev); \
227
+ ORTE_IOF_READ_ADDEV(rev); \
228
+ } while(0);
229
+
230
+
170
231
/* add list of structs that has name of proc + orte_iof_tag_t - when
171
232
* defining a read event, search list for proc, add flag to the tag.
172
233
* when closing a read fd, find proc on list and zero out that flag
173
234
* when all flags = 0, then iof is complete - set message event to
174
235
* daemon processor indicating proc iof is terminated
175
236
*/
176
- #define ORTE_IOF_READ_EVENT (rv , p , fid , tg , cbfunc , actv ) \
177
- do { \
178
- orte_iof_read_event_t *rev; \
179
- OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, \
180
- "%s defining read event for %s: %s %d", \
181
- ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
182
- ORTE_NAME_PRINT(&(p)->name), \
183
- __FILE__, __LINE__)); \
184
- rev = OBJ_NEW(orte_iof_read_event_t); \
185
- OBJ_RETAIN((p)); \
186
- rev->proc = (struct orte_iof_proc_t*)(p); \
187
- rev->tag = (tg); \
188
- rev->fd = (fid); \
189
- *(rv) = rev; \
190
- opal_event_set(orte_event_base, \
191
- rev->ev, (fid), \
192
- OPAL_EV_READ, \
193
- (cbfunc), rev); \
194
- opal_event_set_priority(rev->ev, ORTE_MSG_PRI); \
195
- if ((actv)) { \
196
- rev->active = true; \
197
- ORTE_POST_OBJECT(rev); \
198
- opal_event_add(rev->ev, 0); \
199
- } \
237
+ #define ORTE_IOF_READ_EVENT (rv , p , fid , tg , cbfunc , actv ) \
238
+ do { \
239
+ orte_iof_read_event_t *rev; \
240
+ OPAL_OUTPUT_VERBOSE((1, \
241
+ orte_iof_base_framework.framework_output, \
242
+ "%s defining read event for %s: %s %d", \
243
+ ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
244
+ ORTE_NAME_PRINT(&(p)->name), \
245
+ __FILE__, __LINE__)); \
246
+ rev = OBJ_NEW(orte_iof_read_event_t); \
247
+ OBJ_RETAIN((p)); \
248
+ rev->proc = (struct orte_iof_proc_t*)(p); \
249
+ rev->tag = (tg); \
250
+ rev->fd = (fid); \
251
+ rev->always_readable = orte_iof_base_fd_always_ready(fid); \
252
+ *(rv) = rev; \
253
+ if(rev->always_readable) { \
254
+ opal_event_evtimer_set(orte_event_base, \
255
+ rev->ev, (cbfunc), rev); \
256
+ } else { \
257
+ opal_event_set(orte_event_base, \
258
+ rev->ev, (fid), \
259
+ OPAL_EV_READ, \
260
+ (cbfunc), rev); \
261
+ } \
262
+ opal_event_set_priority(rev->ev, ORTE_MSG_PRI); \
263
+ if ((actv)) { \
264
+ ORTE_IOF_READ_ACTIVATE(rev) \
265
+ } \
200
266
} while(0);
201
267
202
268
0 commit comments