Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## Next

### Added

* In the `unstable` package: Proper retries when fetching configs from CDF

## 7.5.13

### Security
Expand Down
18 changes: 12 additions & 6 deletions cognite/extractorutils/unstable/configuration/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pydantic import ValidationError

from cognite.client import CogniteClient
from cognite.client.exceptions import CogniteAPIError
from cognite.extractorutils.configtools.loaders import _load_yaml_dict_raw
from cognite.extractorutils.exceptions import InvalidConfigError as OldInvalidConfigError
from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
Expand Down Expand Up @@ -41,12 +42,17 @@ def load_from_cdf(
params: dict[str, str | int] = {"integration": external_id}
if revision:
params["revision"] = revision
response = cognite_client.get(
f"/api/v1/projects/{cognite_client.config.project}/odin/config",
params=params,
headers={"cdf-version": "alpha"},
)
response.raise_for_status()
try:
response = cognite_client.get(
f"/api/v1/projects/{cognite_client.config.project}/odin/config",
params=params,
headers={"cdf-version": "alpha"},
)
except CogniteAPIError as e:
if e.code == 404:
raise InvalidConfigError("No configuration found for the given integration") from e
raise e

data = response.json()

try:
Expand Down
105 changes: 75 additions & 30 deletions cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,29 @@
from argparse import ArgumentParser, Namespace
from multiprocessing import Process, Queue
from pathlib import Path
from random import randint
from typing import Any, Generic, TypeVar
from uuid import uuid4

from requests.exceptions import ConnectionError
from typing_extensions import assert_never

from cognite.client.exceptions import CogniteAPIError, CogniteAuthError, CogniteConnectionError
from cognite.client import CogniteClient
from cognite.client.exceptions import (
CogniteAPIError,
CogniteAuthError,
CogniteConnectionError,
)
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
from cognite.extractorutils.unstable.configuration.loaders import load_file, load_from_cdf
from cognite.extractorutils.unstable.configuration.loaders import (
load_file,
load_from_cdf,
)
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig
from cognite.extractorutils.unstable.core._dto import Error
from cognite.extractorutils.unstable.core.errors import ErrorLevel
from cognite.extractorutils.util import now

from ._messaging import RuntimeMessage
from .base import ConfigRevision, ConfigType, Extractor, FullConfig
Expand All @@ -26,6 +38,8 @@


class Runtime(Generic[ExtractorType]):
RETRY_CONFIG_INTERVAL = 30

def __init__(
self,
extractor: type[ExtractorType],
Expand All @@ -37,6 +51,8 @@ def __init__(
self.logger = logging.getLogger(f"{self._extractor_class.EXTERNAL_ID}.runtime")
self._setup_logging()

self._cognite_client: CogniteClient

def _create_argparser(self) -> ArgumentParser:
argparser = ArgumentParser(
prog=sys.argv[0],
Expand Down Expand Up @@ -121,7 +137,7 @@ def _spawn_extractor(
self.logger.info(f"Started extractor with PID {process.pid}")
return process

def _get_application_config(
def _try_get_application_config(
self,
args: Namespace,
connection_config: ConnectionConfig,
Expand All @@ -143,39 +159,65 @@ def _get_application_config(

else:
self.logger.info("Loading application config from CDF")
client = connection_config.get_cognite_client(
f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}"

application_config, current_config_revision = load_from_cdf(
self._cognite_client,
connection_config.integration,
self._extractor_class.CONFIG_TYPE,
)

errors: list[Error] = []
return application_config, current_config_revision

def _safe_get_application_config(
self,
args: Namespace,
connection_config: ConnectionConfig,
) -> tuple[ConfigType, ConfigRevision] | None:
prev_error: str | None = None

while not self._cancellation_token.is_cancelled:
try:
application_config, current_config_revision = load_from_cdf(
client,
connection_config.integration,
self._extractor_class.CONFIG_TYPE,
return self._try_get_application_config(args, connection_config)

except Exception as e:
error_message = str(e)
if error_message == prev_error:
# Same error as before, no need to log it again
self._cancellation_token.wait(randint(1, self.RETRY_CONFIG_INTERVAL))
continue
prev_error = error_message

ts = now()
error = Error(
external_id=str(uuid4()),
level=ErrorLevel.fatal.value,
start_time=ts,
end_time=ts,
description=error_message,
details=None,
task=None,
)

finally:
if errors:
client.post(
f"/api/v1/projects/{client.config.project}/odin/checkin",
json={
"externalId": connection_config.integration,
"errors": [e.model_dump() for e in errors],
},
headers={"cdf-version": "alpha"},
)
self._cognite_client.post(
f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin",
json={
"externalId": connection_config.integration,
"errors": [error.model_dump()],
},
headers={"cdf-version": "alpha"},
)

return application_config, current_config_revision
self._cancellation_token.wait(randint(1, self.RETRY_CONFIG_INTERVAL))

return None

def _verify_connection_config(self, connection_config: ConnectionConfig) -> bool:
client = connection_config.get_cognite_client(
self._cognite_client = connection_config.get_cognite_client(
f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}"
)
try:
client.post(
f"/api/v1/projects/{client.config.project}/odin/checkin",
self._cognite_client.post(
f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin",
json={
"externalId": connection_config.integration,
},
Expand Down Expand Up @@ -234,16 +276,19 @@ def run(self) -> None:
if not args.skip_init_checks and not self._verify_connection_config(connection_config):
sys.exit(1)

# This has to be Any. We don't know the type of the extractors' config at type checking since the sel doesn't
# This has to be Any. We don't know the type of the extractors' config at type checking since the self doesn't
# exist yet, and I have not found a way to represent it in a generic way that isn't just an Any in disguise.
application_config: Any
config: tuple[Any, ConfigRevision] | None

while not self._cancellation_token.is_cancelled:
try:
application_config, current_config_revision = self._get_application_config(args, connection_config)
config = self._safe_get_application_config(args, connection_config)
if config is None:
if self._cancellation_token.is_cancelled:
break
continue

except InvalidConfigError:
self.logger.critical("Could not get a valid application config file. Shutting down")
sys.exit(1)
application_config, current_config_revision = config

# Start extractor in separate process, and wait for it to end
process = self._spawn_extractor(
Expand Down
121 changes: 121 additions & 0 deletions tests/test_unstable/test_runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import time
from argparse import Namespace
from collections.abc import Generator
from pathlib import Path
from random import randint
from threading import Thread

import pytest

from cognite.extractorutils.unstable.configuration.models import ConnectionConfig
from cognite.extractorutils.unstable.core.base import ConfigRevision
from cognite.extractorutils.unstable.core.runtime import Runtime
from test_unstable.conftest import TestConfig, TestExtractor


@pytest.fixture
def local_config_file() -> Generator[Path, None, None]:
file = Path(__file__).parent.parent.parent / f"test-{randint(0, 1000000)}.yaml"
with open(file, "w") as f:
f.write("parameter_one: 123\nparameter_two: abc\n")

yield file

file.unlink(missing_ok=True)


def test_load_local_config(connection_config: ConnectionConfig, local_config_file: Path) -> None:
runtime = Runtime(TestExtractor)
runtime._cognite_client = connection_config.get_cognite_client(
f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}"
)

config: TestConfig
config, revision = runtime._try_get_application_config(
args=Namespace(local_override=[local_config_file]),
connection_config=connection_config,
)

assert revision == "local"
assert config.parameter_one == 123
assert config.parameter_two == "abc"


def test_load_cdf_config(connection_config: ConnectionConfig) -> None:
cognite_client = connection_config.get_cognite_client(f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}")
cognite_client.post(
url=f"/api/v1/projects/{cognite_client.config.project}/odin/config",
json={
"externalId": connection_config.integration,
"config": "parameter-one: 123\nparameter-two: abc\n",
},
headers={"cdf-version": "alpha"},
)

runtime = Runtime(TestExtractor)
runtime._cognite_client = cognite_client

config: TestConfig
config, revision = runtime._try_get_application_config(
args=Namespace(local_override=None),
connection_config=connection_config,
)

assert revision == 1
assert config.parameter_one == 123
assert config.parameter_two == "abc"


def test_load_cdf_config_initial_empty(connection_config: ConnectionConfig) -> None:
"""
Test that the runtime can handle an initial empty config, and that it's picked up when it's set
"""
cognite_client = connection_config.get_cognite_client(f"{TestExtractor.EXTERNAL_ID}-{TestExtractor.VERSION}")

runtime = Runtime(TestExtractor)
runtime._cognite_client = cognite_client
runtime.RETRY_CONFIG_INTERVAL = 1

def set_config_after_delay() -> None:
time.sleep(3)
cognite_client.post(
url=f"/api/v1/projects/{cognite_client.config.project}/odin/config",
json={
"externalId": connection_config.integration,
"config": "parameter-one: 123\nparameter-two: abc\n",
},
headers={"cdf-version": "alpha"},
)

def cancel_after_delay() -> None:
time.sleep(10)
runtime._cancellation_token.cancel()

Thread(target=set_config_after_delay, daemon=True).start()
Thread(target=cancel_after_delay, daemon=True).start()

start_time = time.time()
result: tuple[TestConfig, ConfigRevision] | None = runtime._safe_get_application_config(
args=Namespace(local_override=None),
connection_config=connection_config,
)
duration = time.time() - start_time

assert result is not None

# Duration should not be much higher than sleep before set (3) + retry interval (1)
assert duration < 5

config, revision = result
assert revision == 1
assert config.parameter_one == 123
assert config.parameter_two == "abc"

# There should be one error reported from initially attempting to run without a config
errors = cognite_client.get(
url=f"/api/v1/projects/{cognite_client.config.project}/odin/errors",
params={"integration": connection_config.integration},
).json()

assert len(errors["items"]) == 1
assert "No configuration found for the given integration" in errors["items"][0]["description"]
Loading