44
55----
66
7+ This module provides a basic class library for task grouping, mainly includes 2 classes:
78
9+ * TaskContainer -
10+
11+ * ITaskLoader -
812
913|
1014
@@ -32,14 +36,26 @@ def __init__(self, func:Callable, merge_fn:Callable[[Dict[str, Any], Dict[str, A
3236
3337 self .merge_fn = merge_fn
3438
35- self .check_group ()
3639
40+ @property
41+ def task_group (self ):
42+ return self .__task_group
43+
44+ @task_group .setter
45+ def task_group (self , group ):
46+ if group is None :
47+ self .__task_group = None
48+ elif isinstance (group , list ):
49+ if len (group ) < 1 :
50+ raise ValueError ("task_group cannot be empty" )
3751
38- def check_group (self ):
39- if self .task_group :
40- for t in self .task_group :
41- self ._check_task_type (t )
42- t .check_group ()
52+ for t in group :
53+ if not isinstance (t , TaskContainer ):
54+ raise TypeError ("each task context must be an instance of the TaskContainer class" )
55+
56+ self .__task_group = group
57+ else :
58+ raise TypeError ("task_group must be a list of TaskContainer" )
4359
4460
4561 def run (self , pipeargs :dict = {}):
@@ -59,10 +75,12 @@ def _pipe_in(self, pipeargs:Union[Dict, List[Dict]]={}) -> Dict:
5975
6076 if isinstance (pipeargs , iterable ):
6177 pipe_args = {}
78+
6279 for p in pipeargs :
6380 d = self ._pipe_in (p )
6481 if d :
6582 pipe_args .update (d )
83+
6684 return pipe_args
6785
6886 return {}
@@ -87,16 +105,10 @@ def _single_run(self, pipeargs:dict={}):
87105 return None
88106
89107
90- def _check_task_type (self , task ):
91- if not isinstance (task , TaskContainer ):
92- raise TypeError ("each task context must be an instance of the TaskContainer class" )
93-
94-
95108 def _serial_run (self , pipeargs :dict = {}):
96109 serial_results = []
97110
98111 for task in self .task_group :
99- self ._check_task_type (task )
100112 pipeargs = result = task .run (pipeargs )
101113 serial_results .append (result )
102114
@@ -107,7 +119,6 @@ def _parallel_run(self, pipeargs:dict={}):
107119 task_list = []
108120
109121 for task in self .task_group :
110- self ._check_task_type (task )
111122 task_list .append (self .thread_pool .submit (task .run , pipeargs ))
112123
113124 parallel_results = []
@@ -125,8 +136,21 @@ def create_base_container(self) -> TaskContainer:
125136 pass
126137
127138
139+ @abstractmethod
140+ def extract_single_task (self , task_node :Dict [str , Any ]) -> Tuple [tuple , Dict [str , Any ]]:
141+ pass
142+
143+ @abstractmethod
144+ def extract_serial_group (self , task_node :Dict [str , Any ]) -> List [Dict [str , Any ]]:
145+ pass
146+
147+ @abstractmethod
148+ def extract_parallel_group (self , task_node :Dict [str , Any ]) -> List [Dict [str , Any ]]:
149+ pass
150+
151+
128152 def create_single_task (self , * args , ** kwargs ) -> TaskContainer :
129- task = create_base_container ()
153+ task = self . create_base_container ()
130154 task .pos_args = args
131155 task .kw_args = kwargs
132156 task .parallel = False
@@ -135,27 +159,42 @@ def create_single_task(self, *args, **kwargs) -> TaskContainer:
135159 return task
136160
137161
138- def _create_group_task (self , task_group :List [TaskContainer ]) -> TaskContainer :
162+ def create_group_task (self , task_group :List [TaskContainer ], parallel : bool = False ) -> TaskContainer :
139163 if not task_group :
140164 raise ValueError ("the task_group cannot be empty" )
141165
142- task = create_base_container ()
166+ task = self . create_base_container ()
143167 task .task_group = task_group
144168 task .func = None
145169
146- task .check_group ()
170+ task .parallel = parallel if len ( task_group ) > 1 else False
147171 return task
148172
149173
150- def create_serial_group (self , task_group :List [TaskContainer ]) -> TaskContainer :
151- task = self ._create_group_task (task_group )
152- task .parallel = False
153- return task
174+ def load (self , task_tree :Dict [str , Any ]) -> TaskContainer :
175+ if not isinstance (task_tree , dict ):
176+ raise TypeError ("task_tree argument must be a dictionary type: Dict[str, Any]" )
177+
178+ leaf = self .extract_single_task (task_tree )
179+ if leaf is not None :
180+ if not isinstance (leaf , tuple ) or len (leaf ) != 2 or not isinstance (leaf [0 ], tuple ) or not isinstance (leaf [1 ], dict ):
181+ raise TypeError ("extract_single_task must return Tuple[tuple, Dict[str, Any]] if the current node is a leaf task, otherwise it must return None." )
182+ return self .create_single_task (* leaf [0 ], ** leaf [1 ])
183+
184+ serial = self .extract_serial_group (task_tree )
185+ if serial is not None :
186+ if not isinstance (serial , list ) or len (serial ) < 1 or not isinstance (serial [0 ], dict ):
187+ raise TypeError ("extract_serial_group must return List[Dict[str, Any]] if the current node is a serial group, otherwise it must return None." )
188+ return self .create_group_task ([self .load (t ) for t in serial ], False )
189+
190+ parallel = self .extract_parallel_group (task_tree )
191+ if parallel is not None :
192+ if not isinstance (parallel , list ) or len (parallel ) < 1 or not isinstance (parallel [0 ], dict ):
193+ raise TypeError ("extract_parallel_group must return List[Dict[str, Any]] if the current node is a parallel group, otherwise it must return None." )
194+ return self .create_group_task ([self .load (t ) for t in parallel ], True )
195+
196+ raise TypeError (f"current node of task_tree is not a leaf task, serial task group or parallel task group.\n { repr (task_tree )} " )
154197
155198
156- def create_parallel_group (self , task_group :List [TaskContainer ], timeout :float = None ) -> TaskContainer :
157- task = self ._create_group_task (task_group )
158- task .parallel = True if len (task_group ) > 1 else False
159- task .timeout = timeout
160- return task
161199
200+ __version__ = "0.1a0.dev1"
0 commit comments