Skip to content
This repository was archived by the owner on Mar 20, 2023. It is now read-only.

Commit 8b7b17f

Browse files
committed
Fix Task Runner regressions
- Input/output data phases not correctly triggered for multi-instance and MPI jobs
- Output data was not triggered at all
- Pre-exec triggering on native
- Resolves #301
1 parent 07e86a3 commit 8b7b17f

File tree

5 files changed

+186
-102
lines changed

5 files changed

+186
-102
lines changed

convoy/batch.py

Lines changed: 107 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -4553,10 +4553,17 @@ def _construct_task(
45534553
env_vars = util.merge_dict(env_vars, gpu_env)
45544554
del gpu_env
45554555
taskenv = []
4556+
commands = {
4557+
'mpi': None,
4558+
'docker_exec': False,
4559+
'preexec': None,
4560+
'task': None,
4561+
'login': None,
4562+
'input': None,
4563+
'output': None,
4564+
}
45564565
# check if this is a multi-instance task
45574566
mis = None
4558-
mpi_command = None
4559-
mpi_docker_exec_command = None
45604567
if settings.is_multi_instance_task(_task):
45614568
if util.is_not_empty(task.multi_instance.coordination_command):
45624569
if native:
@@ -4598,18 +4605,27 @@ def _construct_task(
45984605
file_mode=rf.file_mode,
45994606
)
46004607
)
4608+
# set pre-exec command
4609+
if util.is_not_empty(task.multi_instance.pre_execution_command):
4610+
commands['preexec'] = [
4611+
task.multi_instance.pre_execution_command]
46014612
# set application command
46024613
if native:
46034614
if task.multi_instance.mpi is None:
4604-
task_commands = [task.command]
4615+
commands['task'] = [task.command]
46054616
else:
4606-
mpi_command, ib_env = _construct_mpi_command(pool, task)
4617+
commands['mpi'], ib_env = _construct_mpi_command(pool, task)
46074618
if util.is_not_empty(ib_env):
46084619
env_vars = util.merge_dict(env_vars, ib_env)
46094620
del ib_env
4610-
task_commands = [mpi_command]
4621+
commands['task'] = [commands['mpi']]
4622+
# insert preexec prior to task command for native
4623+
if util.is_not_empty(commands['preexec']):
4624+
commands['task'].insert(0, commands['preexec'][0])
46114625
else:
4612-
task_commands = []
4626+
commands['task'] = []
4627+
# for non-native multi-instance tasks, do not set SHIPYARD_RUNTIME so
4628+
# the user command is executed as-is
46134629
taskenv.append(
46144630
batchmodels.EnvironmentSetting(
46154631
name='SHIPYARD_ENV_EXCLUDE',
@@ -4622,62 +4638,47 @@ def _construct_task(
46224638
value=task.envfile,
46234639
)
46244640
)
4641+
taskenv.append(
4642+
batchmodels.EnvironmentSetting(
4643+
name='SHIPYARD_RUNTIME_CMD_OPTS',
4644+
value=(
4645+
' '.join(task.run_options) if is_singularity
4646+
else ' '.join(task.docker_exec_options)
4647+
),
4648+
)
4649+
)
4650+
taskenv.append(
4651+
batchmodels.EnvironmentSetting(
4652+
name='SHIPYARD_RUNTIME_CMD',
4653+
value=(
4654+
task.singularity_cmd if is_singularity else
4655+
'exec'
4656+
),
4657+
)
4658+
)
4659+
taskenv.append(
4660+
batchmodels.EnvironmentSetting(
4661+
name='SHIPYARD_CONTAINER_IMAGE_NAME',
4662+
value=(
4663+
task.singularity_image if is_singularity else
4664+
task.name # docker exec requires task name
4665+
),
4666+
)
4667+
)
4668+
if not is_singularity:
4669+
commands['docker_exec'] = True
46254670
if task.multi_instance.mpi is not None:
4626-
mpi_command, ib_env = _construct_mpi_command(pool, task)
4671+
commands['mpi'], ib_env = _construct_mpi_command(pool, task)
46274672
if util.is_not_empty(ib_env):
46284673
env_vars = util.merge_dict(env_vars, ib_env)
46294674
del ib_env
4630-
if not is_singularity:
4631-
mpi_docker_exec_command = (
4632-
'docker exec {} {} $AZ_BATCH_NODE_STARTUP_DIR/wd/'
4633-
'shipyard_task_runner.sh'.format(
4634-
' '.join(task.docker_exec_options),
4635-
task.name
4636-
)
4637-
)
4638-
else:
4639-
# if it's a multi-instance task, but not an mpi task,
4640-
# populate environment settings
4641-
taskenv.append(
4642-
batchmodels.EnvironmentSetting(
4643-
name='SHIPYARD_RUNTIME_CMD_OPTS',
4644-
value=(
4645-
' '.join(task.run_options) if is_singularity
4646-
else ' '.join(task.docker_exec_options)
4647-
),
4648-
)
4649-
)
4650-
taskenv.append(
4651-
batchmodels.EnvironmentSetting(
4652-
name='SHIPYARD_RUNTIME',
4653-
value='singularity' if is_singularity else 'docker',
4654-
)
4655-
)
4656-
taskenv.append(
4657-
batchmodels.EnvironmentSetting(
4658-
name='SHIPYARD_RUNTIME_CMD',
4659-
value=(
4660-
task.singularity_cmd if is_singularity else
4661-
'exec'
4662-
),
4663-
)
4664-
)
4665-
taskenv.append(
4666-
batchmodels.EnvironmentSetting(
4667-
name='SHIPYARD_CONTAINER_IMAGE_NAME',
4668-
value=(
4669-
task.singularity_image if is_singularity else
4670-
task.name # docker exec requires task name
4671-
),
4672-
)
4673-
)
46744675
else:
46754676
if native:
4676-
task_commands = [
4677+
commands['task'] = [
46774678
'{}'.format(' ' + task.command) if task.command else ''
46784679
]
46794680
else:
4680-
task_commands = []
4681+
commands['task'] = []
46814682
taskenv.append(
46824683
batchmodels.EnvironmentSetting(
46834684
name='SHIPYARD_ENV_EXCLUDE',
@@ -4722,70 +4723,75 @@ def _construct_task(
47224723
if (not native and allow_run_on_missing and
47234724
(len(docker_missing_images) > 0 or
47244725
len(singularity_missing_images) > 0)):
4725-
loginenv, logincmd = generate_docker_login_settings(config)
4726-
logincmd.extend(task_commands)
4726+
loginenv, commands['login'] = generate_docker_login_settings(config)
47274727
taskenv.extend(loginenv)
4728-
task_commands = logincmd
47294728
# digest any input_data
4730-
addlcmds = data.process_input_data(config, bxfile, _task, on_task=True)
4731-
if addlcmds is not None:
4729+
commands['input'] = data.process_input_data(
4730+
config, bxfile, _task, on_task=True)
4731+
if native and commands['input'] is not None:
4732+
raise RuntimeError(
4733+
'input_data at task-level is not supported on '
4734+
'native container pools')
4735+
# digest any output data
4736+
commands['output'] = data.process_output_data(config, bxfile, _task)
4737+
if commands['output'] is not None:
47324738
if native:
4733-
raise RuntimeError(
4734-
'input_data at task-level is not supported on '
4735-
'native container pools')
4736-
task_commands.insert(0, addlcmds)
4739+
output_files = commands['output']
4740+
commands['output'] = None
4741+
else:
4742+
commands['output'] = [commands['output']]
4743+
# populate task runner vars for non-native mode
47374744
if not native:
4738-
if util.is_not_empty(task_commands):
4745+
# set the correct runner script
4746+
if commands['docker_exec']:
4747+
commands['task'] = [
4748+
'$AZ_BATCH_NODE_STARTUP_DIR/wd/'
4749+
'shipyard_docker_exec_task_runner.sh'
4750+
]
4751+
else:
4752+
commands['task'] = [
4753+
'$AZ_BATCH_NODE_STARTUP_DIR/wd/shipyard_task_runner.sh'
4754+
]
4755+
# set system prologue command
4756+
sys_prologue_cmd = []
4757+
if util.is_not_empty(commands['login']):
4758+
sys_prologue_cmd.extend(commands['login'])
4759+
if util.is_not_empty(commands['input']):
4760+
sys_prologue_cmd.append(commands['input'])
4761+
if util.is_not_empty(sys_prologue_cmd):
47394762
taskenv.append(
47404763
batchmodels.EnvironmentSetting(
47414764
name='SHIPYARD_SYSTEM_PROLOGUE_CMD',
47424765
value=util.wrap_commands_in_shell(
4743-
task_commands, windows=is_windows),
4766+
sys_prologue_cmd, windows=is_windows),
47444767
)
47454768
)
4746-
task_commands = []
4747-
# execute multi instance pre-exec cmd
4748-
if (util.is_not_empty(task.multi_instance.pre_execution_command)):
4749-
task_commands.insert(0, task.multi_instance.pre_execution_command)
4750-
if not native:
4751-
if util.is_not_empty(task_commands):
4769+
del sys_prologue_cmd
4770+
# set user prologue command
4771+
if util.is_not_empty(commands['preexec']):
47524772
taskenv.append(
47534773
batchmodels.EnvironmentSetting(
47544774
name='SHIPYARD_USER_PROLOGUE_CMD',
4755-
value=util.wrap_commands(
4756-
task_commands, windows=is_windows),
4757-
)
4758-
)
4759-
task_commands = []
4760-
# digest any output data
4761-
addlcmds = data.process_output_data(config, bxfile, _task)
4762-
if addlcmds is not None:
4763-
if native:
4764-
output_files = addlcmds
4765-
else:
4766-
task_commands.append(addlcmds)
4767-
del addlcmds
4768-
if not native:
4769-
if util.is_not_empty(task_commands):
4770-
taskenv.append(
4771-
batchmodels.EnvironmentSetting(
4772-
name='SHIPYARD_SYSTEM_EPILOGUE_CMD',
47734775
value=util.wrap_commands_in_shell(
4774-
task_commands, windows=is_windows),
4776+
commands['preexec'], windows=is_windows),
47754777
)
47764778
)
4779+
# set user command (task)
47774780
taskenv.append(
47784781
batchmodels.EnvironmentSetting(
47794782
name='SHIPYARD_USER_CMD',
4780-
value=mpi_command or task.command,
4783+
value=commands['mpi'] or task.command,
47814784
)
47824785
)
4783-
if mpi_docker_exec_command is not None:
4784-
task_commands = [mpi_docker_exec_command]
4785-
else:
4786-
task_commands = [
4787-
'$AZ_BATCH_NODE_STARTUP_DIR/wd/shipyard_task_runner.sh'
4788-
]
4786+
# set epilogue command
4787+
if util.is_not_empty(commands['output']):
4788+
taskenv.append(
4789+
batchmodels.EnvironmentSetting(
4790+
name='SHIPYARD_SYSTEM_EPILOGUE_CMD',
4791+
value=util.wrap_commands_in_shell(
4792+
commands['output'], windows=is_windows),
4793+
)
4794+
)
47894795
# always add env vars in (host) task to be dumped into container
47904796
# task (if non-native)
47914797
if util.is_not_empty(env_vars):
@@ -4809,15 +4815,16 @@ def _construct_task(
48094815
)
48104816
)
48114817
# create task
4812-
if util.is_not_empty(task_commands):
4818+
if util.is_not_empty(commands['task']):
48134819
if native:
48144820
if is_windows:
4815-
tc = ' && '.join(task_commands)
4821+
tc = ' && '.join(commands['task'])
48164822
else:
4817-
tc = '; '.join(task_commands)
4823+
tc = '; '.join(commands['task'])
48184824
tc = tc.strip()
48194825
else:
4820-
tc = util.wrap_commands_in_shell(task_commands, windows=is_windows)
4826+
tc = util.wrap_commands_in_shell(
4827+
commands['task'], windows=is_windows)
48214828
else:
48224829
tc = ''
48234830
batchtask = batchmodels.TaskAddParameter(

convoy/fleet.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,10 @@
214214
'shipyard_task_runner.sh',
215215
pathlib.Path(_ROOT_PATH, 'scripts/shipyard_task_runner.sh')
216216
)
217+
_DOCKER_EXEC_TASK_RUNNER_FILE = (
218+
'shipyard_docker_exec_task_runner.sh',
219+
pathlib.Path(_ROOT_PATH, 'scripts/shipyard_docker_exec_task_runner.sh')
220+
)
217221
_GLUSTERPREP_FILE = (
218222
'shipyard_glusterfs_on_compute.sh',
219223
pathlib.Path(_ROOT_PATH, 'scripts/shipyard_glusterfs_on_compute.sh')
@@ -1381,6 +1385,7 @@ def _construct_pool_object(
13811385
_rflist.append(_NODEPREP_FILE)
13821386
if not native:
13831387
_rflist.append(_TASK_RUNNER_FILE)
1388+
_rflist.append(_DOCKER_EXEC_TASK_RUNNER_FILE)
13841389
# create start task commandline
13851390
start_task.append(
13861391
('{npf}{a}{b}{c}{d}{e}{f}{g}{i}{j}{k}{lis}{m}{n}{o}{p}{q}{r}{s}'
scripts/shipyard_docker_exec_task_runner.sh

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/usr/bin/env bash
2+
3+
set -e
4+
set -o pipefail
5+
6+
# environment variables used
7+
# SHIPYARD_SYSTEM_PROLOGUE_CMD: pre-exec system cmd
8+
# SHIPYARD_USER_PROLOGUE_CMD: pre-exec user cmd
9+
# SHIPYARD_SYSTEM_EPILOGUE_CMD: post-exec system cmd
10+
# SHIPYARD_ENV_EXCLUDE: environment vars to exclude
11+
# SHIPYARD_ENV_FILE: env file
12+
# SHIPYARD_RUNTIME: docker or singularity
13+
# SHIPYARD_RUNTIME_CMD: run or exec
14+
# SHIPYARD_RUNTIME_CMD_OPTS: options
15+
# SHIPYARD_CONTAINER_IMAGE_NAME: container name
16+
# SHIPYARD_USER_CMD: user command
17+
18+
## Load environment modules, if available
19+
if [ -f /etc/profile.d/modules.sh ]; then
20+
# shellcheck disable=SC1091
21+
source /etc/profile.d/modules.sh
22+
fi
23+
24+
## PRE-EXEC
25+
if [ -n "$SHIPYARD_SYSTEM_PROLOGUE_CMD" ]; then
26+
eval "$SHIPYARD_SYSTEM_PROLOGUE_CMD"
27+
fi
28+
29+
## TASK EXEC
30+
if [ -n "$SHIPYARD_ENV_EXCLUDE" ]; then
31+
env | grep -vE "$SHIPYARD_ENV_EXCLUDE" > "$SHIPYARD_ENV_FILE"
32+
else
33+
env > "$SHIPYARD_ENV_FILE"
34+
fi
35+
36+
SHIPYARD_RUNTIME_CMD_OPTS=$(eval echo "${SHIPYARD_RUNTIME_CMD_OPTS}")
37+
38+
set +e
39+
40+
# shellcheck disable=SC2086
41+
docker exec -e SHIPYARD_SYSTEM_PROLOGUE_CMD= -e SHIPYARD_SYSTEM_EPILOGUE_CMD= \
42+
$SHIPYARD_RUNTIME_CMD_OPTS $SHIPYARD_CONTAINER_IMAGE_NAME \
43+
$AZ_BATCH_NODE_STARTUP_DIR/wd/shipyard_task_runner.sh
44+
SHIPYARD_TASK_EC=$?
45+
46+
## POST EXEC
47+
if [ -n "$SHIPYARD_SYSTEM_EPILOGUE_CMD" ]; then
48+
if [ "$SHIPYARD_TASK_EC" -eq 0 ]; then
49+
export SHIPYARD_TASK_RESULT=success
50+
else
51+
export SHIPYARD_TASK_RESULT=fail
52+
fi
53+
eval "$SHIPYARD_SYSTEM_EPILOGUE_CMD"
54+
fi
55+
56+
exit $SHIPYARD_TASK_EC

scripts/shipyard_nodeprep.sh

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,17 @@ get_vm_size_from_imds() {
329329
log INFO "VmSize=$vm_size RDMA=$vm_rdma_type"
330330
}
331331

332+
set_default_pool_user_privileges() {
333+
if [ -n "$docker_group" ]; then
334+
# Modify the default pool user to have sudo privileges, which in turn
335+
# will give access to the Docker socket. This is required to allow
336+
# input/output phases via Docker containers as the default pool user.
337+
usermod -aG _azbatchsudogrp _azbatch
338+
else
339+
usermod -aG docker _azbatch
340+
fi
341+
}
342+
332343
download_file_as() {
333344
log INFO "Downloading: $1 as $2"
334345
local retries=10
@@ -1780,6 +1791,11 @@ if [ $custom_image -eq 0 ] && [ $native_mode -eq 0 ]; then
17801791
install_docker_host_engine
17811792
fi
17821793

1794+
# set default pool user privileges
1795+
if [ $native_mode -eq 0 ]; then
1796+
set_default_pool_user_privileges
1797+
fi
1798+
17831799
# login to registry servers (do not specify -e as creds have been decrypted)
17841800
./registry_login.sh
17851801

0 commit comments

Comments
 (0)