-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy path: test_base.py
More file actions
159 lines (133 loc) · 4.84 KB
/
test_base.py
File metadata and controls
159 lines (133 loc) · 4.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from time import sleep
import pytest
from cognite.extractorutils.unstable.configuration.models import (
ConnectionConfig,
IntervalConfig,
LogConsoleHandlerConfig,
LogLevel,
TimeIntervalConfig,
)
from cognite.extractorutils.unstable.core.base import FullConfig
from cognite.extractorutils.unstable.core.tasks import ScheduledTask, TaskContext
from cognite.extractorutils.util import now
from .conftest import MockFunction, TestConfig, TestExtractor
@pytest.mark.parametrize("checkin_between", [True, False])
def test_simple_task_report(
    connection_config: ConnectionConfig,
    application_config: TestConfig,
    checkin_between: bool,
) -> None:
    """
    Verify task run reporting for a manually triggered scheduled task.

    Checks that starting and finishing the task produces 'started'/'ended'
    updates with sensible timestamps, that a check-in drains the local update
    queue, and that the completed run shows up in the integration's task
    history endpoint afterwards.
    """
    # Task body that blocks for 5 seconds when invoked
    slow_call = MockFunction(5)

    # Minimal extractor under test
    extractor = TestExtractor(
        FullConfig(
            connection_config=connection_config,
            application_config=application_config,
            current_config_revision=1,
        )
    )
    extractor.add_task(
        ScheduledTask(
            name="TestTask",
            target=lambda _t: slow_call(),
            schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("15m")),
        )
    )

    # Partial startup routine: only report extractor info, no check-in yet
    t_start = now()
    extractor._report_extractor_info()
    assert extractor._task_updates == []

    # Fire the task by hand and give it a moment to actually begin
    extractor._scheduler.trigger("TestTask")
    sleep(1)

    # The task start must have been recorded exactly once
    assert len(extractor._task_updates) == 1
    started = extractor._task_updates[0]
    assert started.type == "started"
    assert started.name == "TestTask"
    assert t_start <= started.timestamp < now()

    if checkin_between:
        assert len(extractor._task_updates) == 1
        extractor._checkin()
        # A successful check-in should leave the update queue empty
        assert len(extractor._task_updates) == 0

    halfway = now()
    sleep(5)

    # Either just the 'ended' update (queue was flushed), or both updates
    expected_pending = 1 if checkin_between else 2
    assert len(extractor._task_updates) == expected_pending
    t_end = now()

    # The task end must be the most recent recorded update
    ended = extractor._task_updates[-1]
    assert ended.type == "ended"
    assert ended.name == "TestTask"
    assert halfway < ended.timestamp < t_end

    # Flush whatever is left and confirm the queue drains
    extractor._checkin()
    assert extractor._task_updates == []

    # The finished run should now appear in the task history for this task
    client = extractor.cognite_client
    res = client.get(
        f"/api/v1/projects/{client.config.project}/integrations/history?integration={connection_config.integration.external_id}&taskName=TestTask",
        headers={"cdf-version": "alpha"},
    ).json()

    history = res["items"]
    assert len(history) == 1
    run = history[0]
    assert run["taskName"] == "TestTask"
    assert run["errorCount"] == 0
    assert t_start <= run["startTime"] < halfway
    assert halfway < run["endTime"] < t_end
@pytest.mark.parametrize(
    "config_level, override_level, expected_logs, unexpected_logs",
    [
        (
            "INFO",
            None,
            ["This is an info message.", "This is a warning message."],
            ["This is a debug message."],
        ),
        (
            "INFO",
            "DEBUG",
            ["This is a debug message.", "This is an info message.", "This is a warning message."],
            [],
        ),
        (
            "INFO",
            "WARNING",
            ["This is a warning message."],
            ["This is a debug message.", "This is an info message."],
        ),
    ],
)
def test_log_level_override(
    capsys: pytest.CaptureFixture[str],
    connection_config: ConnectionConfig,
    config_level: str,
    override_level: str | None,
    expected_logs: list[str],
    unexpected_logs: list[str],
) -> None:
    """
    Verify that a log level passed via the override parameter takes
    precedence over the console handler level in the application config.
    """
    # Application config whose console handler is pinned at `config_level`
    console_handler = LogConsoleHandlerConfig(type="console", level=LogLevel(config_level))
    app_config = TestConfig(
        parameter_one=1,
        parameter_two="a",
        log_handlers=[console_handler],
    )

    extractor = TestExtractor(
        FullConfig(
            connection_config=connection_config,
            application_config=app_config,
            current_config_revision=1,
            log_level_override=override_level,
        )
    )

    with extractor:
        # Run the extractor's "log_task" directly, so it emits one message
        # per log level while the configured handlers are active.
        log_task = next(task for task in extractor._tasks if task.name == "log_task")
        log_task.target(TaskContext(task=log_task, extractor=extractor))

    # Console handler writes to stderr; only messages at or above the
    # effective level should have made it through.
    stderr_output = capsys.readouterr().err
    for message in expected_logs:
        assert message in stderr_output
    for message in unexpected_logs:
        assert message not in stderr_output