Skip to content

Commit de588bd

Browse files
add shell plugin (#1)
* add shell plugin
1 parent 6685313 commit de588bd

File tree

10 files changed

+2020
-2
lines changed

10 files changed

+2020
-2
lines changed

Makefile

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
SHELL := /bin/bash

# Full local bootstrap: install dependencies, then run lint and tests.
.PHONY: setup
setup: venv lint test

# Remove compiled bytecode, editor backup files and __pycache__ directories.
.PHONY: clean-pyc
clean-pyc:
	find . -name '*.pyc' -exec rm -rf {} +
	find . -name '*.pyo' -exec rm -rf {} +
	find . -name '*~' -exec rm -rf {} +
	find . -name '__pycache__' -exec rm -rf {} +

# Remove test, type-check and coverage artifacts.
# (.pytest_cache was previously removed twice; once is enough.)
.PHONY: clean-test
clean-test:
	rm -rf .pytest_cache
	rm -rf .mypy_cache
	rm -rf .coverage
	rm -rf .reports
	rm -rf htmlcov/

.PHONY: clean
clean: clean-pyc clean-test

# Install the project's dependencies with Poetry. This target depends on
# poetry.lock and does not itself create a file named "venv".
venv: poetry.lock
	poetry install

# Rewrite sources in place with black and isort.
.PHONY: format
format: venv
	poetry run black nodestream_plugin_shell tests
	poetry run isort nodestream_plugin_shell tests

# Check formatting (black --check) and run ruff without modifying files.
.PHONY: lint
lint: venv
	poetry run black nodestream_plugin_shell tests --check
	poetry run ruff nodestream_plugin_shell tests

.PHONY: test
test: venv
	poetry run pytest

README.md

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,61 @@
1-
# nodestream-plugin-shell
2-
Nodestream Plugin Shell
1+
# Nodestream Shell Plugin
2+
3+
This plugin provides a [Nodestream](https://github.com/nodestream-proj/nodestream) shell extractor.
4+
5+
**NOTE: This plugin is currently in development and is not yet ready for production use.**
6+
7+
## Installation
8+
9+
```bash
10+
pip install nodestream-plugin-shell
11+
```
12+
13+
## For commands that return output in JSON format
14+
```yaml
15+
# pipeline.yaml
16+
- implementation: nodestream_plugin_shell:Shell
17+
arguments:
18+
command: curl
19+
arguments:
20+
- https://example.com/api/data.json
21+
options:
22+
X: GET
23+
header: "Accept: application/json"
24+
ignore_stdout: false # Pass the command's JSON output to the next step in the pipeline.
25+
```
26+
27+
This would be equivalent to running:
28+
```
29+
curl https://example.com/api/data.json -X GET --header "Accept: application/json"
30+
```
31+
32+
## For commands that create files that you would like to read into the pipeline
33+
34+
#### Terminal command
35+
36+
37+
```yaml
38+
# pipeline.yaml
39+
- implementation: nodestream_plugin_shell:Shell
40+
arguments:
41+
command: curl
42+
arguments:
43+
- https://example.com/api/data.json
44+
options:
45+
X: GET
46+
header: "Accept: application/json"
47+
flags:
48+
- O
49+
ignore_stdout: true # Do not pass the command's output to the next step in the pipeline.
50+
51+
- implementation: nodestream.pipeline.extractors:FileExtractor
52+
arguments:
53+
globs:
54+
- data.json
55+
```
56+
57+
This would be equivalent to running:
58+
```
59+
curl https://example.com/api/data.json -X GET --header "Accept: application/json" -O
60+
cat data.json
61+
```
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .shell import Shell
2+
3+
__all__ = ("Shell",)

nodestream_plugin_shell/shell.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import json
2+
import subprocess
3+
from io import StringIO
4+
from logging import getLogger
5+
from typing import Any, AsyncGenerator, Dict, Optional, Set
6+
7+
from nodestream.pipeline import Extractor
8+
9+
10+
class Shell(Extractor):
    """Run a shell command and provide its JSON output to the pipeline.

    The command line is assembled from ``command``, ``arguments``,
    ``options`` and ``flags`` (see ``build_command``) and executed as an
    argument list, i.e. without going through a shell.
    """

    def __init__(
        self,
        command: str,
        arguments: Optional[Set[str]] = None,
        options: Optional[Dict[str, str]] = None,
        flags: Optional[Set[str]] = None,
        ignore_stdout: Optional[bool] = False,
    ):
        """Initializes the instance of the extractor.

        Expects stdout in json format unless ``ignore_stdout`` is set.

        Args:
            command (str): The command for the execution call.
            arguments ([str]): The arguments used in the command. They are
                appended in iteration order, so pass an ordered collection
                (e.g. a list) when the order of the arguments matters.
            options (dict): The options used in the command, as a mapping
                of option name to value.
            flags ([str]): Flags to use in the command.
            ignore_stdout (bool): Don't pass stdout to pipeline.
        """
        self.command = command
        self.arguments = arguments if arguments is not None else set()
        self.options = options if options is not None else dict()
        self.flags = flags if flags is not None else set()
        self.ignore_stdout = ignore_stdout
        self.logger = getLogger(self.__class__.__name__)

    def read_from_file(self) -> StringIO:
        """Return the stripped contents of ``self.output_file`` as a StringIO.

        ``output_file`` is not set by ``__init__``; it has to be assigned on
        the instance before this method is called.

        Raises:
            AttributeError: If ``output_file`` has not been set.
        """
        output_file = getattr(self, "output_file", None)
        if output_file is None:
            raise AttributeError(
                "Shell.output_file must be set before calling read_from_file()"
            )
        with open(output_file, "r") as file:
            return StringIO(file.read().strip())

    def build_command(self):
        """Build the argument list that is handed to ``subprocess.Popen``.

        Layout: the command, then the arguments, then the options, then the
        flags. Single-character names get a ``-`` prefix, longer names get
        ``--``.

        Returns:
            list: The argv-style list of strings.
        """
        cmd = [str(self.command)]
        # Values loaded from YAML may be ints etc.; Popen requires strings.
        cmd.extend(str(argument) for argument in self.arguments)
        for name, value in self.options.items():
            name = str(name)
            if len(name) == 1:
                # The option and its value must be two separate argv entries.
                # A single "-X GET" entry would reach the program as the
                # option "-X" with the value " GET" (leading space included).
                cmd.extend((f"-{name}", str(value)))
            else:
                cmd.append(f"--{name}={value}")
        for flag in self.flags:
            flag = str(flag)
            cmd.append(f"-{flag}" if len(flag) == 1 else f"--{flag}")
        return cmd

    def run_command(self, cmd):
        """Execute ``cmd`` and return its raw stdout.

        The call blocks until the process has finished.

        Args:
            cmd (list): The argument list as produced by ``build_command``.

        Returns:
            bytes: Everything the process wrote to stdout.

        Raises:
            SystemError: If the process exits with a non-zero return code.
        """
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            # Never let undecodable stderr bytes mask the real failure.
            error_text = stderr.decode(errors="replace")
            raise SystemError(
                f"Error running shell command: {' '.join(cmd)}, error: {error_text}, return code: {process.returncode}"
            )
        return stdout

    async def extract_records(self) -> AsyncGenerator[Any, Any]:
        """Run the command and yield the records parsed from its stdout.

        Yields:
            - a single empty dict when ``ignore_stdout`` is set;
            - each element when stdout is a JSON array;
            - the decoded value itself when stdout is any other JSON value;
            - a single empty dict (after logging a warning) when stdout is
              not valid JSON.
        """
        cmd = self.build_command()
        stdout = self.run_command(cmd)
        if self.ignore_stdout:
            # stdout is deliberately not decoded or parsed in this mode.
            yield {}
            return

        output = stdout.decode().strip()
        # Keep the try body minimal so that only parsing errors are handled.
        try:
            results = json.loads(output)
        except json.JSONDecodeError:
            self.logger.warning(
                f"Stdout not in json format, step yielding no data, set ignore_stdout to True to ignore this warning. Stdout: {output}"
            )
            yield {}
            return

        if isinstance(results, list):
            for item in results:
                yield item
        else:
            # A JSON object or scalar is one record. Iterating it would
            # yield dict keys or raise TypeError.
            yield results

0 commit comments

Comments
 (0)