Skip to content

Commit fb27bd1

Browse files
author
Ralph Castain
authored
Merge pull request #3143 from rhc54/topic/odls
Enable parallel fork/exec of local procs by providing the option of multiple odls progress threads
2 parents 3afadba + 70591bf commit fb27bd1

File tree

9 files changed

+426
-378
lines changed

9 files changed

+426
-378
lines changed

orte/mca/iof/base/iof_base_setup.c

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,6 @@ orte_iof_base_setup_parent(const orte_process_name_t* name,
219219
{
220220
int ret;
221221

222-
close(opts->p_stdin[0]);
223-
close(opts->p_stdout[1]);
224-
close(opts->p_stderr[1]);
225-
close(opts->p_internal[1]);
226-
227222
/* connect stdin endpoint */
228223
if (opts->connect_stdin) {
229224
/* and connect the pty to stdin */

orte/mca/odls/base/odls_base_default_fns.c

Lines changed: 300 additions & 275 deletions
Large diffs are not rendered by default.

orte/mca/odls/base/odls_base_frame.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* All rights reserved.
1616
* Copyright (c) 2014-2015 Research Organization for Information Science
1717
* and Technology (RIST). All rights reserved.
18+
* Copyright (c) 2017 Intel, Inc. All rights reserved.
1819
* $COPYRIGHT$
1920
*
2021
* Additional copyrights may follow
@@ -32,6 +33,7 @@
3233
#include "orte/mca/mca.h"
3334
#include "opal/mca/base/base.h"
3435
#include "opal/mca/hwloc/hwloc-internal.h"
36+
#include "opal/runtime/opal_progress_threads.h"
3537
#include "opal/util/output.h"
3638
#include "opal/util/path.h"
3739
#include "opal/util/argv.h"
@@ -76,6 +78,14 @@ static int orte_odls_base_register(mca_base_register_flag_t flags)
7678
MCA_BASE_VAR_SCOPE_READONLY,
7779
&orte_odls_globals.timeout_before_sigkill);
7880

81+
orte_odls_globals.num_threads = 0;
82+
(void) mca_base_var_register("orte", "odls", "base", "num_threads",
83+
"Number of threads to use for spawning local procs",
84+
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
85+
OPAL_INFO_LVL_9,
86+
MCA_BASE_VAR_SCOPE_READONLY,
87+
&orte_odls_globals.num_threads);
88+
7989
return ORTE_SUCCESS;
8090
}
8191

@@ -99,6 +109,15 @@ static int orte_odls_base_close(void)
99109
}
100110
OBJ_RELEASE(orte_local_children);
101111

112+
if (0 < orte_odls_globals.num_threads) {
113+
/* stop the progress threads */
114+
for (i=0; NULL != orte_odls_globals.ev_threads[i]; i++) {
115+
opal_progress_thread_finalize(orte_odls_globals.ev_threads[i]);
116+
}
117+
}
118+
free(orte_odls_globals.ev_bases);
119+
opal_argv_free(orte_odls_globals.ev_threads);
120+
102121
return mca_base_framework_components_close(&orte_odls_base_framework, NULL);
103122
}
104123

@@ -174,6 +193,25 @@ static int orte_odls_base_open(mca_base_open_flag_t flags)
174193
opal_argv_append_nosize(&orte_odls_globals.xtermcmd, "-e");
175194
}
176195

196+
/* setup the pool of worker threads */
197+
orte_odls_globals.ev_threads = NULL;
198+
orte_odls_globals.next_base = 0;
199+
if (0 == orte_odls_globals.num_threads) {
200+
orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*));
201+
/* use the default event base */
202+
orte_odls_globals.ev_bases[0] = orte_event_base;
203+
} else {
204+
orte_odls_globals.ev_bases =
205+
(opal_event_base_t**)malloc(orte_odls_globals.num_threads * sizeof(opal_event_base_t*));
206+
for (i=0; i < orte_odls_globals.num_threads; i++) {
207+
asprintf(&tmp, "ORTE-ODLS-%d", i);
208+
orte_odls_globals.ev_bases[i] = opal_progress_thread_init(tmp);
209+
opal_argv_append_nosize(&orte_odls_globals.ev_threads, tmp);
210+
free(tmp);
211+
}
212+
213+
}
214+
177215
/* Open up all available components */
178216
return mca_base_framework_components_open(&orte_odls_base_framework, flags);
179217
}
@@ -197,3 +235,11 @@ OBJ_CLASS_INSTANCE(orte_odls_launch_local_t,
197235
opal_object_t,
198236
launch_local_const,
199237
launch_local_dest);
238+
239+
static void sccon(orte_odls_spawn_caddy_t *p)
240+
{
241+
memset(&p->opts, 0, sizeof(orte_iof_base_io_conf_t));
242+
}
243+
OBJ_CLASS_INSTANCE(orte_odls_spawn_caddy_t,
244+
opal_object_t,
245+
sccon, NULL);

orte/mca/odls/base/odls_private.h

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
1313
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights
1414
* reserved.
15-
* Copyright (c) 2016 Intel, Inc. All rights reserved.
15+
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
1616
* $COPYRIGHT$
1717
*
1818
* Additional copyrights may follow
@@ -36,6 +36,7 @@
3636
#include "opal/class/opal_bitmap.h"
3737
#include "opal/dss/dss_types.h"
3838

39+
#include "orte/mca/iof/base/iof_base_setup.h"
3940
#include "orte/mca/rml/rml_types.h"
4041
#include "orte/runtime/orte_globals.h"
4142

@@ -56,11 +57,15 @@ typedef struct {
5657
opal_list_t xterm_ranks;
5758
/* the xterm cmd to be used */
5859
char **xtermcmd;
60+
/* thread pool */
61+
int num_threads;
62+
opal_event_base_t **ev_bases; // event base array for progress threads
63+
char** ev_threads; // event progress thread names
64+
int next_base; // counter to load-level thread use
5965
} orte_odls_globals_t;
6066

6167
ORTE_DECLSPEC extern orte_odls_globals_t orte_odls_globals;
6268

63-
6469
/*
6570
* Default functions that are common to most environments - can
6671
* be overridden by specific environments if they need something
@@ -74,11 +79,27 @@ ORTE_DECLSPEC int
7479
orte_odls_base_default_construct_child_list(opal_buffer_t *data,
7580
orte_jobid_t *job);
7681

82+
ORTE_DECLSPEC void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata);
83+
7784
/* define a function that will fork a local proc */
78-
typedef int (*orte_odls_base_fork_local_proc_fn_t)(orte_app_context_t *context,
79-
orte_proc_t *child,
85+
typedef int (*orte_odls_base_fork_local_proc_fn_t)(orte_proc_t *child,
86+
char *app, char **argv,
8087
char **environ_copy,
81-
orte_job_t *jdata);
88+
orte_job_t *jdata,
89+
orte_iof_base_io_conf_t opts);
90+
91+
/* define an object for fork/exec the local proc */
92+
typedef struct {
93+
opal_object_t super;
94+
opal_event_t ev;
95+
orte_job_t *jdata;
96+
orte_app_context_t *app;
97+
orte_proc_t *child;
98+
bool index_argv;
99+
orte_iof_base_io_conf_t opts;
100+
orte_odls_base_fork_local_proc_fn_t fork_local;
101+
} orte_odls_spawn_caddy_t;
102+
OBJ_CLASS_DECLARATION(orte_odls_spawn_caddy_t);
82103

83104
/* define an object for starting local launch */
84105
typedef struct {

orte/mca/odls/default/odls_default_module.c

Lines changed: 31 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* Copyright (c) 2010 IBM Corporation. All rights reserved.
1616
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
1717
* reserved.
18-
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved
18+
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
1919
*
2020
* $COPYRIGHT$
2121
*
@@ -144,8 +144,9 @@ static int orte_odls_default_restart_proc(orte_proc_t *child);
144144
static void send_error_show_help(int fd, int exit_status,
145145
const char *file, const char *topic, ...)
146146
__opal_attribute_noreturn__;
147-
static int do_child(orte_app_context_t* context,
148-
orte_proc_t *child,
147+
148+
static int do_child(orte_proc_t *child,
149+
char *cmd, char **argv,
149150
char **environ_copy,
150151
orte_job_t *jobdat, int write_fd,
151152
orte_iof_base_io_conf_t opts)
@@ -318,16 +319,15 @@ static int close_open_file_descriptors(int write_fd,
318319
return ORTE_SUCCESS;
319320
}
320321

321-
static int do_child(orte_app_context_t* context,
322-
orte_proc_t *child,
322+
static int do_child(orte_proc_t *child,
323+
char *app, char **argv,
323324
char **environ_copy,
324325
orte_job_t *jobdat, int write_fd,
325326
orte_iof_base_io_conf_t opts)
326327
{
327-
int i, rc;
328+
int i;
328329
sigset_t sigs;
329330
long fd, fdmax = sysconf(_SC_OPEN_MAX);
330-
char *param, *msg;
331331

332332
#if HAVE_SETPGID
333333
/* Set a new process group for this child, so that any
@@ -359,7 +359,7 @@ static int do_child(orte_app_context_t* context,
359359
send_error_show_help(write_fd, 1,
360360
"help-orte-odls-default.txt",
361361
"iof setup failed",
362-
orte_process_info.nodename, context->app);
362+
orte_process_info.nodename, app);
363363
/* Does not return */
364364
}
365365
}
@@ -384,18 +384,6 @@ static int do_child(orte_app_context_t* context,
384384
close(fdnull);
385385
}
386386

387-
/* if the user requested it, set the system resource limits */
388-
if (OPAL_SUCCESS != (rc = opal_util_init_sys_limits(&msg))) {
389-
send_error_show_help(write_fd, 1, "help-orte-odls-default.txt",
390-
"set limit",
391-
orte_process_info.nodename, context->app,
392-
__FILE__, __LINE__, msg);
393-
}
394-
/* ensure we only do this once */
395-
(void) mca_base_var_env_name("opal_set_max_sys_limits", &param);
396-
opal_unsetenv(param, &environ_copy);
397-
free(param);
398-
399387
/* close all open file descriptors w/ exception of stdin/stdout/stderr,
400388
the pipe used for the IOF INTERNAL messages, and the pipe up to
401389
the parent. */
@@ -408,10 +396,10 @@ static int do_child(orte_app_context_t* context,
408396
}
409397
}
410398

411-
if (context->argv == NULL) {
412-
context->argv = malloc(sizeof(char*)*2);
413-
context->argv[0] = strdup(context->app);
414-
context->argv[1] = NULL;
399+
if (argv == NULL) {
400+
argv = malloc(sizeof(char*)*2);
401+
argv[0] = strdup(app);
402+
argv[1] = NULL;
415403
}
416404

417405
/* Set signal handlers back to the default. Do this close to
@@ -436,16 +424,16 @@ static int do_child(orte_app_context_t* context,
436424

437425
/* Exec the new executable */
438426

439-
execve(context->app, context->argv, environ_copy);
427+
execve(app, argv, environ_copy);
440428
send_error_show_help(write_fd, 1,
441429
"help-orte-odls-default.txt", "execve error",
442-
orte_process_info.nodename, context->app, strerror(errno));
430+
orte_process_info.nodename, app, strerror(errno));
443431
/* Does not return */
444432
}
445433

446434

447-
static int do_parent(orte_app_context_t* context,
448-
orte_proc_t *child,
435+
static int do_parent(orte_proc_t *child,
436+
char *app, char **argv,
449437
char **environ_copy,
450438
orte_job_t *jobdat, int read_fd,
451439
orte_iof_base_io_conf_t opts)
@@ -454,19 +442,10 @@ static int do_parent(orte_app_context_t* context,
454442
orte_odls_pipe_err_msg_t msg;
455443
char file[ORTE_ODLS_MAX_FILE_LEN + 1], topic[ORTE_ODLS_MAX_TOPIC_LEN + 1], *str = NULL;
456444

457-
if (NULL != child && ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_FORWARD_OUTPUT)) {
458-
/* connect endpoints IOF */
459-
rc = orte_iof_base_setup_parent(&child->name, &opts);
460-
if (ORTE_SUCCESS != rc) {
461-
ORTE_ERROR_LOG(rc);
462-
close(read_fd);
463-
464-
if (NULL != child) {
465-
child->state = ORTE_PROC_STATE_UNDEF;
466-
}
467-
return rc;
468-
}
469-
}
445+
close(opts.p_stdin[0]);
446+
close(opts.p_stdout[1]);
447+
close(opts.p_stderr[1]);
448+
close(opts.p_internal[1]);
470449

471450
/* Block reading a message from the pipe */
472451
while (1) {
@@ -503,7 +482,7 @@ static int do_parent(orte_app_context_t* context,
503482
if (OPAL_SUCCESS != rc) {
504483
orte_show_help("help-orte-odls-default.txt", "syscall fail",
505484
true,
506-
orte_process_info.nodename, context->app,
485+
orte_process_info.nodename, app,
507486
"opal_fd_read", __FILE__, __LINE__);
508487
if (NULL != child) {
509488
child->state = ORTE_PROC_STATE_UNDEF;
@@ -517,7 +496,7 @@ static int do_parent(orte_app_context_t* context,
517496
if (OPAL_SUCCESS != rc) {
518497
orte_show_help("help-orte-odls-default.txt", "syscall fail",
519498
true,
520-
orte_process_info.nodename, context->app,
499+
orte_process_info.nodename, app,
521500
"opal_fd_read", __FILE__, __LINE__);
522501
if (NULL != child) {
523502
child->state = ORTE_PROC_STATE_UNDEF;
@@ -531,7 +510,7 @@ static int do_parent(orte_app_context_t* context,
531510
if (NULL == str) {
532511
orte_show_help("help-orte-odls-default.txt", "syscall fail",
533512
true,
534-
orte_process_info.nodename, context->app,
513+
orte_process_info.nodename, app,
535514
"opal_fd_read", __FILE__, __LINE__);
536515
if (NULL != child) {
537516
child->state = ORTE_PROC_STATE_UNDEF;
@@ -580,39 +559,16 @@ static int do_parent(orte_app_context_t* context,
580559
/**
581560
* Fork/exec the specified processes
582561
*/
583-
static int odls_default_fork_local_proc(orte_app_context_t* context,
584-
orte_proc_t *child,
562+
static int odls_default_fork_local_proc(orte_proc_t *child,
563+
char *app,
564+
char **argv,
585565
char **environ_copy,
586-
orte_job_t *jobdat)
566+
orte_job_t *jobdat,
567+
orte_iof_base_io_conf_t opts)
587568
{
588-
orte_iof_base_io_conf_t opts = {0};
589-
int rc, p[2];
569+
int p[2];
590570
pid_t pid;
591571

592-
if (NULL != child) {
593-
/* should pull this information from MPIRUN instead of going with
594-
default */
595-
opts.usepty = OPAL_ENABLE_PTY_SUPPORT;
596-
597-
/* do we want to setup stdin? */
598-
if (NULL != child &&
599-
(jobdat->stdin_target == ORTE_VPID_WILDCARD ||
600-
child->name.vpid == jobdat->stdin_target)) {
601-
opts.connect_stdin = true;
602-
} else {
603-
opts.connect_stdin = false;
604-
}
605-
606-
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_prefork(&opts))) {
607-
ORTE_ERROR_LOG(rc);
608-
if (NULL != child) {
609-
child->state = ORTE_PROC_STATE_FAILED_TO_START;
610-
child->exit_code = rc;
611-
}
612-
return rc;
613-
}
614-
}
615-
616572
/* A pipe is used to communicate between the parent and child to
617573
indicate whether the exec ultimately succeeded or failed. The
618574
child sets the pipe to be close-on-exec; the child only ever
@@ -647,12 +603,12 @@ static int odls_default_fork_local_proc(orte_app_context_t* context,
647603

648604
if (pid == 0) {
649605
close(p[0]);
650-
do_child(context, child, environ_copy, jobdat, p[1], opts);
606+
do_child(child, app, argv, environ_copy, jobdat, p[1], opts);
651607
/* Does not return */
652608
}
653609

654610
close(p[1]);
655-
return do_parent(context, child, environ_copy, jobdat, p[0], opts);
611+
return do_parent(child, app, argv, environ_copy, jobdat, p[0], opts);
656612
}
657613

658614

orte/mca/schizo/base/base.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ ORTE_DECLSPEC int orte_schizo_base_setup_fork(orte_job_t *jdata,
7373
orte_app_context_t *context);
7474
ORTE_DECLSPEC int orte_schizo_base_setup_child(orte_job_t *jobdat,
7575
orte_proc_t *child,
76-
orte_app_context_t *app);
76+
orte_app_context_t *app,
77+
char ***env);
7778
ORTE_DECLSPEC orte_schizo_launch_environ_t orte_schizo_base_check_launch_environment(void);
7879
ORTE_DECLSPEC long orte_schizo_base_get_remaining_time(void);
7980
ORTE_DECLSPEC void orte_schizo_base_finalize(void);

0 commit comments

Comments
 (0)