-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_runtime.py
More file actions
121 lines (94 loc) · 4.11 KB
/
test_runtime.py
File metadata and controls
121 lines (94 loc) · 4.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import time
from argparse import Namespace
from collections.abc import Generator
from pathlib import Path
from random import randint
from threading import Thread
import pytest
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig
from cognite.extractorutils.unstable.core.base import ConfigRevision
from cognite.extractorutils.unstable.core.runtime import Runtime
from test_unstable.conftest import TestConfig, TestExtractor
@pytest.fixture
def local_config_file() -> Generator[Path, None, None]:
    """
    Create a randomly named YAML config file, yield its path to the test, and remove the file afterwards
    """
    # The file is placed three directory levels above this test module
    base_dir = Path(__file__).parent.parent.parent
    config_path = base_dir / f"test-{randint(0, 1000000)}.yaml"
    config_path.write_text("parameter_one: 123\nparameter_two: abc\n")

    yield config_path

    # Teardown: missing_ok so a test that already removed the file does not fail here
    config_path.unlink(missing_ok=True)
def test_load_local_config(connection_config: ConnectionConfig, local_config_file: Path) -> None:
    """
    Test that a config file passed as a local override is loaded, and that it's reported with the "local" revision
    """
    runtime = Runtime(TestExtractor)
    client_name = f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}"
    runtime._cognite_client = connection_config.get_cognite_client(client_name)

    # Point the runtime at the file written by the fixture
    cli_args = Namespace(local_override=[local_config_file])

    config: TestConfig
    config, revision = runtime._try_get_application_config(args=cli_args, connection_config=connection_config)

    assert revision == "local"

    # Values must match the content the fixture wrote to the file
    assert config.parameter_one == 123
    assert config.parameter_two == "abc"
def test_load_cdf_config(connection_config: ConnectionConfig) -> None:
    """
    Test that a config posted to the integration's config endpoint is loaded when no local override is given
    """
    client = connection_config.get_cognite_client(f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}")

    # Upload a config for the integration under test before asking the runtime to load it
    config_url = f"/api/v1/projects/{client.config.project}/odin/config"
    payload = {
        "externalId": connection_config.integration,
        "config": "parameter-one: 123\nparameter-two: abc\n",
    }
    client.post(url=config_url, json=payload, headers={"cdf-version": "alpha"})

    runtime = Runtime(TestExtractor)
    runtime._cognite_client = client

    # No local override, so the config has to come from the remote endpoint
    cli_args = Namespace(local_override=None)

    config: TestConfig
    config, revision = runtime._try_get_application_config(args=cli_args, connection_config=connection_config)

    # NOTE(review): expecting revision 1 presumably relies on the integration having no earlier config revisions
    assert revision == 1
    assert config.parameter_one == 123
    assert config.parameter_two == "abc"
def test_load_cdf_config_initial_empty(connection_config: ConnectionConfig) -> None:
    """
    Test that the runtime can handle an initial empty config, and that it's picked up when it's set
    """
    cognite_client = connection_config.get_cognite_client(f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}")

    runtime = Runtime(TestExtractor)
    runtime._cognite_client = cognite_client
    # Use a short retry interval so the config is picked up soon after it has been posted
    runtime.RETRY_CONFIG_INTERVAL = 1

    def set_config_after_delay() -> None:
        # Post the config a few seconds after the runtime has started looking for one
        time.sleep(3)
        cognite_client.post(
            url=f"/api/v1/projects/{cognite_client.config.project}/odin/config",
            json={
                "externalId": connection_config.integration,
                "config": "parameter-one: 123\nparameter-two: abc\n",
            },
            headers={"cdf-version": "alpha"},
        )

    def cancel_after_delay() -> None:
        # Fallback: cancel the runtime's token well after the expected pickup time.
        # NOTE(review): presumably this makes the call below return if the config is never found - confirm
        time.sleep(10)
        runtime._cancellation_token.cancel()

    # Daemon threads, so a helper that is still sleeping cannot keep the interpreter alive after the test
    Thread(target=set_config_after_delay, daemon=True).start()
    Thread(target=cancel_after_delay, daemon=True).start()

    # Measure elapsed time on the monotonic clock: unlike time.time(), it cannot jump when the system clock is
    # adjusted, which would otherwise make the duration assertion below unreliable
    start_time = time.monotonic()
    result: tuple[TestConfig, ConfigRevision] | None = runtime._safe_get_application_config(
        args=Namespace(local_override=None),
        connection_config=connection_config,
    )
    duration = time.monotonic() - start_time

    assert result is not None

    # Duration should not be much higher than sleep before set (3) + retry interval (1)
    assert duration < 5

    config, revision = result
    assert revision == 1
    assert config.parameter_one == 123
    assert config.parameter_two == "abc"

    # There should be one error reported from initially attempting to run without a config
    errors = cognite_client.get(
        url=f"/api/v1/projects/{cognite_client.config.project}/odin/errors",
        params={"integration": connection_config.integration},
    ).json()

    assert len(errors["items"]) == 1
    assert "No configuration found for the given integration" in errors["items"][0]["description"]