88import sys
99import time
1010import types
11+ from collections import namedtuple
1112from functools import partial
1213from itertools import cycle
1314from multiprocessing import Pool , cpu_count
2223import pytest
2324
2425from graphtik import network
25- from graphtik .base import AbortedException , IncompleteExecutionError
26+ from graphtik .base import AbortedException , IncompleteExecutionError , Operation
2627from graphtik .config import (
2728 abort_run ,
2829 debug_enabled ,
3435 operations_reschedullled ,
3536 tasks_marshalled ,
3637)
37- from graphtik .execution import Solution , task_context
38- from graphtik .base import Operation
38+ from graphtik .execution import Solution , _OpTask , task_context
3939from graphtik .modifiers import dep_renamed , optional , sfx , sfxed , vararg
4040from graphtik .op import NO_RESULT , NO_RESULT_BUT_SFX , operation
4141from graphtik .pipeline import NULL_OP , Pipeline , compose
4949_parallel = pytest .mark .parallel
5050_marshal = pytest .mark .marshal
5151
52+ _ExeParams = namedtuple ("_ExeParams" , "parallel, proc, marshal" )
53+ _exe_params = _ExeParams (None , None , None )
54+
5255
5356@pytest .fixture (
5457 params = [
55- # PARALLEL?, Proc/ Thread?, Marshalled?
58+ # PARALLEL?, Thread/Proc ?, Marshalled?
5659 (None , None , None ),
5760 pytest .param ((1 , 0 , 0 ), marks = (_parallel , _thread )),
5861 pytest .param ((1 , 0 , 1 ), marks = (_parallel , _thread , _marshal )),
7073)
7174def exemethod (request ):
7275 """Returns (exemethod, marshal) combinations"""
76+ global _exe_params
7377 parallel , proc_pool , marshal = request .param
78+ _exe_params = _ExeParams (* request .param )
79+
7480 nsharks = None # number of pool swimmers....
7581
7682 with tasks_marshalled (marshal ):
@@ -118,6 +124,21 @@ def abspow(a, p):
118124 return c
119125
120126
127+ def test_serialize_pipeline (samplenet , ser_method ):
128+ def eq (pipe1 , pipe2 ):
129+ return pipe1 .name == pipe2 .name and pipe1 .ops == pipe2 .ops
130+
131+ assert eq (ser_method (samplenet ), samplenet )
132+
133+
134+ def test_serialize_OpTask (ser_method ):
135+ def eq (o1 , o2 ):
136+ return all (getattr (o1 , a ) == getattr (o2 , a ) for a in _OpTask .__slots__ )
137+
138+ ot = _OpTask (1 , 2 , 3 , 4 )
139+ assert eq (ser_method (ot ), ot )
140+
141+
121142def test_solution_finalized ():
122143 sol = Solution (MagicMock (), {})
123144
@@ -285,7 +306,7 @@ def powers_in_range(a, exponent):
285306 assert sol == exp
286307
287308
288- def test_task_context (exemethod ):
309+ def test_task_context (exemethod , request ):
289310 def check_task_context ():
290311 assert task_context .get ().op == next (iop )
291312
@@ -296,10 +317,17 @@ def check_task_context():
296317 parallel = exemethod ,
297318 )
298319 iop = iter (pipe .ops )
299- if exemethod and is_marshal_tasks ():
300- with pytest .raises (AssertionError , match = "^assert FunctionalOperation" ):
320+
321+ print (_exe_params )
322+ err = None
323+ if _exe_params .parallel and _exe_params .marshal :
324+ err = AssertionError ("^assert FunctionalOperation" )
325+ if _exe_params .proc and _exe_params .marshal :
326+ err = Exception ("^Error sending result" )
327+ if err :
328+ with pytest .raises (type (err ), match = str (err )):
301329 pipe .compute ()
302- raise pytest .xfail ("Cannot marshal `task_context` :-(." )
330+ raise pytest .xfail ("Cannot marshal parallel processes with `task_context` :-(." )
303331 else :
304332 pipe .compute ()
305333 with pytest .raises (StopIteration ):
0 commit comments