
Commit 09cbe32

Updating task_grouping.py ...
1 parent 4278ea3 commit 09cbe32

1 file changed (+102, -13 lines)

Utilities.PyPI/task_grouping.py

Lines changed: 102 additions & 13 deletions
@@ -4,13 +4,17 @@
 
 ----
 
-This module provides a basic class library for task grouping, mainly includes 2 classes:
+This module provides a basic class library for task grouping, which includes 2 classes:
 
-* TaskContainer - Organizes a batch of task groups, including the serial/parallel structures, and carries the arguments information for each task unit to run.
+- **TaskContainer**
 
-* ITaskLoader - This is an abstract base class for implementing a concrete loader class to load a task tree from Dict/JSON to TaskContainer.
+  Organizes a batch of task groups, including the serial/parallel structures, and carries the arguments information for each task unit to run.
 
-|
+- **ITaskLoader**
+
+  This is an abstract base class for implementing a concrete loader class to load a task tree from Dict/JSON to TaskContainer.
+
+----
 
 | Homepage and documentation: https://github.com/DataBooster/PyWebApi
 | Copyright (c) 2020 Abel Cheng
@@ -23,7 +27,12 @@
 
 
 class TaskContainer(object):
+    """Organizes a batch of task groups, including the serial/parallel structures, and carries the arguments information for each task unit to run.
 
+    :param func: A callable to be executed by the task.
+    :param merge_fn: A function for merging pipeline arguments (a dictionary ``Dict[str, Any]``) with user input arguments (a dictionary ``Dict[str, Any]``); it must return a new dictionary ``Dict[str, Any]``.
+    :param thread_pool: An instance of ``ThreadPoolExecutor`` for executing any parallel task group; if it is not provided, any parallel task group will actually be executed serially.
+    """
     def __init__(self, func:Callable, merge_fn:Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]], thread_pool:ThreadPoolExecutor, **kwargs):
         self.task_group : List[TaskContainer] = kwargs.get('task_group', None)
         self.parallel : bool = kwargs.get('parallel', False)
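
As a quick orientation for the three constructor parameters above, here is a minimal sketch of building a leaf container by hand. It is not part of this commit: the import path, the ``greet`` callable and the ``merge_args`` function are illustrative assumptions, as is the expectation that a leaf container's ``run()`` ends up calling ``func(*pos_args, **kw_args)``.

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor
    from typing import Any, Dict

    from task_grouping import TaskContainer  # assumed import path

    def greet(name: str, punctuation: str = "!") -> Dict[str, Any]:
        # The task itself; it returns a dict so its result can be piped onward.
        return {"greeting": f"Hello, {name}{punctuation}"}

    def merge_args(user_args: Dict[str, Any], pipe_args: Dict[str, Any]) -> Dict[str, Any]:
        # merge_fn contract: (user kw_args, pipeline args) -> new dict.
        # Here, explicit user arguments win over piped-in values.
        merged = dict(pipe_args)
        merged.update(user_args)
        return merged

    pool = ThreadPoolExecutor(max_workers=4)  # only exercised by parallel groups
    task = TaskContainer(greet, merge_args, pool, kw_args={"name": "world"})
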
@@ -34,7 +43,7 @@ def __init__(self, func:Callable, merge_fn:Callable[[Dict[str, Any], Dict[str, A
         self.pos_args : Tuple = kwargs.get('pos_args', ())
         self.kw_args : Dict = kwargs.get('kw_args', {})
 
-        self.merge_fn = merge_fn
+        self.merge_fn = merge_fn  # A function for merging pipeline arguments with user input kw_args
 
 
     @property
@@ -59,6 +68,15 @@ def task_group(self, group):
 
 
     def run(self, pipeargs:dict={}):
+        """Execute all tasks and task groups in the specified order (serial/parallel) and assemble their results into a tree structure corresponding to the input payload.
+
+        :param pipeargs: (optional) If the result of the previous task is a dictionary ``Dict[str, Any]``, it can be piped into the current task or task group at run time and merged into the user input arguments.
+
+            - If the current task is a serial group, the first subtask receives the pipeline arguments, the result of the first subtask becomes the pipeline arguments of the second subtask, and so on.
+            - If the current task is a parallel group, all subtasks receive the same pipeline arguments.
+
+        :return: All results are assembled into a tree structure corresponding to the input payload.
+        """
         if self.task_group is None:
             return self._single_run(pipeargs)
         else:
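
To make the ``pipeargs`` behaviour concrete, the fragment below continues the hypothetical sketch above; the call sequence described in the comments follows the docstring, not verified library output.

.. code-block:: python

    # Sketch only, reusing the hypothetical `task`/`greet`/`merge_args` names.
    result = task.run({"punctuation": "?"})
    # merge_args is called as merge_args({"name": "world"}, {"punctuation": "?"}),
    # so the task is expected to execute as greet(name="world", punctuation="?").
    # In a serial group, the first subtask would receive these pipe args and its
    # dict result would become the pipe args of the next subtask; in a parallel
    # group, every subtask receives the same pipe args.
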
@@ -94,9 +112,12 @@ def _single_run(self, pipeargs:dict={}):
         if self.kw_args is None:
             self.kw_args = {}
 
-        pipe_args = self._pipe_in(pipeargs)
-        if pipe_args and self.merge_fn:
-            kw_args = self.merge_fn(self.kw_args, pipe_args)
+        if self.merge_fn:
+            pipe_args = self._pipe_in(pipeargs)
+            if pipe_args:
+                kw_args = self.merge_fn(self.kw_args, pipe_args)
+            else:
+                kw_args = self.kw_args
         else:
             kw_args = self.kw_args
 
@@ -130,36 +151,97 @@ def _parallel_run(self, pipeargs:dict={}):
 
 
 class ITaskLoader(metaclass=ABCMeta):
+    """This is an abstract base class for implementing a concrete loader class to load a task tree from Dict/JSON to ``TaskContainer``."""
 
     @abstractmethod
     def create_base_container(self) -> TaskContainer:
+        """This method is used to create a new container and initialize the most basic properties of ``TaskContainer``: ``func``, ``merge_fn``, ``thread_pool``, etc.
+        For example:
+
+        .. code-block:: python
+
+            def create_base_container(self) -> TaskContainer:
+                return TaskContainer(self.task_func, self.pipemerge_fn, self.thread_pool, **{'timeout':self.timeout})
+        """
         pass
 
 
     @abstractmethod
-    def extract_single_task(self, task_node:Dict[str, Any]) -> Tuple[tuple, Dict[str, Any]]:
+    def extract_single_task(self, task_node:Dict[str, Any]) -> Tuple[tuple, Dict[str, Any], bool]:
+        """This method is used to determine whether the task node is a leaf task (single task).
+        If so, it should return a tuple containing three elements:
+
+        1. The first element must be a tuple containing the positional arguments to be passed to the task. If no positional arguments are needed at all, please put ``()``;
+        2. The second element must be a dictionary containing the keyword arguments to be passed to the task. If there are no arguments, please put ``{}``;
+        3. The third element must be a Boolean value indicating whether to merge the execution result of the previous task as pipeline arguments into the user input keyword arguments.
+
+        If the task node is NOT a leaf task, this method should return ``None``.
+
+        :param task_node: A node of the task tree - ``Dict[str, Any]``.
+        :return: ``Tuple[tuple, Dict[str, Any], bool]``
+        """
         pass
 
+
     @abstractmethod
     def extract_serial_group(self, task_node:Dict[str, Any]) -> List[Dict[str, Any]]:
+        """This method is used to determine whether the task node is a serial task group.
+
+        If so, a list of child nodes ``List[Dict[str, Any]]`` should be returned for recursive extraction;
+
+        Otherwise, ``None`` should be returned.
+
+        :param task_node: A node of the task tree - ``Dict[str, Any]``.
+        :return: ``List[Dict[str, Any]]``
+        """
         pass
 
+
     @abstractmethod
     def extract_parallel_group(self, task_node:Dict[str, Any]) -> List[Dict[str, Any]]:
+        """This method is used to determine whether the task node is a parallel task group.
+
+        If so, a list of child nodes ``List[Dict[str, Any]]`` should be returned for recursive extraction;
+
+        Otherwise, ``None`` should be returned.
+
+        :param task_node: A node of the task tree - ``Dict[str, Any]``.
+        :return: ``List[Dict[str, Any]]``
+        """
         pass
 
 
-    def create_single_task(self, *args, **kwargs) -> TaskContainer:
+    def create_single_task(self, with_pipe:bool=False, *args, **kwargs) -> TaskContainer:
+        """This method creates an instance of ``TaskContainer`` for a single (leaf) task.
+
+        :param with_pipe: A Boolean value indicating whether the task accepts pipeline arguments from the result of the previous task.
+
+            *When the pipeline arguments and the user input arguments are both dictionaries, the two parts can be merged together.*
+
+        :param args: Positional arguments to be passed to the task.
+        :param kwargs: Keyword arguments to be passed to the task.
+        :return: An instance of ``TaskContainer``
+        """
         task = self.create_base_container()
         task.pos_args = args
         task.kw_args = kwargs
         task.parallel = False
         task.thread_pool = None
 
+        if not with_pipe:
+            task.merge_fn = None
+
         return task
 
 
     def create_group_task(self, task_group:List[TaskContainer], parallel:bool=False) -> TaskContainer:
+        """This method creates an instance of ``TaskContainer`` for a task group.
+
+        :param task_group: A list of subtasks, each presented as a ``TaskContainer``.
+
+        :param parallel: A Boolean value indicating whether its first-level subtasks should be executed in parallel (True) or serially (False).
+        :return: An instance of ``TaskContainer``
+        """
         if not task_group:
             raise ValueError("the task_group cannot be empty")
 
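
A concrete loader built on the abstract methods above might look roughly like the sketch below. The payload convention (``"serial"``, ``"parallel"``, ``"args"``, ``"kwargs"``, ``"pipe"`` keys), the ``run_module`` callable and the ``merge`` function are all invented for illustration; only the method names and the shapes of their return values come from this commit.

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor
    from typing import Any, Dict, List, Optional, Tuple

    from task_grouping import ITaskLoader, TaskContainer  # assumed import path

    def run_module(*args, **kwargs) -> Dict[str, Any]:
        # Hypothetical task function; a real loader would dispatch to real work.
        return {"args": args, "kwargs": kwargs}

    def merge(user: Dict[str, Any], pipe: Dict[str, Any]) -> Dict[str, Any]:
        # merge_fn contract: user kw_args take precedence over piped-in values.
        merged = dict(pipe)
        merged.update(user)
        return merged

    class DictTaskLoader(ITaskLoader):
        """Loads a task tree using an invented payload convention."""

        def __init__(self, thread_pool: ThreadPoolExecutor):
            self.thread_pool = thread_pool

        def create_base_container(self) -> TaskContainer:
            return TaskContainer(run_module, merge, self.thread_pool)

        def extract_single_task(self, task_node: Dict[str, Any]) -> Optional[Tuple[tuple, Dict[str, Any], bool]]:
            if "serial" in task_node or "parallel" in task_node:
                return None  # not a leaf task
            return (tuple(task_node.get("args", ())),    # positional arguments
                    dict(task_node.get("kwargs", {})),   # keyword arguments
                    bool(task_node.get("pipe", False)))  # accept pipeline args?

        def extract_serial_group(self, task_node: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
            return task_node.get("serial")  # list of child nodes, or None

        def extract_parallel_group(self, task_node: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
            return task_node.get("parallel")  # list of child nodes, or None
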
@@ -172,14 +254,21 @@ def create_group_task(self, task_group:List[TaskContainer], parallel:bool=False)
 
 
     def load(self, task_tree:Dict[str, Any]) -> TaskContainer:
+        """This method is used to load a task tree from Dict/JSON into a ``TaskContainer``.
+        :param task_tree: A task tree ``Dict[str, Any]`` containing all task groups and user input arguments.
+
+            Usually, this dictionary tree comes from the client JSON payload.
+
+        :return: An instance of ``TaskContainer``
+        """
         if not isinstance(task_tree, dict):
             raise TypeError("task_tree argument must be a dictionary type: Dict[str, Any]")
 
         leaf = self.extract_single_task(task_tree)
         if leaf is not None:
-            if not isinstance(leaf, tuple) or len(leaf) != 2 or not isinstance(leaf[0], tuple) or not isinstance(leaf[1], dict):
-                raise TypeError("extract_single_task must return Tuple[tuple, Dict[str, Any]] if the current node is a leaf task, otherwise it must return None.")
-            return self.create_single_task(*leaf[0], **leaf[1])
+            if not isinstance(leaf, tuple) or len(leaf) != 3 or not isinstance(leaf[0], tuple) or not isinstance(leaf[1], dict) or not isinstance(leaf[2], bool):
+                raise TypeError("extract_single_task must return Tuple[tuple, Dict[str, Any], bool] if the current node is a leaf task, otherwise it must return None.")
+            return self.create_single_task(leaf[2], *leaf[0], **leaf[1])
 
         serial = self.extract_serial_group(task_tree)
         if serial is not None:
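
Continuing the hypothetical ``DictTaskLoader`` sketch, loading and running a payload could then look like this. The exact shape of the assembled results is not shown in this diff, so it is left unasserted here.

.. code-block:: python

    # Sketch only: loading and running an invented payload.
    payload = {
        "serial": [
            {"kwargs": {"x": 1}},                # leaf task: run_module(x=1)
            {"kwargs": {"y": 2}, "pipe": True},  # merges the previous dict result as pipe args
            {"parallel": [                       # nested parallel group
                {"kwargs": {"z": 3}},
                {"kwargs": {"z": 4}},
            ]},
        ]
    }

    loader = DictTaskLoader(ThreadPoolExecutor(max_workers=4))
    container = loader.load(payload)  # builds the TaskContainer tree
    results = container.run()         # runs groups serially/in parallel as structured
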
