
Commit 2d4ff40

John Andersen (pdxjohnny) authored and committed
test: integration: dataflow: Diagram and Merge
Signed-off-by: John Andersen <[email protected]>
1 parent f27dc41 commit 2d4ff40

File tree: 5 files changed, +166 −138 lines


.ci/run.sh

Lines changed: 9 additions & 1 deletion

@@ -58,8 +58,16 @@ function run_plugin() {
         "${PYTHON}" -m dffml service dev install
         ./scripts/docs.sh
 
+        # Log skipped tests to file
+        check_skips="$(mktemp)"
+        TEMP_DIRS+=("${check_skips}")
+
         # Run with coverage
-        "${PYTHON}" -m coverage run setup.py test
+        "${PYTHON}" -m coverage run setup.py test 2>&1 | tee "${check_skips}"
+        "${PYTHON}" -m coverage report -m
+
+        # Fail if any tests were skipped
+        grep -v -q -E '(skipped=.*)' "${check_skips}"
     fi
 }
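For context, Python's unittest runner reports skips in its final summary line, e.g. "OK (skipped=2)", which is the marker the grep gate above inspects in the captured output. A minimal sketch of the same gate written in Python, assuming the runner's output was captured to a hypothetical test.log file:

import re
import sys

def fail_on_skips(log_path: str) -> None:
    # unittest prints e.g. "OK (skipped=2)" when tests were skipped
    match = re.search(r"skipped=(\d+)", open(log_path).read())
    if match:
        sys.exit(f"{match.group(1)} test(s) were skipped")

fail_on_skips("test.log")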

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - MySQL packaging issue.
 ### Removed
 - CLI command `operations` removed in favor of `dataflow run`
+- Duplicate dataflow diagram code from development service
 
 ## [0.3.0] - 2019-10-26
 ### Added

dffml/service/dev.py

Lines changed: 0 additions & 136 deletions

@@ -247,141 +247,6 @@ class Entrypoints(CMD):
     _list = ListEntrypoints
 
 
-class Diagram(CMD):
-
-    arg_stages = Arg(
-        "-stages",
-        help="Which stages to display: (processing, cleanup, output)",
-        nargs="+",
-        default=[],
-        required=False,
-    )
-    arg_simple = Arg(
-        "-simple",
-        help="Don't display input and output names",
-        default=False,
-        action="store_true",
-        required=False,
-    )
-    arg_display = Arg(
-        "-display",
-        help="How to display (TD: top down, LR, RL, BT)",
-        default="TD",
-        required=False,
-    )
-    arg_dataflow = Arg("dataflow", help="File containing exported DataFlow")
-    arg_config = Arg(
-        "-config",
-        help="ConfigLoader to use for importing",
-        type=BaseConfigLoader.load,
-        default=None,
-    )
-
-    async def run(self):
-        dataflow_path = Path(self.dataflow)
-        config_cls = self.config
-        if config_cls is None:
-            config_type = dataflow_path.suffix.replace(".", "")
-            config_cls = BaseConfigLoader.load(config_type)
-        async with config_cls.withconfig(self.extra_config) as configloader:
-            async with configloader() as loader:
-                exported = await loader.loadb(dataflow_path.read_bytes())
-                dataflow = DataFlow._fromdict(**exported)
-        print(f"graph {self.display}")
-        for stage in Stage:
-            # Skip stage if not wanted
-            if self.stages and stage.value not in self.stages:
-                continue
-            stage_node = hashlib.md5(
-                ("stage." + stage.value).encode()
-            ).hexdigest()
-            if len(self.stages) != 1:
-                print(f"subgraph {stage_node}[{stage.value.title()} Stage]")
-                print(f"style {stage_node} fill:#afd388b5,stroke:#a4ca7a")
-            for instance_name, operation in dataflow.operations.items():
-                if operation.stage != stage:
-                    continue
-                subgraph_node = hashlib.md5(
-                    ("subgraph." + instance_name).encode()
-                ).hexdigest()
-                node = hashlib.md5(instance_name.encode()).hexdigest()
-                if not self.simple:
-                    print(f"subgraph {subgraph_node}[{instance_name}]")
-                    print(f"style {subgraph_node} fill:#fff4de,stroke:#cece71")
-                print(f"{node}[{operation.instance_name}]")
-                for input_name in operation.inputs.keys():
-                    input_node = hashlib.md5(
-                        ("input." + instance_name + "." + input_name).encode()
-                    ).hexdigest()
-                    if not self.simple:
-                        print(f"{input_node}({input_name})")
-                        print(f"{input_node} --> {node}")
-                for output_name in operation.outputs.keys():
-                    output_node = hashlib.md5(
-                        (
-                            "output." + instance_name + "." + output_name
-                        ).encode()
-                    ).hexdigest()
-                    if not self.simple:
-                        print(f"{output_node}({output_name})")
-                        print(f"{node} --> {output_node}")
-                if not self.simple:
-                    print(f"end")
-            if len(self.stages) != 1:
-                print(f"end")
-        if len(self.stages) != 1:
-            print(f"subgraph inputs[Inputs]")
-            print(f"style inputs fill:#f6dbf9,stroke:#a178ca")
-        for instance_name, input_flow in dataflow.flow.items():
-            operation = dataflow.operations[instance_name]
-            if self.stages and not operation.stage.value in self.stages:
-                continue
-            node = hashlib.md5(instance_name.encode()).hexdigest()
-            for input_name, sources in input_flow.inputs.items():
-                for source in sources:
-                    # TODO Put various sources in their own "Inputs" subgraphs
-                    if isinstance(source, str):
-                        input_definition = operation.inputs[input_name]
-                        seed_input_node = hashlib.md5(
-                            (source + "." + input_definition.name).encode()
-                        ).hexdigest()
-                        print(f"{seed_input_node}({input_definition.name})")
-                        if len(self.stages) == 1:
-                            print(
-                                f"style {seed_input_node} fill:#f6dbf9,stroke:#a178ca"
-                            )
-                        if not self.simple:
-                            input_node = hashlib.md5(
-                                (
-                                    "input." + instance_name + "." + input_name
-                                ).encode()
-                            ).hexdigest()
-                            print(f"{seed_input_node} --> {input_node}")
-                        else:
-                            print(f"{seed_input_node} --> {node}")
-                    else:
-                        if not self.simple:
-                            source_output_node = hashlib.md5(
-                                (
-                                    "output."
-                                    + ".".join(list(source.items())[0])
-                                ).encode()
-                            ).hexdigest()
-                            input_node = hashlib.md5(
-                                (
-                                    "input." + instance_name + "." + input_name
-                                ).encode()
-                            ).hexdigest()
-                            print(f"{source_output_node} --> {input_node}")
-                        else:
-                            source_operation_node = hashlib.md5(
-                                list(source.keys())[0].encode()
-                            ).hexdigest()
-                            print(f"{source_operation_node} --> {node}")
-        if len(self.stages) != 1:
-            print(f"end")
-
-
 class Export(CMD):
 
     arg_config = Arg(
@@ -470,7 +335,6 @@ class Develop(CMD):
     create = Create
     skel = Skeleton
     run = Run
-    diagram = Diagram
     export = Export
     entrypoints = Entrypoints
     install = Install
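The removed command, now exposed as dffml dataflow diagram (see the docs change below), emits mermaid graph syntax on stdout, naming every node with the md5 hexdigest of a dotted path string ("stage.processing", "input.<instance>.<name>", and so on) so identifiers stay stable across runs and contain only characters mermaid accepts. A minimal sketch of that naming scheme; the node_id helper is illustrative, not part of DFFML:

import hashlib

def node_id(*parts: str) -> str:
    # md5 hexdigest of a dotted path, e.g. "stage.processing"
    return hashlib.md5(".".join(parts).encode()).hexdigest()

# Mirrors the subgraph line the command prints for the processing stage
print(f"subgraph {node_id('stage', 'processing')}[Processing Stage]")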

docs/tutorials/operations.rst

Lines changed: 1 addition & 1 deletion

@@ -490,7 +490,7 @@ are connected.
 
 .. code-block:: console
 
-    $ dffml service dev diagram -simple shouldi/deploy/df/shouldi.json
+    $ dffml dataflow diagram -simple shouldi/deploy/df/shouldi.json
     graph TD
     subgraph a759a07029077edc5c37fea0326fa281[Processing Stage]
     style a759a07029077edc5c37fea0326fa281 fill:#afd388b5,stroke:#a4ca7a

tests/test_integration_cli.py

Lines changed: 155 additions & 0 deletions

@@ -2,20 +2,59 @@
 This file contains integration tests. We use the CLI to exercise functionality of
 various DFFML classes and constructs.
 """
+import re
+import os
 import io
+import json
 import inspect
 import pathlib
+import asyncio
 import contextlib
+import unittest.mock
 
+from dffml.df.types import Operation, DataFlow
 from dffml.cli.cli import CLI
+from dffml.service.dev import Develop
+from dffml.util.packaging import is_develop
+from dffml.util.entrypoint import load
 from dffml.util.asynctestcase import AsyncTestCase
 
 from .test_cli import non_existant_tempfile
 
 
+def relative_path(*args):
+    """
+    Returns a pathlib.Path object with the path relative to this file.
+    """
+    target = pathlib.Path(__file__).parents[0] / args[0]
+    for path in list(args)[1:]:
+        target /= path
+    return target
+
+
+@contextlib.contextmanager
+def relative_chdir(*args):
+    """
+    Change directory to a location relative to the location of this file.
+    """
+    target = relative_path(*args)
+    orig_dir = os.getcwd()
+    try:
+        os.chdir(target)
+        yield target
+    finally:
+        os.chdir(orig_dir)
+
+
 class IntegrationCLITestCase(AsyncTestCase):
+    REQUIRED_PLUGINS = []
+
     async def setUp(self):
         super().setUp()
+        if not all(map(is_develop, self.REQUIRED_PLUGINS)):
+            self.skipTest(
+                f"Required plugins: {', '.join(self.REQUIRED_PLUGINS)} must be installed in development mode"
+            )
         self.stdout = io.StringIO()
         self._stack = contextlib.ExitStack().__enter__()
@@ -94,3 +133,119 @@ async def test_memory_to_csv(self):
             )
             + "\n",
         )
+
+
+class TestDevelop(IntegrationCLITestCase):
+
+    REQUIRED_PLUGINS = ["shouldi"]
+
+    async def test_export(self):
+        stdout = io.StringIO()
+        # Use shouldi's dataflow for tests
+        with relative_chdir("..", "examples", "shouldi"):
+            with unittest.mock.patch("sys.stdout.buffer.write") as write:
+                await Develop.cli("export", "shouldi.cli:DATAFLOW")
+            DataFlow._fromdict(**json.loads(write.call_args[0][0]))
+
+
+class TestDataFlow(IntegrationCLITestCase):
+
+    REQUIRED_PLUGINS = ["shouldi", "dffml-config-yaml", "dffml-feature-git"]
+
+    async def setUp(self):
+        await super().setUp()
+        # Use shouldi's dataflow for tests
+        self.DATAFLOW = list(
+            load(
+                "shouldi.cli:DATAFLOW",
+                relative=relative_path("..", "examples", "shouldi"),
+            )
+        )[0]
+
+    async def test_diagram_default(self):
+        filename = self.mktempfile() + ".json"
+        pathlib.Path(filename).write_text(json.dumps(self.DATAFLOW.export()))
+        with contextlib.redirect_stdout(self.stdout):
+            await CLI.cli(
+                "dataflow", "diagram", filename,
+            )
+        stdout = self.stdout.getvalue()
+        # Check that a subgraph is being made for each operation
+        self.assertTrue(re.findall(r"subgraph.*run_bandit", stdout))
+        # Check that all stages are included
+        for check in ["Processing", "Output", "Cleanup"]:
+            self.assertIn(f"{check} Stage", stdout)
+
+    async def test_diagram_simple(self):
+        filename = self.mktempfile() + ".json"
+        pathlib.Path(filename).write_text(json.dumps(self.DATAFLOW.export()))
+        with contextlib.redirect_stdout(self.stdout):
+            await CLI.cli(
+                "dataflow", "diagram", "-simple", filename,
+            )
+        # Check that a subgraph is not being made for each operation
+        self.assertFalse(
+            re.findall(r"subgraph.*run_bandit", self.stdout.getvalue())
+        )
+
+    async def test_diagram_single_stage(self):
+        filename = self.mktempfile() + ".json"
+        pathlib.Path(filename).write_text(json.dumps(self.DATAFLOW.export()))
+        with contextlib.redirect_stdout(self.stdout):
+            await CLI.cli(
+                "dataflow", "diagram", "-stages", "processing", "--", filename,
+            )
+        stdout = self.stdout.getvalue()
+        # Check that the single stage is not its own subgraph
+        for check in ["Processing", "Output", "Cleanup"]:
+            self.assertNotIn(f"{check} Stage", stdout)
+
+    async def test_diagram_multi_stage(self):
+        filename = self.mktempfile() + ".json"
+        pathlib.Path(filename).write_text(json.dumps(self.DATAFLOW.export()))
+        with contextlib.redirect_stdout(self.stdout):
+            await CLI.cli(
+                "dataflow",
+                "diagram",
+                "-stages",
+                "processing",
+                "output",
+                "--",
+                filename,
+            )
+        stdout = self.stdout.getvalue()
+        # Check that only the requested stages get their own subgraphs
+        for check in ["Processing", "Output"]:
+            self.assertIn(f"{check} Stage", stdout)
+        for check in ["Cleanup"]:
+            self.assertNotIn(f"{check} Stage", stdout)
+
+    async def test_merge(self):
+        # Write out shouldi dataflow
+        orig = self.mktempfile() + ".json"
+        pathlib.Path(orig).write_text(json.dumps(self.DATAFLOW.export()))
+        # Import from feature/git
+        transform_to_repo = Operation.load("dffml.mapping.create")
+        lines_of_code_by_language, lines_of_code_to_comments = list(
+            load(
+                "dffml_feature_git.feature.operations:lines_of_code_by_language",
+                "dffml_feature_git.feature.operations:lines_of_code_to_comments",
+                relative=relative_path("..", "feature", "git"),
+            )
+        )
+        # Create new dataflow
+        override = DataFlow.auto(
+            transform_to_repo,
+            lines_of_code_by_language,
+            lines_of_code_to_comments,
+        )
+        # TODO Modify and compare against yaml in docs example
+        # Write out override dataflow
+        created = self.mktempfile() + ".json"
+        pathlib.Path(created).write_text(json.dumps(override.export()))
+        # Merge the two
+        with contextlib.redirect_stdout(self.stdout):
+            await CLI.cli(
+                "dataflow", "merge", orig, created,
+            )
+        DataFlow._fromdict(**json.loads(self.stdout.getvalue()))
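The merge test above drives the CLI in-process and parses the merged DataFlow back out of stdout; the same flow can be scripted outside the test harness. A minimal sketch, assuming two exported DataFlow JSON files already exist at the hypothetical paths orig.json and override.json:

import asyncio
import contextlib
import io
import json

from dffml.cli.cli import CLI
from dffml.df.types import DataFlow

async def merge_dataflows(orig: str, override: str) -> DataFlow:
    # `dffml dataflow merge` prints the combined flow to stdout
    stdout = io.StringIO()
    with contextlib.redirect_stdout(stdout):
        await CLI.cli("dataflow", "merge", orig, override)
    return DataFlow._fromdict(**json.loads(stdout.getvalue()))

merged = asyncio.run(merge_dataflows("orig.json", "override.json"))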
