Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 46 additions & 22 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
OAuthClientCertificate,
OAuthClientCredentials,
)
from cognite.client.data_classes import Asset
from cognite.client.data_classes import Asset, DataSet
from cognite.extractorutils.configtools._util import _load_certificate_data
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.metrics import AbstractMetricsPusher, CognitePusher, PrometheusPusher
Expand Down Expand Up @@ -246,6 +246,27 @@ class IntegrationConfig(ConfigModel):
external_id: str


class EitherIdConfig(ConfigModel):
"""
Configuration parameter representing an ID in CDF, which can either be an external or internal ID.

An EitherId can only hold one ID type, not both.
"""

id: int | None = None
external_id: str | None = None

@property
def either_id(self) -> EitherId:
"""
Returns an EitherId object based on the current configuration.

Raises:
TypeError: If both id and external_id are None, or if both are set.
"""
return EitherId(id=self.id, external_id=self.external_id)


class ConnectionConfig(ConfigModel):
"""
Configuration for connecting to a Cognite Data Fusion project.
Expand Down Expand Up @@ -444,27 +465,6 @@ class LogConsoleHandlerConfig(ConfigModel):
LogHandlerConfig = Annotated[LogFileHandlerConfig | LogConsoleHandlerConfig, Field(discriminator="type")]


class EitherIdConfig(ConfigModel):
"""
Configuration parameter representing an ID in CDF, which can either be an external or internal ID.

An EitherId can only hold one ID type, not both.
"""

id: int | None = None
external_id: str | None = None

@property
def either_id(self) -> EitherId:
"""
Returns an EitherId object based on the current configuration.

Raises:
TypeError: If both id and external_id are None, or if both are set.
"""
return EitherId(id=self.id, external_id=self.external_id)


class _PushGatewayConfig(ConfigModel):
"""
Configuration for pushing metrics to a Prometheus Push Gateway.
Expand Down Expand Up @@ -869,6 +869,30 @@ class ExtractorConfig(ConfigModel):
metrics: MetricsConfig | None = None
log_handlers: list[LogHandlerConfig] = Field(default_factory=_log_handler_default)
retry_startup: bool = True
upload_queue_size: int = 50_000
data_set: EitherIdConfig | None = None
data_set_external_id: str | None = None

def get_data_set(self, cdf_client: CogniteClient) -> DataSet | None:
"""
Retrieves the DataSet object based on the configuration.

Args:
cdf_client: An instance of CogniteClient to use for retrieving the DataSet.

Returns:
DataSet object if data_set, data_set_id, or data_set_external_id is provided; otherwise None.
"""
if self.data_set_external_id:
return cdf_client.data_sets.retrieve(external_id=self.data_set_external_id)

if not self.data_set:
return None

return cdf_client.data_sets.retrieve(
id=self.data_set.either_id.internal_id,
external_id=self.data_set.either_id.external_id,
)


ConfigType = TypeVar("ConfigType", bound=ExtractorConfig)
Expand Down
14 changes: 14 additions & 0 deletions tests/test_unstable/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cognite.extractorutils.unstable.configuration.loaders import ConfigFormat, load_io
from cognite.extractorutils.unstable.configuration.models import (
ConnectionConfig,
ExtractorConfig,
LocalStateStoreConfig,
LogConsoleHandlerConfig,
LogFileHandlerConfig,
Expand Down Expand Up @@ -344,3 +345,16 @@ def test_pushgatewayconfig_none_credentials_from_yaml() -> None:
assert pusher.password is None
assert pusher.url == "http://localhost:9091"
assert pusher.job_name == "test-job"


def test_extractor_config_upload_queue_size_with_yaml() -> None:
"""Test upload_queue_size parsing from YAML configuration."""
config_yaml = """
upload-queue-size: 200000
retry-startup: false
"""
stream = StringIO(config_yaml)
config = load_io(stream, ConfigFormat.YAML, ExtractorConfig)

assert config.upload_queue_size == 200_000
assert config.retry_startup is False
83 changes: 83 additions & 0 deletions tests/test_unstable/test_configuration.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import os
from io import StringIO
from unittest.mock import Mock

import pytest
from pydantic import Field

from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes import DataSet
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.unstable.configuration.loaders import ConfigFormat, load_io
from cognite.extractorutils.unstable.configuration.models import (
ConfigModel,
ConnectionConfig,
EitherIdConfig,
ExtractorConfig,
FileSizeConfig,
LogLevel,
TimeIntervalConfig,
Expand Down Expand Up @@ -303,3 +307,82 @@ def test_setting_log_level_from_any_case() -> None:

with pytest.raises(ValueError):
LogLevel("not-a-log-level")


@pytest.mark.parametrize(
"data_set_external_id,data_set_config,expected_call,expected_result_attrs,should_return_none",
[
# Test with data_set_external_id provided
(
"test-dataset",
None,
{"external_id": "test-dataset"},
{"external_id": "test-dataset", "name": "Test Dataset"},
False,
),
# Test with data_set config using internal ID
(
None,
EitherIdConfig(id=12345),
{"id": 12345, "external_id": None},
{"id": 12345, "name": "Test Dataset"},
False,
),
# Test with data_set config using external ID
(
None,
EitherIdConfig(external_id="config-dataset"),
{"id": None, "external_id": "config-dataset"},
{"external_id": "config-dataset", "name": "Config Dataset"},
False,
),
# Test that data_set_external_id takes priority over data_set
(
"priority-dataset",
EitherIdConfig(external_id="should-be-ignored"),
{"external_id": "priority-dataset"},
{"external_id": "priority-dataset", "name": "Priority Dataset"},
False,
),
# Test with neither data_set_external_id nor data_set provided
(
None,
None,
{},
{},
True,
),
],
)
def test_get_data_set_various_configurations(
data_set_external_id: str | None,
data_set_config: EitherIdConfig | None,
expected_call: dict | None,
expected_result_attrs: dict | None,
should_return_none: bool,
) -> None:
"""Test get_data_set method with various configuration scenarios."""
extractor_config = ExtractorConfig(
retry_startup=False,
data_set_external_id=data_set_external_id,
data_set=data_set_config,
)

# Create a mock client instead of using a real one
mock_client = Mock()

if not should_return_none:
mock_dataset = DataSet(**expected_result_attrs)
mock_client.data_sets.retrieve.return_value = mock_dataset

result = extractor_config.get_data_set(mock_client)

if should_return_none:
assert result is None
mock_client.data_sets.retrieve.assert_not_called()
else:
assert result is not None
for attr, value in expected_result_attrs.items():
if attr != "name":
assert getattr(result, attr) == value
mock_client.data_sets.retrieve.assert_called_once_with(**expected_call)
Loading