Skip to content

Commit bbe570a

Browse files
committed
opal_progress_threads: update to the API
There are now four functions and one global constant: * opal_progress_thread_name: the name of the OPAL-wide async progress thread. If you have general purpose events that you need to run in *a* progress thread, but not a *dedicated* progress thread, use this name in the functions below to glom your events on to the general OPAL-wide async progress thread. * opal_progress_thread_init(): return an event base corresponding to a progress thread of the specified name (a progress thread will be created for that name if it does not already exist). * opal_progress_thread_finalize(): decrement the refcount on the passed progress thread name. If the refcount is 0, stop the thread and destroy the event base. * opal_progress_thread_pause(): stop processing events on the event base corresponding to the progress thread name, but do not destroy the event base. * opal_progess_thread_resume(): resume processing events on the event base corresponding to a previously-paused progress thread name. (cherry picked from commit open-mpi/ompi@99fa054)
1 parent 7fe13ac commit bbe570a

File tree

2 files changed

+204
-116
lines changed

2 files changed

+204
-116
lines changed

opal/runtime/opal_progress_threads.c

Lines changed: 155 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/*
22
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
3+
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
34
* $COPYRIGHT$
45
*
56
* Additional copyrights may follow
@@ -22,66 +23,79 @@
2223

2324
#include "opal/runtime/opal_progress_threads.h"
2425

26+
2527
/* create a tracking object for progress threads */
2628
typedef struct {
2729
opal_list_item_t super;
30+
2831
int refcount;
2932
char *name;
33+
3034
opal_event_base_t *ev_base;
35+
36+
/* This will be set to false when it is time for the progress
37+
thread to exit */
3138
volatile bool ev_active;
32-
bool block_active;
39+
40+
/* This event will always be set on the ev_base (so that the
41+
ev_base is not empty!) */
3342
opal_event_t block;
34-
bool engine_defined;
43+
44+
bool engine_constructed;
3545
opal_thread_t engine;
36-
int pipe[2];
3746
} opal_progress_tracker_t;
38-
static void trkcon(opal_progress_tracker_t *p)
47+
48+
static void tracker_constructor(opal_progress_tracker_t *p)
3949
{
4050
p->refcount = 1; // start at one since someone created it
4151
p->name = NULL;
4252
p->ev_base = NULL;
43-
p->ev_active = true;
44-
p->block_active = false;
45-
p->engine_defined = false;
46-
p->pipe[0] = -1;
47-
p->pipe[1] = -1;
53+
p->ev_active = false;
54+
p->engine_constructed = false;
4855
}
49-
static void trkdes(opal_progress_tracker_t *p)
56+
57+
static void tracker_destructor(opal_progress_tracker_t *p)
5058
{
59+
opal_event_del(&p->block);
60+
5161
if (NULL != p->name) {
5262
free(p->name);
5363
}
54-
if (p->block_active) {
55-
opal_event_del(&p->block);
56-
}
5764
if (NULL != p->ev_base) {
5865
opal_event_base_free(p->ev_base);
5966
}
60-
if (0 <= p->pipe[0]) {
61-
close(p->pipe[0]);
62-
}
63-
if (0 <= p->pipe[1]) {
64-
close(p->pipe[1]);
65-
}
66-
if (p->engine_defined) {
67+
if (p->engine_constructed) {
6768
OBJ_DESTRUCT(&p->engine);
6869
}
6970
}
71+
7072
static OBJ_CLASS_INSTANCE(opal_progress_tracker_t,
7173
opal_list_item_t,
72-
trkcon, trkdes);
74+
tracker_constructor,
75+
tracker_destructor);
7376

74-
static opal_list_t tracking;
7577
static bool inited = false;
76-
static void wakeup(int fd, short args, void *cbdata)
78+
static opal_list_t tracking;
79+
static struct timeval long_timeout = {
80+
.tv_sec = 3600,
81+
.tv_usec = 0
82+
};
83+
static const char *shared_thread_name = "OPAL-wide async progress thread";
84+
85+
/*
86+
* If this event is fired, just restart it so that this event base
87+
* continues to have something to block on.
88+
*/
89+
static void dummy_timeout_cb(int fd, short args, void *cbdata)
7790
{
7891
opal_progress_tracker_t *trk = (opal_progress_tracker_t*)cbdata;
7992

80-
/* if this event fired, then the blocker event will
81-
* be deleted from the event base by libevent, so flag
82-
* it so we don't try to delete it again */
83-
trk->block_active = false;
93+
opal_event_add(&trk->block, &long_timeout);
8494
}
95+
96+
/*
97+
* Main for the progress thread
98+
*/
8599
static void* progress_engine(opal_object_t *obj)
86100
{
87101
opal_thread_t *t = (opal_thread_t*)obj;
@@ -90,11 +104,41 @@ static void* progress_engine(opal_object_t *obj)
90104
while (trk->ev_active) {
91105
opal_event_loop(trk->ev_base, OPAL_EVLOOP_ONCE);
92106
}
107+
93108
return OPAL_THREAD_CANCELLED;
94109
}
95110

96-
opal_event_base_t *opal_start_progress_thread(char *name,
97-
bool create_block)
111+
static void stop_progress_engine(opal_progress_tracker_t *trk)
112+
{
113+
assert(trk->ev_active);
114+
trk->ev_active = false;
115+
116+
/* break the event loop - this will cause the loop to exit upon
117+
completion of any current event */
118+
opal_event_base_loopbreak(trk->ev_base);
119+
120+
opal_thread_join(&trk->engine, NULL);
121+
}
122+
123+
static int start_progress_engine(opal_progress_tracker_t *trk)
124+
{
125+
assert(!trk->ev_active);
126+
trk->ev_active = true;
127+
128+
/* fork off a thread to progress it */
129+
trk->engine.t_run = progress_engine;
130+
trk->engine.t_arg = trk;
131+
132+
int rc = opal_thread_start(&trk->engine);
133+
if (OPAL_SUCCESS != rc) {
134+
OPAL_ERROR_LOG(rc);
135+
OBJ_RELEASE(trk);
136+
}
137+
138+
return rc;
139+
}
140+
141+
opal_event_base_t *opal_progress_thread_init(const char *name)
98142
{
99143
opal_progress_tracker_t *trk;
100144
int rc;
@@ -104,6 +148,10 @@ opal_event_base_t *opal_start_progress_thread(char *name,
104148
inited = true;
105149
}
106150

151+
if (NULL == name) {
152+
name = shared_thread_name;
153+
}
154+
107155
/* check if we already have this thread */
108156
OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
109157
if (0 == strcmp(name, trk->name)) {
@@ -115,129 +163,132 @@ opal_event_base_t *opal_start_progress_thread(char *name,
115163
}
116164

117165
trk = OBJ_NEW(opal_progress_tracker_t);
166+
if (NULL == trk) {
167+
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
168+
return NULL;
169+
}
170+
118171
trk->name = strdup(name);
119-
if (NULL == (trk->ev_base = opal_event_base_create())) {
172+
if (NULL == trk->name) {
120173
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
121174
OBJ_RELEASE(trk);
122175
return NULL;
123176
}
124177

125-
if (create_block) {
126-
/* add an event it can block on */
127-
if (0 > pipe(trk->pipe)) {
128-
OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
129-
OBJ_RELEASE(trk);
130-
return NULL;
131-
}
132-
/* Make sure the pipe FDs are set to close-on-exec so that
133-
they don't leak into children */
134-
if (opal_fd_set_cloexec(trk->pipe[0]) != OPAL_SUCCESS ||
135-
opal_fd_set_cloexec(trk->pipe[1]) != OPAL_SUCCESS) {
136-
OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
137-
OBJ_RELEASE(trk);
138-
return NULL;
139-
}
140-
opal_event_set(trk->ev_base, &trk->block, trk->pipe[0], OPAL_EV_READ, wakeup, trk);
141-
opal_event_add(&trk->block, 0);
142-
trk->block_active = true;
178+
if (NULL == (trk->ev_base = opal_event_base_create())) {
179+
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
180+
OBJ_RELEASE(trk);
181+
return NULL;
143182
}
144183

184+
/* add an event to the new event base (if there are no events,
185+
opal_event_loop() will return immediately) */
186+
opal_event_set(trk->ev_base, &trk->block, -1, OPAL_EV_PERSIST,
187+
dummy_timeout_cb, trk);
188+
opal_event_add(&trk->block, &long_timeout);
189+
145190
/* construct the thread object */
146191
OBJ_CONSTRUCT(&trk->engine, opal_thread_t);
147-
trk->engine_defined = true;
148-
/* fork off a thread to progress it */
149-
trk->engine.t_run = progress_engine;
150-
trk->engine.t_arg = trk;
151-
if (OPAL_SUCCESS != (rc = opal_thread_start(&trk->engine))) {
192+
trk->engine_constructed = true;
193+
if (OPAL_SUCCESS != (rc = start_progress_engine(trk))) {
152194
OPAL_ERROR_LOG(rc);
153195
OBJ_RELEASE(trk);
154196
return NULL;
155197
}
156198
opal_list_append(&tracking, &trk->super);
199+
157200
return trk->ev_base;
158201
}
159202

160-
void opal_stop_progress_thread(char *name, bool cleanup)
203+
int opal_progress_thread_finalize(const char *name)
161204
{
162205
opal_progress_tracker_t *trk;
163-
int i;
164206

165207
if (!inited) {
166208
/* nothing we can do */
167-
return;
209+
return OPAL_ERR_NOT_FOUND;
210+
}
211+
212+
if (NULL == name) {
213+
name = shared_thread_name;
168214
}
169215

170216
/* find the specified engine */
171217
OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
172218
if (0 == strcmp(name, trk->name)) {
173-
/* if it is already inactive, then just cleanup if that
174-
* is the request */
175-
if (!trk->ev_active) {
176-
if (cleanup) {
177-
opal_list_remove_item(&tracking, &trk->super);
178-
OBJ_RELEASE(trk);
179-
}
180-
return;
181-
}
182219
/* decrement the refcount */
183220
--trk->refcount;
184-
/* if we have reached zero, then it's time to stop it */
185-
if (0 < trk->refcount) {
186-
return;
187-
}
188-
/* mark it as inactive */
189-
trk->ev_active = false;
190-
/* break the event loop - this will cause the loop to exit
191-
* upon completion of any current event */
192-
opal_event_base_loopbreak(trk->ev_base);
193-
/* if present, use the block to break it loose just in
194-
* case the thread is blocked in a call to select for
195-
* a long time */
196-
if (trk->block_active) {
197-
i=1;
198-
write(trk->pipe[1], &i, sizeof(int));
221+
222+
/* If the refcount is still above 0, we're done here */
223+
if (trk->refcount > 0) {
224+
return OPAL_SUCCESS;
199225
}
200-
/* wait for thread to exit */
201-
opal_thread_join(&trk->engine, NULL);
202-
/* cleanup, if they indicated they are done with this event base */
203-
if (cleanup) {
204-
opal_list_remove_item(&tracking, &trk->super);
205-
OBJ_RELEASE(trk);
226+
227+
/* If the progress thread is active, stop it */
228+
if (trk->ev_active) {
229+
stop_progress_engine(trk);
206230
}
207-
return;
231+
232+
opal_list_remove_item(&tracking, &trk->super);
233+
OBJ_RELEASE(trk);
234+
return OPAL_SUCCESS;
208235
}
209236
}
237+
238+
return OPAL_ERR_NOT_FOUND;
210239
}
211240

212-
int opal_restart_progress_thread(char *name)
241+
/*
242+
* Stop the progress thread, but don't delete the tracker (or event base)
243+
*/
244+
int opal_progress_thread_pause(const char *name)
213245
{
214246
opal_progress_tracker_t *trk;
215-
int rc;
216247

217248
if (!inited) {
218249
/* nothing we can do */
219-
return OPAL_ERROR;
250+
return OPAL_ERR_NOT_FOUND;
251+
}
252+
253+
if (NULL == name) {
254+
name = shared_thread_name;
220255
}
221256

222257
/* find the specified engine */
223258
OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
224259
if (0 == strcmp(name, trk->name)) {
225-
if (!trk->engine_defined) {
226-
OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED);
227-
return OPAL_ERR_NOT_SUPPORTED;
228-
}
229-
/* up the refcount */
230-
++trk->refcount;
231-
/* ensure the block is set, if requested */
232-
if (0 <= trk->pipe[0] && !trk->block_active) {
233-
opal_event_add(&trk->block, 0);
234-
trk->block_active = true;
260+
if (trk->ev_active) {
261+
stop_progress_engine(trk);
235262
}
236-
/* start the thread again */
237-
if (OPAL_SUCCESS != (rc = opal_thread_start(&trk->engine))) {
238-
OPAL_ERROR_LOG(rc);
239-
return rc;
263+
264+
return OPAL_SUCCESS;
265+
}
266+
}
267+
268+
return OPAL_ERR_NOT_FOUND;
269+
}
270+
271+
int opal_progress_thread_resume(const char *name)
272+
{
273+
opal_progress_tracker_t *trk;
274+
275+
if (!inited) {
276+
/* nothing we can do */
277+
return OPAL_ERR_NOT_FOUND;
278+
}
279+
280+
if (NULL == name) {
281+
name = shared_thread_name;
282+
}
283+
284+
/* find the specified engine */
285+
OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
286+
if (0 == strcmp(name, trk->name)) {
287+
if (trk->ev_active) {
288+
return OPAL_ERR_RESOURCE_BUSY;
240289
}
290+
291+
return start_progress_engine(trk);
241292
}
242293
}
243294

0 commit comments

Comments
 (0)