-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_scheduler.py
More file actions
134 lines (100 loc) · 3.55 KB
/
test_scheduler.py
File metadata and controls
134 lines (100 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from threading import Thread
from time import sleep, time
from pytest import approx
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import IntervalConfig, TimeIntervalConfig
from cognite.extractorutils.unstable.scheduling._scheduler import TaskScheduler
from .conftest import MockFunction
def test_interval_schedules() -> None:
ct = CancellationToken()
mock = MockFunction(sleep_time=1)
scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
scheduler.schedule_task(
name="test",
schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("3s")),
task=mock,
)
start = time()
Thread(target=scheduler.run).start()
sleep(7)
scheduler.stop()
assert len(mock.called_times) == 3
assert mock.called_times[0] == approx(start)
assert mock.called_times[1] == approx(mock.called_times[0] + 3)
assert mock.called_times[2] == approx(mock.called_times[1] + 3)
def test_overlapping_schedules() -> None:
"""
Test with a trigger that fires when the job is still running
Timeline:
time | 0 1 2 3 4 5 6 7 8 9 STOP
--------|-------------------------
trigger | x x x x x
job | |-----| |-----| |-----|
"""
ct = CancellationToken()
mock = MockFunction(sleep_time=3)
scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
scheduler.schedule_task(
name="test",
schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("2s")),
task=mock,
)
start = time()
Thread(target=scheduler.run).start()
sleep(9)
scheduler.stop()
assert len(mock.called_times) == 3
assert mock.called_times[0] == approx(start)
assert mock.called_times[1] == approx(mock.called_times[0] + 4)
assert mock.called_times[1] == approx(mock.called_times[1] + 4)
def test_manual() -> None:
ct = CancellationToken()
mock = MockFunction(sleep_time=0)
scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
scheduler.schedule_task(
name="test",
schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("1h")),
task=mock,
)
Thread(target=scheduler.run).start()
sleep(0.1)
scheduler.trigger("test")
sleep(0.1)
scheduler.trigger("test")
sleep(0.1)
scheduler.trigger("test")
sleep(1)
scheduler.stop()
assert len(mock.called_times) == 4
def test_manual_interval_mix() -> None:
"""
Test with a scheduled trigger mixed with manual trigger, make sure there's no overlap
Timeline:
time | 0 1 2 3 4 5 6 7 8 9 STOP
---------|-------------------------
schedule | x x x
manual | x x
job | |---| |---| |---|
"""
ct = CancellationToken()
mock = MockFunction(sleep_time=2)
scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
scheduler.schedule_task(
name="test",
schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("4s")),
task=mock,
)
start = time()
Thread(target=scheduler.run).start()
sleep(1)
first_trigger = scheduler.trigger("test")
sleep(2)
second_trigger = scheduler.trigger("test")
sleep(6)
scheduler.stop()
assert not first_trigger
assert second_trigger
assert len(mock.called_times) == 3
assert mock.called_times[0] == approx(start)
assert mock.called_times[1] == approx(start + 3)
assert mock.called_times[2] == approx(start + 8)