Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 48 additions & 28 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Annotated, Any, Literal

from humps import kebabize
from pydantic import BaseModel, ConfigDict, Field, GetCoreSchemaHandler
from pydantic import BaseModel, ConfigDict, Field, GetCoreSchemaHandler, field_validator
from pydantic_core import CoreSchema, core_schema
from typing_extensions import assert_never

Expand Down Expand Up @@ -45,29 +45,37 @@ class ConfigModel(BaseModel):
)


class _ClientCredentialsConfig(ConfigModel):
type: Literal["client-credentials"]
class BaseCredentialsConfig(ConfigModel):
client_id: str
scopes: list[str]

@field_validator("scopes", mode="before", json_schema_input_type=str | list[str])
@classmethod
def cast_scopes(cls, scopes: str | list[str]) -> list[str]:
if isinstance(scopes, str):
return [scope.strip() for scope in scopes.split(",")]
return scopes


class _ClientCredentialsConfig(BaseCredentialsConfig):
type: Literal["client-credentials"]
client_secret: str
token_url: str
scopes: list[str]
resource: str | None = None
audience: str | None = None


class _ClientCertificateConfig(ConfigModel):
class _ClientCertificateConfig(BaseCredentialsConfig):
type: Literal["client-certificate"]
client_id: str
path: Path
password: str | None = None
authority_url: str
scopes: list[str]


AuthenticationConfig = Annotated[_ClientCredentialsConfig | _ClientCertificateConfig, Field(discriminator="type")]


class TimeIntervalConfig:
class TimeIntervalConfig(str):
"""
Configuration parameter for setting a time interval
"""
Expand Down Expand Up @@ -139,23 +147,40 @@ def __repr__(self) -> str:
return self._expression


class _ConnectionParameters(ConfigModel):
gzip_compression: bool = False
status_forcelist: list[int] = Field(default_factory=lambda: [429, 502, 503, 504])
max_retries: int = 10
max_retries_connect: int = 3
max_retry_backoff: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))
max_connection_pool_size: int = 50
ssl_verify: bool = True
proxies: dict[str, str] = Field(default_factory=dict)
class RetriesConfig(ConfigModel):
max_retries: int = Field(default=10, ge=-1)
max_backoff: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))
timeout: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))


class SslCertificatesConfig(ConfigModel):
verify: bool = True
allow_list: list[str] | None = None

@field_validator("allow_list", mode="before", json_schema_input_type=str | list[str] | None)
@classmethod
def cast_thumbprints(cls, thumbprints: str | list[str] | None) -> list[str] | None:
if thumbprints is None:
return None
if isinstance(thumbprints, str):
return [scope.strip() for scope in thumbprints.split(",")]
return thumbprints


class _ConnectionParameters(ConfigModel):
retries: RetriesConfig = Field(default_factory=RetriesConfig)
ssl_certificates: SslCertificatesConfig = Field(default_factory=SslCertificatesConfig)


class IntegrationConfig(ConfigModel):
external_id: str


class ConnectionConfig(ConfigModel):
project: str
base_url: str

integration: str
integration: IntegrationConfig

authentication: AuthenticationConfig

Expand All @@ -165,14 +190,9 @@ def get_cognite_client(self, client_name: str) -> CogniteClient:
from cognite.client.config import global_config

global_config.disable_pypi_version_check = True
global_config.disable_gzip = not self.connection.gzip_compression
global_config.status_forcelist = set(self.connection.status_forcelist)
global_config.max_retries = self.connection.max_retries
global_config.max_retries_connect = self.connection.max_retries_connect
global_config.max_retry_backoff = self.connection.max_retry_backoff.seconds
global_config.max_connection_pool_size = self.connection.max_connection_pool_size
global_config.disable_ssl = not self.connection.ssl_verify
global_config.proxies = self.connection.proxies
global_config.max_retries = self.connection.retries.max_retries
global_config.max_retry_backoff = self.connection.retries.max_backoff.seconds
global_config.disable_ssl = not self.connection.ssl_certificates.verify

credential_provider: CredentialProvider
match self.authentication:
Expand Down Expand Up @@ -210,7 +230,7 @@ def get_cognite_client(self, client_name: str) -> CogniteClient:
project=self.project,
base_url=self.base_url,
client_name=client_name,
timeout=self.connection.timeout.seconds,
timeout=self.connection.retries.timeout.seconds,
credentials=credential_provider,
)

Expand Down Expand Up @@ -242,7 +262,7 @@ def from_environment(cls) -> "ConnectionConfig":
return ConnectionConfig(
project=os.environ["COGNITE_PROJECT"],
base_url=os.environ["COGNITE_BASE_URL"],
integration=os.environ["COGNITE_INTEGRATION"],
integration=IntegrationConfig(external_id=os.environ["COGNITE_INTEGRATION"]),
authentication=auth,
)

Expand Down
4 changes: 2 additions & 2 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _checkin(self) -> None:
res = self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/integrations/checkin",
json={
"externalId": self.connection_config.integration,
"externalId": self.connection_config.integration.external_id,
"taskEvents": task_updates,
"errors": error_updates,
},
Expand Down Expand Up @@ -265,7 +265,7 @@ def _report_extractor_info(self) -> None:
self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/integrations/extractorinfo",
json={
"externalId": self.connection_config.integration,
"externalId": self.connection_config.integration.external_id,
"activeConfigRevision": self.current_config_revision,
"extractor": {
"version": self.VERSION,
Expand Down
12 changes: 6 additions & 6 deletions cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
load_file,
load_from_cdf,
)
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig
from cognite.extractorutils.unstable.core._dto import Error
from cognite.extractorutils.unstable.core.errors import ErrorLevel
from cognite.extractorutils.util import now

from ._messaging import RuntimeMessage
from .base import ConfigRevision, ConfigType, Extractor, FullConfig
from .base import ConfigRevision, Extractor, FullConfig

__all__ = ["ExtractorType", "Runtime"]

Expand Down Expand Up @@ -141,7 +141,7 @@ def _try_get_application_config(
self,
args: Namespace,
connection_config: ConnectionConfig,
) -> tuple[ConfigType, ConfigRevision]:
) -> tuple[ExtractorConfig, ConfigRevision]:
current_config_revision: ConfigRevision

if args.local_override:
Expand All @@ -162,7 +162,7 @@ def _try_get_application_config(

application_config, current_config_revision = load_from_cdf(
self._cognite_client,
connection_config.integration,
connection_config.integration.external_id,
self._extractor_class.CONFIG_TYPE,
)

Expand All @@ -172,7 +172,7 @@ def _safe_get_application_config(
self,
args: Namespace,
connection_config: ConnectionConfig,
) -> tuple[ConfigType, ConfigRevision] | None:
) -> tuple[ExtractorConfig, ConfigRevision] | None:
prev_error: str | None = None

while not self._cancellation_token.is_cancelled:
Expand Down Expand Up @@ -201,7 +201,7 @@ def _safe_get_application_config(
self._cognite_client.post(
f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin",
json={
"externalId": connection_config.integration,
"externalId": connection_config.integration.external_id,
"errors": [error.model_dump()],
},
headers={"cdf-version": "alpha"},
Expand Down
3 changes: 2 additions & 1 deletion tests/test_unstable/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cognite.extractorutils.unstable.configuration.models import (
ConnectionConfig,
ExtractorConfig,
IntegrationConfig,
_ClientCredentialsConfig,
)
from cognite.extractorutils.unstable.core.base import Extractor
Expand Down Expand Up @@ -75,7 +76,7 @@ def connection_config(extraction_pipeline: str) -> ConnectionConfig:
return ConnectionConfig(
project=os.environ["COGNITE_DEV_PROJECT"],
base_url=os.environ["COGNITE_DEV_BASE_URL"],
integration=extraction_pipeline,
integration=IntegrationConfig(external_id=extraction_pipeline),
authentication=_ClientCredentialsConfig(
type="client-credentials",
client_id=os.environ.get("COGNITE_DEV_CLIENT_ID", os.environ["COGNITE_CLIENT_ID"]),
Expand Down
2 changes: 1 addition & 1 deletion tests/test_unstable/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_simple_task_report(

# Test that the task run is entered into the history for that task
res = extractor.cognite_client.get(
f"/api/v1/projects/{extractor.cognite_client.config.project}/integrations/history?integration={connection_config.integration}&taskName=TestTask",
f"/api/v1/projects/{extractor.cognite_client.config.project}/integrations/history?integration={connection_config.integration.external_id}&taskName=TestTask",
headers={"cdf-version": "alpha"},
).json()

Expand Down
Loading
Loading