1
+ import typing as ty
2
+ import attrs
1
3
from pydra .compose import python , workflow , shell
2
- from fileformats .generic import File
4
+ from fileformats .text import TextFile
3
5
from pydra .utils .general import task_class_as_dict , task_class_from_dict , task_fields
4
- from pydra .utils .tests .utils import SpecificFuncTask , Concatenate
5
-
6
-
7
- def test_python_task_class_as_dict ():
8
-
9
- @python .define (outputs = ["out_int" ], xor = ["b" , "c" ])
10
- def Add (a : int , b : int | None = None , c : int | None = None ) -> int :
11
- """
12
- Parameters
13
- ----------
14
- a: int
15
- the first arg
16
- b : int, optional
17
- the optional second arg
18
- c : int, optional
19
- the optional third arg
20
-
21
- Returns
22
- -------
23
- out_int : int
24
- the sum of a and b
25
- """
26
- return a + (b if b is not None else c )
6
+ from pydra .utils .tests .utils import Concatenate
7
+
8
+
9
+ @python .define (outputs = ["out_int" ], xor = ["b" , "c" ])
10
+ def Add (a : int , b : int | None = None , c : int | None = None ) -> int :
11
+ """
12
+ Parameters
13
+ ----------
14
+ a: int
15
+ the first arg
16
+ b : int, optional
17
+ the optional second arg
18
+ c : int, optional
19
+ the optional third arg
20
+
21
+ Returns
22
+ -------
23
+ out_int : int
24
+ the sum of a and b
25
+ """
26
+ return a + (b if b is not None else c )
27
+
28
+
29
+ def test_python_task_class_as_dict (tmp_path ):
27
30
28
31
dct = task_class_as_dict (Add )
29
32
Reloaded = task_class_from_dict (dct )
30
33
assert task_fields (Add ) == task_fields (Reloaded )
31
34
35
+ add = Reloaded (a = 1 , b = 2 )
36
+ assert add (cache_root = tmp_path / "cache" ).out_int == 3
37
+
32
38
33
39
def test_shell_task_class_as_dict ():
34
40
@@ -41,18 +47,37 @@ def test_shell_task_class_as_dict():
41
47
assert task_fields (MyCmd ) == task_fields (Reloaded )
42
48
43
49
44
- def test_workflow_task_class_as_dict ():
50
+ def test_workflow_task_class_as_dict (tmp_path ):
45
51
46
- @workflow .define
47
- def AWorkflow (in_file : File , a_param : int ) -> tuple [File , File ]:
48
- spec_func = workflow .add (SpecificFuncTask (in_file ))
52
+ @workflow .define (outputs = ["out_file" ])
53
+ def AWorkflow (in_file : TextFile , a_param : int ) -> TextFile :
49
54
concatenate = workflow .add (
50
- Concatenate (
51
- in_file1 = in_file , in_file2 = spec_func .out_file , duplicates = a_param
52
- )
55
+ Concatenate (in_file1 = in_file , in_file2 = in_file , duplicates = a_param )
53
56
)
54
57
return concatenate .out_file
55
58
56
59
dct = task_class_as_dict (AWorkflow )
57
60
Reloaded = task_class_from_dict (dct )
58
61
assert task_fields (AWorkflow ) == task_fields (Reloaded )
62
+
63
+ foo_file = tmp_path / "file1.txt"
64
+ foo_file .write_text ("foo" )
65
+
66
+ outputs = Reloaded (in_file = foo_file , a_param = 2 )(cache_root = tmp_path / "cache" )
67
+ assert outputs .out_file .contents == "foo\n foo\n foo\n foo"
68
+
69
+
70
+ def test_task_class_as_dict_with_value_serializer ():
71
+
72
+ def frozen_set_to_list_serializer (
73
+ mock_class : ty .Any , atr : attrs .Attribute , value : ty .Any
74
+ ) -> ty .Any :
75
+ # This is just a dummy serializer
76
+ if isinstance (value , frozenset ):
77
+ return list (
78
+ frozen_set_to_list_serializer (mock_class , atr , v ) for v in value
79
+ )
80
+ return value
81
+
82
+ dct = task_class_as_dict (Add , value_serializer = frozen_set_to_list_serializer )
83
+ assert dct ["xor" ] == [["b" , "c" ]] or dct ["xor" ] == [["c" , "b" ]]
0 commit comments