diff --git a/Makefile b/Makefile index 3c96dbf4..3f95f883 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,8 @@ test: tests/cli/test_cli_exit_codes.sh &&\ tests/cli/test_cli_logs.sh &&\ tests/cli/test_custom_formatters.sh &&\ - tests/cli/test_flow.sh + tests/cli/test_flow.sh &&\ + tests/cli/test_exec.sh version: @echo $(VERSION) diff --git a/README.md b/README.md index 8e6e7ce7..112d6d18 100644 --- a/README.md +++ b/README.md @@ -258,6 +258,27 @@ my-flow: You can run the pipeline using `dpp run my-flow`. +### Executing a subprocess + +You can integrate system executables or shell scripts into the pipeline using `exec`. +This can be used for example to set permissions / run external binaries / install system dependencies. +The execution output is logged in real-time to standard logging at `info` logging level. + +Exec commands must be placed before the pipeline steps + +``` +process-data: + pipeline: + - exec: [sh, set_permissions.sh] + - exec: [ls, -lah] + - exec: | + curl http://example.com/data.zip > my-data.zip + unzip my-data.zip + - flow: .. + - run: .. + .. +``` + ## The Standard Processor Library A few built in processors are provided with the library. diff --git a/datapackage_pipelines/lib/exec.py b/datapackage_pipelines/lib/exec.py new file mode 100644 index 00000000..821e7852 --- /dev/null +++ b/datapackage_pipelines/lib/exec.py @@ -0,0 +1,16 @@ +from datapackage_pipelines.wrapper import ingest +import subprocess +import logging +import os + + +with ingest() as ctx: + parameters, datapackage, resources = ctx + assert len(datapackage['resources']) == 0, 'exec processor does not support input data' + cmd = parameters.pop('__exec') + os.environ['__EXEC_PROCESSOR_PATH'] = parameters.pop('__path') + with subprocess.Popen(cmd, shell=isinstance(cmd, str), stdout=subprocess.PIPE) as p: + for line in p.stdout: + logging.info(line.decode()) + p.wait() + assert p.returncode == 0, f'exec failed, returncode = {p.returncode}' diff --git a/datapackage_pipelines/specs/resolver.py b/datapackage_pipelines/specs/resolver.py index 3de96798..8503dcef 100644 --- a/datapackage_pipelines/specs/resolver.py +++ b/datapackage_pipelines/specs/resolver.py @@ -81,6 +81,10 @@ def resolve_executor(step, path, errors): step['run'] = 'flow' step.setdefault('parameters', {}).update(__flow=step.pop('flow'), __path=path) + elif 'exec' in step: + step['run'] = 'exec' + step.setdefault('parameters', {}).update(__exec=step.pop('exec'), + __path=path) executor = step['run'] back_up, parts = convert_dot_notation(executor) diff --git a/datapackage_pipelines/specs/schemas/pipeline-spec.schema.json b/datapackage_pipelines/specs/schemas/pipeline-spec.schema.json index 83b263bb..2768226b 100644 --- a/datapackage_pipelines/specs/schemas/pipeline-spec.schema.json +++ b/datapackage_pipelines/specs/schemas/pipeline-spec.schema.json @@ -34,6 +34,11 @@ "required": [ "flow" ] + }, + { + "required": [ + "exec" + ] } ], "properties": { diff --git a/tests/cli/pipeline-spec.yaml b/tests/cli/pipeline-spec.yaml index 26da3def..ed38dd0e 100644 --- a/tests/cli/pipeline-spec.yaml +++ b/tests/cli/pipeline-spec.yaml @@ -93,3 +93,19 @@ dataflows: - run: dump_to_path parameters: out-path: test_flow_data + +exec: + pipeline: + - exec: [ls, -lah] + - exec: [bash, -c, 'echo __EXEC_PROCESSOR_PATH = $__EXEC_PROCESSOR_PATH'] + - run: load_resource + parameters: + url: ../data/datapackage.json + resource: my-spiffy-resource + +exec_shell: + pipeline: + - exec: | + TEMPFILE=`mktemp` + echo test > $TEMPFILE + echo $TEMPFILE > test_exec_shell_tempfile diff --git a/tests/cli/test_exec.sh b/tests/cli/test_exec.sh new file mode 100755 index 00000000..61020dd3 --- /dev/null +++ b/tests/cli/test_exec.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +cd tests/cli + +TEMPFILE=`mktemp` + +set -o pipefail +! dpp run --verbose ./exec >/dev/stdout 2>&1 | tee $TEMPFILE && echo failed to run exec pipeline && exit 1 +set +o pipefail + +! cat "${TEMPFILE}" | grep "test_exec.sh" && echo exec output is missing && exit 1 +! cat "${TEMPFILE}" | grep "__EXEC_PROCESSOR_PATH" && echo exec output is missing && exit 1 +rm $TEMPFILE + +TEMPFILE=`mktemp` + +rm -f test_exec_shell_tempfile + +set -o pipefail +! dpp run --verbose ./exec_shell >/dev/stdout 2>&1 | tee $TEMPFILE && echo failed to run exec_shell pipeline && exit 1 +set +o pipefail + +! [ "$(cat $(cat test_exec_shell_tempfile))" == "test" ] \ + && echo unexecpted data && exit 1 + +rm $(cat test_exec_shell_tempfile) +rm test_exec_shell_tempfile + +echo Great Success +exit 0