Skip to content

Commit 06858d4

Browse files
committed
Adding task_grouping.py ..
1 parent a6c8f38 commit 06858d4

File tree

2 files changed

+164
-0
lines changed

2 files changed

+164
-0
lines changed

Utilities.PyPI/Utilities.PyPI.pyproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
<Compile Include="setup_simple_rest_call.py" />
4545
<Compile Include="setup_dbdatareader.py" />
4646
<Compile Include="simple_rest_call.py" />
47+
<Compile Include="task_grouping.py">
48+
<SubType>Code</SubType>
49+
</Compile>
4750
</ItemGroup>
4851
<Import Project="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Python Tools\Microsoft.PythonTools.targets" />
4952
<!-- Uncomment the CoreCompile target to enable the Build command in

Utilities.PyPI/task_grouping.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
task_grouping - Task Grouping
4+
5+
----
6+
7+
8+
9+
|
10+
11+
| Homepage and documentation: https://github.com/DataBooster/PyWebApi
12+
| Copyright (c) 2020 Abel Cheng
13+
| License: MIT
14+
"""
15+
16+
from abc import ABCMeta, abstractmethod
17+
from typing import Union, List, Dict, Tuple, Callable, Any
18+
from concurrent.futures import ThreadPoolExecutor, as_completed
19+
20+
21+
class TaskContainer(object):
22+
23+
def __init__(self, func:Callable, merge_fn:Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]], thread_pool:ThreadPoolExecutor, **kwargs):
24+
self.task_group : List[TaskContainer] = kwargs.get('task_group', None)
25+
self.parallel : bool = kwargs.get('parallel', False)
26+
self.thread_pool = thread_pool
27+
self.timeout : float = kwargs.get('timeout', None)
28+
29+
self.func = func
30+
self.pos_args : Tuple = kwargs.get('pos_args', ())
31+
self.kw_args : Dict = kwargs.get('kw_args', {})
32+
33+
self.merge_fn = merge_fn
34+
35+
self.check_group()
36+
37+
38+
def check_group(self):
39+
if self.task_group:
40+
for t in self.task_group:
41+
self._check_task_type(t)
42+
t.check_group()
43+
44+
45+
def run(self, pipeargs:dict={}):
46+
if self.task_group is None:
47+
return self._single_run(pipeargs)
48+
else:
49+
if self.parallel and self.thread_pool and self.task_group and len(self.task_group) > 1:
50+
return self._parallel_run(pipeargs)
51+
else:
52+
return self._serial_run(pipeargs)
53+
54+
55+
def _pipe_in(self, pipeargs:Union[Dict, List[Dict]]={}) -> Dict:
56+
if pipeargs:
57+
if isinstance(pipeargs, dict):
58+
return pipeargs
59+
60+
if isinstance(pipeargs, iterable):
61+
pipe_args = {}
62+
for p in pipeargs:
63+
d = self._pipe_in(p)
64+
if d:
65+
pipe_args.update(d)
66+
return pipe_args
67+
68+
return {}
69+
70+
71+
def _single_run(self, pipeargs:dict={}):
72+
if self.func:
73+
if self.pos_args is None:
74+
self.pos_args = ()
75+
76+
if self.kw_args is None:
77+
self.kw_args = {}
78+
79+
pipe_args = self._pipe_in(pipeargs)
80+
if pipe_args and self.merge_fn:
81+
kw_args = self.merge_fn(self.kw_args, pipe_args)
82+
else:
83+
kw_args = self.kw_args
84+
85+
return self.func(*self.pos_args, **kw_args)
86+
else:
87+
return None
88+
89+
90+
def _check_task_type(self, task):
91+
if not isinstance(task, TaskContainer):
92+
raise TypeError("each task context must be an instance of the TaskContainer class")
93+
94+
95+
def _serial_run(self, pipeargs:dict={}):
96+
serial_results = []
97+
98+
for task in self.task_group:
99+
self._check_task_type(task)
100+
pipeargs = result = task.run(pipeargs)
101+
serial_results.append(result)
102+
103+
return serial_results
104+
105+
106+
def _parallel_run(self, pipeargs:dict={}):
107+
task_list = []
108+
109+
for task in self.task_group:
110+
self._check_task_type(task)
111+
task_list.append(self.thread_pool.submit(task.run, pipeargs))
112+
113+
parallel_results = []
114+
for future in as_completed(task_list, self.timeout):
115+
parallel_results.append(future.result())
116+
117+
return parallel_results
118+
119+
120+
121+
class ITaskLoader(metaclass=ABCMeta):
122+
123+
@abstractmethod
124+
def create_base_container(self) -> TaskContainer:
125+
pass
126+
127+
128+
def create_single_task(self, *args, **kwargs) -> TaskContainer:
129+
task = create_base_container()
130+
task.pos_args = args
131+
task.kw_args = kwargs
132+
task.parallel = False
133+
task.thread_pool = None
134+
135+
return task
136+
137+
138+
def _create_group_task(self, task_group:List[TaskContainer]) -> TaskContainer:
139+
if not task_group:
140+
raise ValueError("the task_group cannot be empty")
141+
142+
task = create_base_container()
143+
task.task_group = task_group
144+
task.func = None
145+
146+
task.check_group()
147+
return task
148+
149+
150+
def create_serial_group(self, task_group:List[TaskContainer]) -> TaskContainer:
151+
task = self._create_group_task(task_group)
152+
task.parallel = False
153+
return task
154+
155+
156+
def create_parallel_group(self, task_group:List[TaskContainer], timeout:float=None) -> TaskContainer:
157+
task = self._create_group_task(task_group)
158+
task.parallel = True if len(task_group) > 1 else False
159+
task.timeout = timeout
160+
return task
161+

0 commit comments

Comments
 (0)