Skip to content

Commit a9d0ccf

Browse files
committed
Add scheduler to executor
Add scheduler to executor
1 parent 597216f commit a9d0ccf

File tree

4 files changed

+170
-65
lines changed

4 files changed

+170
-65
lines changed

je_load_density/utils/executor/action_executor.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import builtins
22
import sys
3-
import time
43
import types
54
from inspect import getmembers, isbuiltin
5+
from typing import Union, Any
66

77
from je_load_density.utils.exception.exception_tags import executor_data_error, add_command_exception_tag
88
from je_load_density.utils.exception.exception_tags import executor_list_error
@@ -12,6 +12,7 @@
1212
from je_load_density.utils.generate_report.generate_xml_report import generate_xml, generate_xml_report
1313
from je_load_density.utils.json.json_file.json_file import read_action_json
1414
from je_load_density.utils.package_manager.package_manager_class import package_manager
15+
from je_load_density.utils.scheduler.extend_apscheduler import scheduler_manager
1516
from je_load_density.wrapper.start_wrapper.start_test import start_test
1617

1718

@@ -26,10 +27,17 @@ def __init__(self):
2627
"generate_json_report": generate_json_report,
2728
"generate_xml": generate_xml,
2829
"generate_xml_report": generate_xml_report,
29-
# execute
30+
# Execute
3031
"execute_action": self.execute_action,
3132
"execute_files": self.execute_files,
3233
"add_package_to_executor": package_manager.add_package_to_executor,
34+
# Scheduler
35+
"scheduler_event_trigger": self.scheduler_event_trigger,
36+
"remove_blocking_scheduler_job": scheduler_manager.remove_blocking_job,
37+
"remove_nonblocking_scheduler_job": scheduler_manager.remove_nonblocking_job,
38+
"start_blocking_scheduler": scheduler_manager.start_block_scheduler,
39+
"start_nonblocking_scheduler": scheduler_manager.start_nonblocking_scheduler,
40+
"start_all_scheduler": scheduler_manager.start_all_scheduler,
3341
}
3442
# get all builtin function and add to event dict
3543
for function in getmembers(builtins, isbuiltin):
@@ -100,6 +108,16 @@ def execute_files(self, execute_files_list: list):
100108
execute_detail_list.append(self.execute_action(read_action_json(file)))
101109
return execute_detail_list
102110

111+
def scheduler_event_trigger(
112+
self, function: str, id: str = None, args: Union[list, tuple] = None,
113+
kwargs: dict = None, scheduler_type: str = "nonblocking", wait_type: str = "secondly",
114+
wait_value: int = 1, **trigger_args: Any) -> None:
115+
if scheduler_type == "nonblocking":
116+
scheduler_event = scheduler_manager.nonblocking_scheduler_event_dict.get(wait_type)
117+
else:
118+
scheduler_event = scheduler_manager.blocking_scheduler_event_dict.get(wait_type)
119+
scheduler_event(self.event_dict.get(function), id, args, kwargs, wait_value, **trigger_args)
120+
103121

104122
executor = Executor()
105123
package_manager.executor = executor
Lines changed: 141 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
from typing import Callable
1+
from datetime import datetime
2+
from typing import Callable, Any, Union
23

4+
from apscheduler.job import Job
35
from apscheduler.schedulers.background import BackgroundScheduler
46
from apscheduler.schedulers.blocking import BlockingScheduler
57
from apscheduler.util import undefined
@@ -10,119 +12,204 @@ class SchedulerManager(object):
1012
def __init__(self):
1113
self._blocking_schedulers: BlockingScheduler = BlockingScheduler()
1214
self._background_schedulers: BackgroundScheduler = BackgroundScheduler()
15+
self.blocking_scheduler_event_dict = {
16+
"secondly": self.add_interval_blocking_secondly,
17+
"minutely": self.add_interval_blocking_minutely,
18+
"hourly": self.add_interval_blocking_hourly,
19+
"daily": self.add_interval_blocking_daily,
20+
"weekly": self.add_interval_blocking_weekly,
21+
}
22+
self.nonblocking_scheduler_event_dict = {
23+
"secondly": self.add_interval_nonblocking_secondly,
24+
"minutely": self.add_interval_nonblocking_minutely,
25+
"hourly": self.add_interval_nonblocking_hourly,
26+
"daily": self.add_interval_nonblocking_daily,
27+
"weekly": self.add_interval_nonblocking_weekly,
28+
}
1329

1430
def add_blocking_job(
15-
self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
16-
misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
17-
next_run_time=undefined, jobstore='default', executor='default',
18-
replace_existing=False, **trigger_args):
31+
self, func: Callable, trigger: str = None, args: Union[list, tuple] = None,
32+
kwargs: dict = None, id: str = None, name: str = None,
33+
misfire_grace_time: int = undefined, coalesce: bool = undefined, max_instances: int = undefined,
34+
next_run_time: datetime = undefined, jobstore: str = 'default', executor: str = 'default',
35+
replace_existing: bool = False, **trigger_args: Any) -> Job:
36+
"""
37+
Just an apscheduler add job wrapper.
38+
:param func: callable (or a textual reference to one) to run at the given time
39+
:param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
40+
``func`` is called
41+
:param list|tuple args: list of positional arguments to call func with
42+
:param dict kwargs: dict of keyword arguments to call func with
43+
:param str|unicode id: explicit identifier for the job (for modifying it later)
44+
:param str|unicode name: textual description of the job
45+
:param int misfire_grace_time: seconds after the designated runtime that the job is still
46+
allowed to be run (or ``None`` to allow the job to run no matter how late it is)
47+
:param bool coalesce: run once instead of many times if the scheduler determines that the
48+
job should be run more than once in succession
49+
:param int max_instances: maximum number of concurrently running instances allowed for this
50+
job
51+
:param datetime next_run_time: when to first run the job, regardless of the trigger (pass
52+
``None`` to add the job as paused)
53+
:param str|unicode jobstore: alias of the job store to store the job in
54+
:param str|unicode executor: alias of the executor to run the job with
55+
:param bool replace_existing: ``True`` to replace an existing job with the same ``id``
56+
(but retain the number of runs from the existing one)
57+
:return: Job
58+
"""
1959
params = locals()
2060
params.pop("self")
2161
trigger_args = params.pop("trigger_args")
22-
self._blocking_schedulers.add_job(**params, **trigger_args)
62+
return self._blocking_schedulers.add_job(**params, **trigger_args)
2363

2464
def add_nonblocking_job(
25-
self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
26-
misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
27-
next_run_time=undefined, jobstore='default', executor='default',
28-
replace_existing=False, **trigger_args):
65+
self, func: Callable, trigger: str = None, args: Union[list, tuple] = None,
66+
kwargs: dict = None, id: str = None, name: str = None,
67+
misfire_grace_time: int = undefined, coalesce: bool = undefined, max_instances: int = undefined,
68+
next_run_time: datetime = undefined, jobstore: str = 'default', executor: str = 'default',
69+
replace_existing: bool = False, **trigger_args: Any) -> Job:
70+
"""
71+
Just an apscheduler add job wrapper.
72+
:param func: callable (or a textual reference to one) to run at the given time
73+
:param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
74+
``func`` is called
75+
:param list|tuple args: list of positional arguments to call func with
76+
:param dict kwargs: dict of keyword arguments to call func with
77+
:param str|unicode id: explicit identifier for the job (for modifying it later)
78+
:param str|unicode name: textual description of the job
79+
:param int misfire_grace_time: seconds after the designated runtime that the job is still
80+
allowed to be run (or ``None`` to allow the job to run no matter how late it is)
81+
:param bool coalesce: run once instead of many times if the scheduler determines that the
82+
job should be run more than once in succession
83+
:param int max_instances: maximum number of concurrently running instances allowed for this
84+
job
85+
:param datetime next_run_time: when to first run the job, regardless of the trigger (pass
86+
``None`` to add the job as paused)
87+
:param str|unicode jobstore: alias of the job store to store the job in
88+
:param str|unicode executor: alias of the executor to run the job with
89+
:param bool replace_existing: ``True`` to replace an existing job with the same ``id``
90+
(but retain the number of runs from the existing one)
91+
:return: Job
92+
"""
2993
params = locals()
3094
params.pop("self")
3195
trigger_args = params.pop("trigger_args")
32-
self._background_schedulers.add_job(**params, **trigger_args)
96+
return self._background_schedulers.add_job(**params, **trigger_args)
3397

34-
def get_blocking_scheduler(self):
98+
def get_blocking_scheduler(self) -> BlockingScheduler:
99+
"""
100+
Return self blocking scheduler
101+
:return: BlockingScheduler
102+
"""
35103
return self._blocking_schedulers
36104

37-
def get_nonblocking_scheduler(self):
105+
def get_nonblocking_scheduler(self) -> BackgroundScheduler:
106+
"""
107+
Return self background scheduler
108+
:return: BackgroundScheduler
109+
"""
38110
return self._background_schedulers
39111

40-
def start_block_scheduler(self, *args, **kwargs):
112+
def start_block_scheduler(self, *args: Any, **kwargs: Any) -> None:
113+
"""
114+
Start blocking scheduler
115+
:return: None
116+
"""
41117
self._blocking_schedulers.start(*args, **kwargs)
42118

43-
def start_nonblocking_scheduler(self, *args, **kwargs):
119+
def start_nonblocking_scheduler(self, *args: Any, **kwargs: Any) -> None:
120+
"""
121+
Start background scheduler
122+
:return: None
123+
"""
44124
self._background_schedulers.start(*args, **kwargs)
45125

46-
def start_all_scheduler(self, *args, **kwargs):
126+
def start_all_scheduler(self, *args: Any, **kwargs: Any) -> None:
127+
"""
128+
Start background and blocking scheduler
129+
:return: None
130+
"""
47131
self._blocking_schedulers.start(*args, **kwargs)
48132
self._background_schedulers.start(*args, **kwargs)
49133

50134
def add_interval_blocking_secondly(
51-
self, function: Callable, id: str = None, args: list = None,
52-
kwargs: dict = None, seconds: int = 1, **trigger_args):
53-
self.add_blocking_job(
135+
self, function: Callable, id: str = None, args: Union[list, tuple] = None,
136+
kwargs: dict = None, seconds: int = 1, **trigger_args: Any) -> Job:
137+
return self.add_blocking_job(
54138
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, seconds=seconds, **trigger_args)
55139

56140
def add_interval_blocking_minutely(
57-
self, function: Callable, id: str = None, args: list = None,
58-
kwargs: dict = None, minutes: int = 1, **trigger_args):
59-
self.add_blocking_job(
141+
self, function: Callable, id: str = None, args: Union[list, tuple] = None,
142+
kwargs: dict = None, minutes: int = 1, **trigger_args: Any) -> Job:
143+
return self.add_blocking_job(
60144
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, minutes=minutes, **trigger_args)
61145

62146
def add_interval_blocking_hourly(
63-
self, function: Callable, id: str = None, args: list = None,
64-
kwargs: dict = None, hours: int = 1, **trigger_args):
65-
self.add_blocking_job(
147+
self, function: Callable, id: str = None, args: Union[list, tuple] = None,
148+
kwargs: dict = None, hours: int = 1, **trigger_args: Any) -> Job:
149+
return self.add_blocking_job(
66150
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, hours=hours, **trigger_args)
67151

68152
def add_interval_blocking_daily(
69-
self, function: Callable, id: str = None, args: list = None,
70-
kwargs: dict = None, days: int = 1, **trigger_args):
71-
self.add_blocking_job(
153+
self, function: Callable, id: str = None, args: Union[list, tuple] = None,
154+
kwargs: dict = None, days: int = 1, **trigger_args: Any) -> Job:
155+
return self.add_blocking_job(
72156
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, days=days, **trigger_args)
73157

74158
def add_interval_blocking_weekly(
75-
self, function: Callable, id: str = None, args: list = None,
76-
kwargs: dict = None, weeks: int = 1, **trigger_args):
77-
self.add_blocking_job(
159+
self, function: Callable, id: str = None, args: Union[list, tuple] = None,
160+
kwargs: dict = None, weeks: int = 1, **trigger_args: Any) -> Job:
161+
return self.add_blocking_job(
78162
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, weeks=weeks, **trigger_args)
79163

80164
def add_interval_nonblocking_secondly(
81165
self, function: Callable, id: str = None, args: list = None,
82-
kwargs: dict = None, seconds: int = 1, **trigger_args):
83-
self.add_nonblocking_job(
166+
kwargs: dict = None, seconds: int = 1, **trigger_args: Any) -> Job:
167+
return self.add_nonblocking_job(
84168
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, seconds=seconds, **trigger_args)
85169

86170
def add_interval_nonblocking_minutely(
87171
self, function: Callable, id: str = None, args: list = None,
88-
kwargs: dict = None, minutes: int = 1, **trigger_args):
89-
self.add_nonblocking_job(
172+
kwargs: dict = None, minutes: int = 1, **trigger_args: Any) -> Job:
173+
return self.add_nonblocking_job(
90174
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, minutes=minutes, **trigger_args)
91175

92176
def add_interval_nonblocking_hourly(
93-
self, function: Callable, id: str = None, args: list = None,
94-
kwargs: dict = None, hours: int = 1, **trigger_args):
95-
self.add_nonblocking_job(
177+
self, function: Callable, id: str = None, args: Union[list, tuple] = None,
178+
kwargs: dict = None, hours: int = 1, **trigger_args: Any) -> Job:
179+
return self.add_nonblocking_job(
96180
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, hours=hours, **trigger_args)
97181

98182
def add_interval_nonblocking_daily(
99-
self, function: Callable, id: str = None, args: list = None,
100-
kwargs: dict = None, days: int = 1, **trigger_args):
101-
self.add_nonblocking_job(
183+
self, function: Callable, id: str = None, args: Union[list, tuple] = None,
184+
kwargs: dict = None, days: int = 1, **trigger_args: Any) -> Job:
185+
return self.add_nonblocking_job(
102186
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, days=days, **trigger_args)
103187

104188
def add_interval_nonblocking_weekly(
105-
self, function: Callable, id: str = None, args: list = None,
106-
kwargs: dict = None, weeks: int = 1, **trigger_args):
107-
self.add_nonblocking_job(
189+
self, function: Callable, id: str = None, args: Union[list, tuple] = None,
190+
kwargs: dict = None, weeks: int = 1, **trigger_args: Any) -> Job:
191+
return self.add_nonblocking_job(
108192
func=function, trigger="interval", id=id, args=args, kwargs=kwargs, weeks=weeks, **trigger_args)
109193

110194
def add_cron_blocking(
111-
self, function: Callable, id: str = None, **trigger_args):
112-
self.add_blocking_job(func=function, id=id, trigger="cron", **trigger_args)
195+
self, function: Callable, id: str = None, **trigger_args: Any) -> Job:
196+
return self.add_blocking_job(func=function, id=id, trigger="cron", **trigger_args)
113197

114198
def add_cron_nonblocking(
115-
self, function: Callable, id: str = None, **trigger_args):
116-
self.add_nonblocking_job(func=function, id=id, trigger="cron", **trigger_args)
199+
self, function: Callable, id: str = None, **trigger_args: Any) -> Job:
200+
return self.add_nonblocking_job(func=function, id=id, trigger="cron", **trigger_args)
117201

118-
def remove_blocking_job(self, id: str, jobstore: str = 'default'):
119-
self._blocking_schedulers.remove_job(job_id=id, jobstore=jobstore)
202+
def remove_blocking_job(self, id: str, jobstore: str = 'default') -> Any:
203+
return self._blocking_schedulers.remove_job(job_id=id, jobstore=jobstore)
120204

121-
def remove_nonblocking_job(self, id: str, jobstore: str = 'default'):
122-
self._background_schedulers.remove_job(job_id=id, jobstore=jobstore)
205+
def remove_nonblocking_job(self, id: str, jobstore: str = 'default') -> Any:
206+
return self._background_schedulers.remove_job(job_id=id, jobstore=jobstore)
123207

124-
def shutdown_blocking_scheduler(self, wait: bool = False):
208+
def shutdown_blocking_scheduler(self, wait: bool = False) -> None:
125209
self._blocking_schedulers.shutdown(wait=wait)
126210

127-
def shutdown_nonblocking_scheduler(self, wait: bool = False):
211+
def shutdown_nonblocking_scheduler(self, wait: bool = False) -> None:
128212
self._background_schedulers.shutdown(wait=wait)
213+
214+
215+
scheduler_manager = SchedulerManager()

pyproject.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
# Rename to stable version
2-
# This is stable version
1+
# Rename to dev version
2+
# This is dev version
33
[build-system]
44
requires = ["setuptools>=61.0"]
55
build-backend = "setuptools.build_meta"
66

77
[project]
8-
name = "je_load_density"
9-
version = "0.0.48"
8+
name = "je_load_density_dev"
9+
version = "0.0.58"
1010
authors = [
1111
{ name = "JE-Chen", email = "[email protected]" },
1212
]
@@ -40,4 +40,4 @@ content-type = "text/markdown"
4040
license-files = ["LICENSE"]
4141

4242
[tool.setuptools.packages]
43-
find = { namespaces = false }
43+
find = { namespaces = false }

dev.toml renamed to stable.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
# Rename to dev version
2-
# This is dev version
1+
# Rename to stable version
2+
# This is stable version
33
[build-system]
44
requires = ["setuptools>=61.0"]
55
build-backend = "setuptools.build_meta"
66

77
[project]
8-
name = "je_load_density_dev"
9-
version = "0.0.57"
8+
name = "je_load_density"
9+
version = "0.0.48"
1010
authors = [
1111
{ name = "JE-Chen", email = "[email protected]" },
1212
]

0 commit comments

Comments
 (0)