Skip to content

Commit 0f2aca9

Browse files
authored
Improve fetching of application config in unstable (#434)
If a config doesn't exist in the Integrations API, don't fail; instead, enter a retry loop that runs until canceled.
1 parent e46868f commit 0f2aca9

File tree

4 files changed

+214
-36
lines changed

4 files changed

+214
-36
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ Changes are grouped as follows
1212
- `Fixed` for any bug fixes.
1313
- `Security` in case of vulnerabilities.
1414

15+
## Next
16+
17+
### Added
18+
19+
* In the `unstable` package: Retry (until cancelled) when fetching the application config from CDF fails, instead of shutting down
20+
1521
## 7.5.13
1622

1723
### Security

cognite/extractorutils/unstable/configuration/loaders.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from pydantic import ValidationError
88

99
from cognite.client import CogniteClient
10+
from cognite.client.exceptions import CogniteAPIError
1011
from cognite.extractorutils.configtools.loaders import _load_yaml_dict_raw
1112
from cognite.extractorutils.exceptions import InvalidConfigError as OldInvalidConfigError
1213
from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
@@ -41,12 +42,17 @@ def load_from_cdf(
4142
params: dict[str, str | int] = {"integration": external_id}
4243
if revision:
4344
params["revision"] = revision
44-
response = cognite_client.get(
45-
f"/api/v1/projects/{cognite_client.config.project}/odin/config",
46-
params=params,
47-
headers={"cdf-version": "alpha"},
48-
)
49-
response.raise_for_status()
45+
try:
46+
response = cognite_client.get(
47+
f"/api/v1/projects/{cognite_client.config.project}/odin/config",
48+
params=params,
49+
headers={"cdf-version": "alpha"},
50+
)
51+
except CogniteAPIError as e:
52+
if e.code == 404:
53+
raise InvalidConfigError("No configuration found for the given integration") from e
54+
raise e
55+
5056
data = response.json()
5157

5258
try:

cognite/extractorutils/unstable/core/runtime.py

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,29 @@
55
from argparse import ArgumentParser, Namespace
66
from multiprocessing import Process, Queue
77
from pathlib import Path
8+
from random import randint
89
from typing import Any, Generic, TypeVar
10+
from uuid import uuid4
911

1012
from requests.exceptions import ConnectionError
1113
from typing_extensions import assert_never
1214

13-
from cognite.client.exceptions import CogniteAPIError, CogniteAuthError, CogniteConnectionError
15+
from cognite.client import CogniteClient
16+
from cognite.client.exceptions import (
17+
CogniteAPIError,
18+
CogniteAuthError,
19+
CogniteConnectionError,
20+
)
1421
from cognite.extractorutils.threading import CancellationToken
1522
from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
16-
from cognite.extractorutils.unstable.configuration.loaders import load_file, load_from_cdf
23+
from cognite.extractorutils.unstable.configuration.loaders import (
24+
load_file,
25+
load_from_cdf,
26+
)
1727
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig
1828
from cognite.extractorutils.unstable.core._dto import Error
29+
from cognite.extractorutils.unstable.core.errors import ErrorLevel
30+
from cognite.extractorutils.util import now
1931

2032
from ._messaging import RuntimeMessage
2133
from .base import ConfigRevision, ConfigType, Extractor, FullConfig
@@ -26,6 +38,8 @@
2638

2739

2840
class Runtime(Generic[ExtractorType]):
41+
RETRY_CONFIG_INTERVAL = 30
42+
2943
def __init__(
3044
self,
3145
extractor: type[ExtractorType],
@@ -37,6 +51,8 @@ def __init__(
3751
self.logger = logging.getLogger(f"{self._extractor_class.EXTERNAL_ID}.runtime")
3852
self._setup_logging()
3953

54+
self._cognite_client: CogniteClient
55+
4056
def _create_argparser(self) -> ArgumentParser:
4157
argparser = ArgumentParser(
4258
prog=sys.argv[0],
@@ -121,7 +137,7 @@ def _spawn_extractor(
121137
self.logger.info(f"Started extractor with PID {process.pid}")
122138
return process
123139

124-
def _get_application_config(
140+
def _try_get_application_config(
125141
self,
126142
args: Namespace,
127143
connection_config: ConnectionConfig,
@@ -143,39 +159,65 @@ def _get_application_config(
143159

144160
else:
145161
self.logger.info("Loading application config from CDF")
146-
client = connection_config.get_cognite_client(
147-
f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}"
162+
163+
application_config, current_config_revision = load_from_cdf(
164+
self._cognite_client,
165+
connection_config.integration,
166+
self._extractor_class.CONFIG_TYPE,
148167
)
149168

150-
errors: list[Error] = []
169+
return application_config, current_config_revision
170+
171+
def _safe_get_application_config(
172+
self,
173+
args: Namespace,
174+
connection_config: ConnectionConfig,
175+
) -> tuple[ConfigType, ConfigRevision] | None:
176+
prev_error: str | None = None
151177

178+
while not self._cancellation_token.is_cancelled:
152179
try:
153-
application_config, current_config_revision = load_from_cdf(
154-
client,
155-
connection_config.integration,
156-
self._extractor_class.CONFIG_TYPE,
180+
return self._try_get_application_config(args, connection_config)
181+
182+
except Exception as e:
183+
error_message = str(e)
184+
if error_message == prev_error:
185+
# Same error as before, no need to log it again
186+
self._cancellation_token.wait(randint(1, self.RETRY_CONFIG_INTERVAL))
187+
continue
188+
prev_error = error_message
189+
190+
ts = now()
191+
error = Error(
192+
external_id=str(uuid4()),
193+
level=ErrorLevel.fatal.value,
194+
start_time=ts,
195+
end_time=ts,
196+
description=error_message,
197+
details=None,
198+
task=None,
157199
)
158200

159-
finally:
160-
if errors:
161-
client.post(
162-
f"/api/v1/projects/{client.config.project}/odin/checkin",
163-
json={
164-
"externalId": connection_config.integration,
165-
"errors": [e.model_dump() for e in errors],
166-
},
167-
headers={"cdf-version": "alpha"},
168-
)
201+
self._cognite_client.post(
202+
f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin",
203+
json={
204+
"externalId": connection_config.integration,
205+
"errors": [error.model_dump()],
206+
},
207+
headers={"cdf-version": "alpha"},
208+
)
169209

170-
return application_config, current_config_revision
210+
self._cancellation_token.wait(randint(1, self.RETRY_CONFIG_INTERVAL))
211+
212+
return None
171213

172214
def _verify_connection_config(self, connection_config: ConnectionConfig) -> bool:
173-
client = connection_config.get_cognite_client(
215+
self._cognite_client = connection_config.get_cognite_client(
174216
f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}"
175217
)
176218
try:
177-
client.post(
178-
f"/api/v1/projects/{client.config.project}/odin/checkin",
219+
self._cognite_client.post(
220+
f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin",
179221
json={
180222
"externalId": connection_config.integration,
181223
},
@@ -234,16 +276,19 @@ def run(self) -> None:
234276
if not args.skip_init_checks and not self._verify_connection_config(connection_config):
235277
sys.exit(1)
236278

237-
# This has to be Any. We don't know the type of the extractors' config at type checking since the sel doesn't
279+
# This has to be Any. We don't know the type of the extractors' config at type checking since the self doesn't
238280
# exist yet, and I have not found a way to represent it in a generic way that isn't just an Any in disguise.
239281
application_config: Any
282+
config: tuple[Any, ConfigRevision] | None
283+
240284
while not self._cancellation_token.is_cancelled:
241-
try:
242-
application_config, current_config_revision = self._get_application_config(args, connection_config)
285+
config = self._safe_get_application_config(args, connection_config)
286+
if config is None:
287+
if self._cancellation_token.is_cancelled:
288+
break
289+
continue
243290

244-
except InvalidConfigError:
245-
self.logger.critical("Could not get a valid application config file. Shutting down")
246-
sys.exit(1)
291+
application_config, current_config_revision = config
247292

248293
# Start extractor in separate process, and wait for it to end
249294
process = self._spawn_extractor(
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import time
2+
from argparse import Namespace
3+
from collections.abc import Generator
4+
from pathlib import Path
5+
from random import randint
6+
from threading import Thread
7+
8+
import pytest
9+
10+
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig
11+
from cognite.extractorutils.unstable.core.base import ConfigRevision
12+
from cognite.extractorutils.unstable.core.runtime import Runtime
13+
from test_unstable.conftest import TestConfig, TestExtractor
14+
15+
16+
@pytest.fixture
17+
def local_config_file() -> Generator[Path, None, None]:
18+
file = Path(__file__).parent.parent.parent / f"test-{randint(0, 1000000)}.yaml"
19+
with open(file, "w") as f:
20+
f.write("parameter_one: 123\nparameter_two: abc\n")
21+
22+
yield file
23+
24+
file.unlink(missing_ok=True)
25+
26+
27+
def test_load_local_config(connection_config: ConnectionConfig, local_config_file: Path) -> None:
28+
runtime = Runtime(TestExtractor)
29+
runtime._cognite_client = connection_config.get_cognite_client(
30+
f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}"
31+
)
32+
33+
config: TestConfig
34+
config, revision = runtime._try_get_application_config(
35+
args=Namespace(local_override=[local_config_file]),
36+
connection_config=connection_config,
37+
)
38+
39+
assert revision == "local"
40+
assert config.parameter_one == 123
41+
assert config.parameter_two == "abc"
42+
43+
44+
def test_load_cdf_config(connection_config: ConnectionConfig) -> None:
45+
cognite_client = connection_config.get_cognite_client(f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}")
46+
cognite_client.post(
47+
url=f"/api/v1/projects/{cognite_client.config.project}/odin/config",
48+
json={
49+
"externalId": connection_config.integration,
50+
"config": "parameter-one: 123\nparameter-two: abc\n",
51+
},
52+
headers={"cdf-version": "alpha"},
53+
)
54+
55+
runtime = Runtime(TestExtractor)
56+
runtime._cognite_client = cognite_client
57+
58+
config: TestConfig
59+
config, revision = runtime._try_get_application_config(
60+
args=Namespace(local_override=None),
61+
connection_config=connection_config,
62+
)
63+
64+
assert revision == 1
65+
assert config.parameter_one == 123
66+
assert config.parameter_two == "abc"
67+
68+
69+
def test_load_cdf_config_initial_empty(connection_config: ConnectionConfig) -> None:
70+
"""
71+
Test that the runtime can handle an initial empty config, and that it's picked up when it's set
72+
"""
73+
cognite_client = connection_config.get_cognite_client(f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}")
74+
75+
runtime = Runtime(TestExtractor)
76+
runtime._cognite_client = cognite_client
77+
runtime.RETRY_CONFIG_INTERVAL = 1
78+
79+
def set_config_after_delay() -> None:
80+
time.sleep(3)
81+
cognite_client.post(
82+
url=f"/api/v1/projects/{cognite_client.config.project}/odin/config",
83+
json={
84+
"externalId": connection_config.integration,
85+
"config": "parameter-one: 123\nparameter-two: abc\n",
86+
},
87+
headers={"cdf-version": "alpha"},
88+
)
89+
90+
def cancel_after_delay() -> None:
91+
time.sleep(10)
92+
runtime._cancellation_token.cancel()
93+
94+
Thread(target=set_config_after_delay, daemon=True).start()
95+
Thread(target=cancel_after_delay, daemon=True).start()
96+
97+
start_time = time.time()
98+
result: tuple[TestConfig, ConfigRevision] | None = runtime._safe_get_application_config(
99+
args=Namespace(local_override=None),
100+
connection_config=connection_config,
101+
)
102+
duration = time.time() - start_time
103+
104+
assert result is not None
105+
106+
# Duration should not be much higher than sleep before set (3) + retry interval (1)
107+
assert duration < 5
108+
109+
config, revision = result
110+
assert revision == 1
111+
assert config.parameter_one == 123
112+
assert config.parameter_two == "abc"
113+
114+
# There should be one error reported from initially attempting to run without a config
115+
errors = cognite_client.get(
116+
url=f"/api/v1/projects/{cognite_client.config.project}/odin/errors",
117+
params={"integration": connection_config.integration},
118+
).json()
119+
120+
assert len(errors["items"]) == 1
121+
assert "No configuration found for the given integration" in errors["items"][0]["description"]

0 commit comments

Comments
 (0)