|
| 1 | +import os |
| 2 | + |
1 | 3 | import numpy as np |
2 | 4 | import pytest |
3 | 5 | import platform |
4 | 6 |
|
5 | 7 | import ansys.dpf.core.operators as op |
6 | 8 | import conftest |
7 | 9 | from ansys import dpf |
| 10 | +from ansys.dpf.core import misc |
| 11 | + |
| 12 | +if misc.module_exists("graphviz"): |
| 13 | + HAS_GRAPHVIZ = True |
| 14 | +else: |
| 15 | + HAS_GRAPHVIZ = False |
8 | 16 |
|
9 | 17 |
|
10 | 18 | def test_create_workflow(server_type): |
11 | 19 | wf = dpf.core.Workflow(server=server_type) |
12 | 20 | assert wf._internal_obj |
13 | 21 |
|
14 | 22 |
|
| 23 | +@pytest.fixture() |
| 24 | +def remove_dot_file(request): |
| 25 | + """Cleanup a testing directory once we are finished.""" |
| 26 | + |
| 27 | + dot_path = os.path.join(os.getcwd(), "test.dot") |
| 28 | + png_path = os.path.join(os.getcwd(), "test.png") |
| 29 | + png_path1 = os.path.join(os.getcwd(), "test1.png") |
| 30 | + |
| 31 | + def remove_files(): |
| 32 | + if os.path.exists(dot_path): |
| 33 | + os.remove(os.path.join(os.getcwd(), dot_path)) |
| 34 | + if os.path.exists(png_path): |
| 35 | + os.remove(os.path.join(os.getcwd(), png_path)) |
| 36 | + if os.path.exists(png_path1): |
| 37 | + os.remove(os.path.join(os.getcwd(), png_path1)) |
| 38 | + |
| 39 | + request.addfinalizer(remove_files) |
| 40 | + |
| 41 | + |
| 42 | +@pytest.mark.skipif(not HAS_GRAPHVIZ, reason="Please install pyvista") |
| 43 | +def test_workflow_view(server_in_process, remove_dot_file): |
| 44 | + pre_wf = dpf.core.Workflow(server=server_in_process) |
| 45 | + pre_op = dpf.core.operators.utility.forward(server=server_in_process) |
| 46 | + pre_wf.add_operator(pre_op) |
| 47 | + pre_wf.set_input_name("prewf_input", pre_op.inputs.any) |
| 48 | + pre_wf.set_output_name("prewf_output", pre_op.outputs.any) |
| 49 | + |
| 50 | + wf = dpf.core.Workflow(server=server_in_process) |
| 51 | + forward_op = dpf.core.operators.utility.forward(server=server_in_process) |
| 52 | + wf.add_operator(forward_op) |
| 53 | + wf.set_input_name("wf_input", forward_op.inputs.any) |
| 54 | + wf.set_output_name("wf_output", forward_op.outputs.any) |
| 55 | + |
| 56 | + wf.connect_with(pre_wf, {"prewf_output": "wf_input"}) |
| 57 | + wf.view(off_screen=True, title="test1") |
| 58 | + assert not os.path.exists("test1.dot") |
| 59 | + assert os.path.exists("test1.png") |
| 60 | + wf.view(off_screen=True, save_as="test.png", keep_dot_file=True) |
| 61 | + assert os.path.exists("test.dot") |
| 62 | + assert os.path.exists("test.png") |
| 63 | + |
| 64 | + |
15 | 65 | def test_connect_field_workflow(server_type): |
16 | 66 | wf = dpf.core.Workflow(server=server_type) |
17 | 67 | wf.progress_bar = False |
|
0 commit comments