Skip to content

Commit 6150a05

Browse files
committed
add cron job
1 parent a58906f commit 6150a05

File tree

7 files changed

+493
-26
lines changed

7 files changed

+493
-26
lines changed

executor/engine/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .core import Engine, EngineSetting
22
from .job import LocalJob, ThreadJob, ProcessJob
33

4-
__version__ = '0.2.8'
4+
__version__ = '0.2.9'
55

66
__all__ = [
77
'Engine', 'EngineSetting',

executor/engine/core.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ class EngineSetting:
3737
if not set, will create a cache directory in
3838
.executor/{engine.id}.
3939
print_traceback: Whether to print traceback when job failed.
40+
kwargs_inject_key: Key to inject engine to job kwargs.
41+
If set, the engine will be injected to job kwargs
42+
with the key.
43+
default is "__engine__".
4044
"""
4145
max_thread_jobs: T.Optional[int] = None
4246
max_process_jobs: T.Optional[int] = None
@@ -45,6 +49,7 @@ class EngineSetting:
4549
cache_type: T.Literal["diskcache", "none"] = "none"
4650
cache_path: T.Optional[str] = None
4751
print_traceback: bool = True
52+
kwargs_inject_key: str = "__engine__"
4853

4954

5055
@dataclass
@@ -193,6 +198,9 @@ async def submit_async(self, *jobs: Job):
193198
if job.status == "created":
194199
job.engine = self
195200
job._status = "pending"
201+
func_var_names = job.func.__code__.co_varnames
202+
if self.setting.kwargs_inject_key in func_var_names:
203+
job.kwargs[self.setting.kwargs_inject_key] = self
196204
self.jobs.add(job)
197205
else:
198206
job.status = "pending"

executor/engine/job/condition.py

Lines changed: 223 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import typing as T
2-
from datetime import datetime
2+
from datetime import datetime, timedelta
33
from dataclasses import dataclass
44

55
from .utils import JobStatusType
@@ -16,6 +16,12 @@ def satisfy(self, engine: "Engine") -> bool: # pragma: no cover
1616
"""Check if the condition is satisfied."""
1717
return True
1818

19+
def __or__(self, other: "Condition") -> "AnySatisfied":
20+
return AnySatisfied(conditions=[self, other])
21+
22+
def __and__(self, other: "Condition") -> "AllSatisfied":
23+
return AllSatisfied(conditions=[self, other])
24+
1925

2026
@dataclass
2127
class AfterAnother(Condition):
@@ -30,8 +36,7 @@ class AfterAnother(Condition):
3036
job_id: str
3137
statuses: T.Iterable[JobStatusType] = ("done", "failed", "cancelled")
3238

33-
def satisfy(self, engine):
34-
"""Check if the condition is satisfied."""
39+
def satisfy(self, engine) -> bool:
3540
try:
3641
another = engine.jobs.get_job_by_id(self.job_id)
3742
except Exception:
@@ -59,8 +64,7 @@ class AfterOthers(Condition):
5964
statuses: T.Iterable[JobStatusType] = ("done", "failed", "cancelled")
6065
mode: T.Literal['all', 'any'] = "all"
6166

62-
def satisfy(self, engine):
63-
"""Check if the condition is satisfied."""
67+
def satisfy(self, engine) -> bool:
6468
other_job_satisfy = []
6569
for id_ in self.job_ids:
6670
try:
@@ -76,24 +80,6 @@ def satisfy(self, engine):
7680
return any(other_job_satisfy)
7781

7882

79-
@dataclass
80-
class AfterTimepoint(Condition):
81-
"""Condition that the job is executed after a timepoint.
82-
83-
Attributes:
84-
timepoint: The timepoint.
85-
"""
86-
87-
timepoint: datetime
88-
89-
def satisfy(self, engine):
90-
"""Check if the condition is satisfied."""
91-
if datetime.now() > self.timepoint:
92-
return True
93-
else:
94-
return False
95-
96-
9783
@dataclass
9884
class Combination(Condition):
9985
"""Base class for combination of conditions.
@@ -134,3 +120,217 @@ class AnySatisfied(Combination):
134120
def satisfy(self, engine):
135121
"""Check if the condition is satisfied."""
136122
return any([c.satisfy(engine) for c in self.conditions])
123+
124+
125+
def _parse_clock_str(time_str: str) -> T.Tuple[int, int, int]:
126+
hour: int
127+
minute: int
128+
second: int
129+
if time_str.count(":") == 1:
130+
hour, minute = time_str.split(":")
131+
hour, minute = int(hour), int(minute)
132+
second = 0
133+
elif time_str.count(":") == 2:
134+
hour, minute, second = time_str.split(":")
135+
hour, minute, second = int(hour), int(minute), int(second)
136+
else:
137+
hour = int(time_str)
138+
minute = 0
139+
second = 0
140+
return hour, minute, second
141+
142+
143+
def _parse_weekday_str(weekday_str: str) -> int:
144+
if weekday_str.lower() in ("monday", "mon"):
145+
return 0
146+
elif weekday_str.lower() in ("tuesday", "tue"):
147+
return 1
148+
elif weekday_str.lower() in ("wednesday", "wed"):
149+
return 2
150+
elif weekday_str.lower() in ("thursday", "thu"):
151+
return 3
152+
elif weekday_str.lower() in ("friday", "fri"):
153+
return 4
154+
elif weekday_str.lower() in ("saturday", "sat"):
155+
return 5
156+
elif weekday_str.lower() in ("sunday", "sun"):
157+
return 6
158+
raise ValueError(f"Invalid weekday string: {weekday_str}")
159+
160+
161+
def _parse_period_str(period_str: str) -> timedelta:
162+
if period_str.endswith("d"):
163+
return timedelta(days=float(period_str[:-1]))
164+
elif period_str.endswith("h"):
165+
return timedelta(hours=float(period_str[:-1]))
166+
elif period_str.endswith("m"):
167+
return timedelta(minutes=float(period_str[:-1]))
168+
elif period_str.endswith("s"):
169+
return timedelta(seconds=float(period_str[:-1]))
170+
raise ValueError(f"Invalid period string: {period_str}")
171+
172+
173+
@dataclass
174+
class TimeCondition(Condition):
175+
pass
176+
177+
178+
@dataclass
179+
class EveryPeriod(TimeCondition):
180+
"""Every period.
181+
182+
Attributes:
183+
period_str: The period string.
184+
format: "1d", "1h", "1m", "1s"
185+
last_submitted_at: The last submitted time.
186+
immediate: Whether to submit the job immediately.
187+
"""
188+
period_str: str
189+
last_submitted_at: T.Optional[datetime] = None
190+
immediate: bool = True
191+
192+
def satisfy(self, _) -> bool:
193+
period = _parse_period_str(self.period_str)
194+
res: bool
195+
if self.last_submitted_at is None:
196+
self.last_submitted_at = datetime.now()
197+
res = self.immediate
198+
else:
199+
res = (datetime.now() - self.last_submitted_at >= period)
200+
if res:
201+
self.last_submitted_at = datetime.now()
202+
return res
203+
204+
205+
@dataclass
206+
class AfterClock(TimeCondition):
207+
"""After a specific clock.
208+
209+
Attributes:
210+
time_str: The time string.
211+
format: "12:00", "12:00:00"
212+
"""
213+
time_str: str
214+
215+
def satisfy(self, _) -> bool:
216+
hour, minute, second = _parse_clock_str(self.time_str)
217+
now = datetime.now()
218+
res = (
219+
(now.hour >= hour) &
220+
(now.minute >= minute) &
221+
(now.second >= second)
222+
)
223+
return res
224+
225+
226+
@dataclass
227+
class BeforeClock(TimeCondition):
228+
"""Before a specific clock.
229+
230+
Attributes:
231+
time_str: The time string.
232+
format: "12:00", "12:00:00"
233+
"""
234+
time_str: str
235+
236+
def satisfy(self, _) -> bool:
237+
hour, minute, second = _parse_clock_str(self.time_str)
238+
now = datetime.now()
239+
res = (
240+
(now.hour <= hour) &
241+
(now.minute <= minute) &
242+
(now.second <= second)
243+
)
244+
return res
245+
246+
247+
@dataclass
248+
class AfterWeekday(TimeCondition):
249+
"""After a specific weekday.
250+
251+
Attributes:
252+
weekday_str: The weekday string.
253+
format: "monday", "mon", "tuesday", "tue", "wednesday", "wed",
254+
"thursday", "thu", "friday", "fri", "saturday", "sat",
255+
"sunday", "sun"
256+
"""
257+
weekday_str: str
258+
259+
def satisfy(self, _) -> bool:
260+
weekday = _parse_weekday_str(self.weekday_str)
261+
now = datetime.now()
262+
res = (now.weekday() >= weekday)
263+
return res
264+
265+
266+
@dataclass
267+
class BeforeWeekday(TimeCondition):
268+
"""Before a specific weekday.
269+
270+
Attributes:
271+
weekday_str: The weekday string.
272+
format: "monday", "mon", "tuesday", "tue", "wednesday", "wed",
273+
"thursday", "thu", "friday", "fri", "saturday", "sat",
274+
"sunday", "sun"
275+
"""
276+
weekday_str: str
277+
278+
def satisfy(self, _) -> bool:
279+
weekday = _parse_weekday_str(self.weekday_str)
280+
now = datetime.now()
281+
res = (now.weekday() <= weekday)
282+
return res
283+
284+
285+
_valid_timepoint_fields = ("year", "month", "day", "hour", "minute", "second")
286+
287+
288+
@dataclass
289+
class AfterTimepoint(TimeCondition):
290+
"""After a timepoint.
291+
292+
Attributes:
293+
timepoint: The timepoint.
294+
compare_fields: The fields to compare.
295+
Fields: "year", "month", "day", "hour", "minute", "second"
296+
"""
297+
298+
timepoint: datetime
299+
compare_fields: T.Optional[T.List[str]] = None
300+
301+
def satisfy(self, _) -> bool:
302+
if self.compare_fields is None:
303+
return datetime.now() > self.timepoint
304+
else:
305+
for field in self.compare_fields:
306+
if field not in _valid_timepoint_fields:
307+
raise ValueError(f"Invalid field: {field}")
308+
if getattr(datetime.now(), field) >= \
309+
getattr(self.timepoint, field):
310+
return False
311+
return True
312+
313+
314+
@dataclass
315+
class BeforeTimepoint(TimeCondition):
316+
"""Before a timepoint.
317+
318+
Attributes:
319+
timepoint: The timepoint.
320+
compare_fields: The fields to compare.
321+
Fields: "year", "month", "day", "hour", "minute", "second"
322+
"""
323+
timepoint: datetime
324+
compare_fields: T.Optional[T.List[str]] = None
325+
326+
def satisfy(self, _) -> bool:
327+
if self.compare_fields is None:
328+
return datetime.now() < self.timepoint
329+
else:
330+
for field in self.compare_fields:
331+
if field not in _valid_timepoint_fields:
332+
raise ValueError(f"Invalid field: {field}")
333+
if getattr(datetime.now(), field) <= \
334+
getattr(self.timepoint, field):
335+
return False
336+
return True

executor/engine/job/extend/cron.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import typing as T
2+
from datetime import datetime
3+
4+
from .. import Job
5+
from ..condition import (
6+
TimeCondition,
7+
AllSatisfied, AfterTimepoint, BeforeTimepoint,
8+
EveryPeriod, BeforeClock, AfterClock,
9+
AfterWeekday, BeforeWeekday
10+
)
11+
from .sentinel import SentinelJob
12+
13+
14+
every = EveryPeriod
15+
daily = EveryPeriod("1d")
16+
hourly = EveryPeriod("1h")
17+
monthly = EveryPeriod("1m")
18+
19+
before_clock = BeforeClock
20+
after_clock = AfterClock
21+
22+
after_weekday = AfterWeekday
23+
before_weekday = BeforeWeekday
24+
25+
after_timepoint = AfterTimepoint
26+
before_timepoint = BeforeTimepoint
27+
28+
29+
def between_clock(start: str, end: str) -> AllSatisfied:
30+
return AllSatisfied([BeforeClock(end), AfterClock(start)])
31+
32+
33+
def between_weekday(start: str, end: str) -> AllSatisfied:
34+
return AllSatisfied([BeforeWeekday(end), AfterWeekday(start)])
35+
36+
37+
def between_timepoint(start: datetime, end: datetime) -> AllSatisfied:
38+
return AllSatisfied([BeforeTimepoint(end), AfterTimepoint(start)])
39+
40+
41+
def CronJob(
42+
func: T.Callable,
43+
time_condition: TimeCondition,
44+
job_type: T.Union[str, T.Type[Job]] = "process",
45+
time_delta: float = 0.01,
46+
sentinel_attrs: T.Optional[dict] = None,
47+
**attrs
48+
):
49+
"""Submit a job periodically.
50+
51+
Args:
52+
func: The function to be executed.
53+
time_period: The time period.
54+
job_type: The type of the job.
55+
time_delta: The time delta between each
56+
check of the sentinel condition.
57+
sentinel_attrs: The attributes of the sentinel job.
58+
**attrs: The attributes of the job.
59+
"""
60+
sentinel_attrs = sentinel_attrs or {}
61+
if "name" not in sentinel_attrs:
62+
sentinel_attrs["name"] = f"cron-sentinel-{func.__name__}"
63+
sentinel_job = SentinelJob(
64+
func,
65+
time_condition,
66+
job_type,
67+
time_delta,
68+
sentinel_attrs,
69+
**attrs
70+
)
71+
return sentinel_job

0 commit comments

Comments
 (0)