Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## Next
## 7.6.0

### Added

* Added a common method `_report_run` that can be called from any extractor using this library to report an extractor run to Extraction Pipelines. This automatically truncates the message to at most 1000 characters if required.

### Fixed

* Fixed `_report_success` calls failing if the `success_message` was longer than 1000 characters

### Changed

* In the `unstable` package: `-l` / `--local-override` is renamed to `-f` / `--force-local-config`


## 7.5.14

### Added
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.5.14"
__version__ = "7.6.0"
from .base import Extractor

__all__ = ["Extractor"]
97 changes: 73 additions & 24 deletions cognite/extractorutils/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Module containing the base class for extractors.
"""

# Copyright 2021 Cognite AS
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -22,20 +23,32 @@
from collections.abc import Callable
from dataclasses import is_dataclass
from enum import Enum
from textwrap import shorten
from threading import Thread
from types import TracebackType
from typing import Any, Generic, TypeVar
from typing import Any, Generic, Literal, TypeVar

from dotenv import load_dotenv

from cognite.client import CogniteClient
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun
from cognite.extractorutils.configtools import BaseConfig, ConfigResolver, StateStoreConfig
from cognite.client.exceptions import CogniteAPIError
from cognite.extractorutils.configtools import (
BaseConfig,
ConfigResolver,
StateStoreConfig,
)
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.statestore import AbstractStateStore, LocalStateStore, NoStateStore
from cognite.extractorutils.statestore import (
AbstractStateStore,
LocalStateStore,
NoStateStore,
)
from cognite.extractorutils.threading import CancellationToken

ReportStatus = Literal["success", "failure", "seen"]


class ReloadConfigAction(Enum):
"""
Expand Down Expand Up @@ -227,19 +240,53 @@ def recursive_find_state_store(d: dict[str, Any]) -> StateStoreConfig | None:

Extractor._statestore_singleton = self.state_store

def _report_success(self) -> None:
def _report_run(self, status: ReportStatus, message: str) -> None:
    """
    Report the status of the extractor run to the extraction pipeline.

    Nothing is reported when no extraction pipeline is configured. Before sending, whitespace in the
    message is collapsed and the message is truncated to at most 1000 characters, with "..." marking
    the cut. A ``CogniteAPIError`` raised while creating the run is logged and not re-raised.

    Args:
        status: Status of the run, either success or failure or seen
        message: Message to report to the extraction pipeline
    """
    MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN = 1000
    PLACEHOLDER = "..."

    if not self.extraction_pipeline:
        return

    message = message or ""
    shortened_message = shorten(
        message,
        width=MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN,
        placeholder=PLACEHOLDER,
    )

    # shorten() only drops whole words. If the first word alone does not fit next to the
    # placeholder, it returns the bare placeholder and the entire message is lost. Fall back
    # to a hard cut of the whitespace-collapsed text in that case.
    collapsed_message = " ".join(message.split())
    if (
        shortened_message == PLACEHOLDER
        and len(collapsed_message) > MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN
    ):
        keep = MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN - len(PLACEHOLDER)
        shortened_message = collapsed_message[:keep] + PLACEHOLDER

    run = ExtractionPipelineRun(
        extpipe_external_id=self.extraction_pipeline.external_id,
        status=status,
        message=shortened_message,
    )

    self.logger.info(f"Reporting new {status} run: {message}")
    try:
        self.cognite_client.extraction_pipelines.runs.create(run)
    except CogniteAPIError as e:
        # Reporting is best effort: a failed report must not take down the extractor
        self.logger.exception(f"Error while reporting run - status {status} - message {message} . Error: {e!s}")

def _report_success(self, message: str | None = None) -> None:
    """
    Report a successful run of the extractor to the extraction pipeline.

    Args:
        message: Message to attach to the run. When omitted (or empty), ``self.success_message`` is used instead.
    """
    self._report_run("success", message if message else self.success_message)

def _report_failure(self, message: str) -> None:
    """
    Report a failed run of the extractor to the extraction pipeline.

    Args:
        message: Message to attach to the failed run
    """
    self._report_run(status="failure", message=message)

def _report_error(self, exception: BaseException) -> None:
"""
Expand All @@ -248,16 +295,8 @@ def _report_error(self, exception: BaseException) -> None:
Args:
exception: Exception object that caused the extractor to fail
"""
self.logger.error("Unexpected error during extraction", exc_info=exception)
if self.extraction_pipeline:
message = f"{type(exception).__name__}: {exception!s}"[:1000]

self.logger.info(f"Reporting new failed run: {message}")
self.cognite_client.extraction_pipelines.runs.create(
ExtractionPipelineRun(
extpipe_external_id=self.extraction_pipeline.external_id, status="failure", message=message
)
)
message = f"{type(exception).__name__}: {exception!s}"
self._report_run("failure", message)

def __enter__(self) -> "Extractor":
"""
Expand All @@ -279,7 +318,9 @@ def __enter__(self) -> "Extractor":
try:
self._initial_load_config(override_path=self.config_file_path)
except InvalidConfigError as e:
print("Critical error: Could not read config file", file=sys.stderr) # noqa: T201
print( # noqa: T201
"Critical error: Could not read config file", file=sys.stderr
)
print(str(e), file=sys.stderr) # noqa: T201
sys.exit(1)

Expand Down Expand Up @@ -338,7 +379,10 @@ def heartbeat_loop() -> None:
return self

def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
"""
Shuts down the extractor. Makes sure states are preserved, that all uploads of data and metrics are done, etc.
Expand Down Expand Up @@ -381,7 +425,12 @@ def run(self) -> None:
if not self.started:
raise ValueError("You must run the extractor in a context manager")
if self.run_handle:
self.run_handle(self.cognite_client, self.state_store, self.config, self.cancellation_token)
self.run_handle(
self.cognite_client,
self.state_store,
self.config,
self.cancellation_token,
)
else:
raise ValueError("No run_handle defined")

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "cognite-extractor-utils"
version = "7.5.14"
version = "7.6.0"
description = "Utilities for easier development of extractors for CDF"
authors = [
{name = "Mathias Lohne", email = "mathias.lohne@cognite.com"}
Expand Down
153 changes: 152 additions & 1 deletion tests/tests_unit/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from unittest.mock import patch
from textwrap import shorten
from unittest.mock import Mock, patch

import pytest

from cognite.client import CogniteClient
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun
from cognite.extractorutils import Extractor
from cognite.extractorutils.configtools import BaseConfig, StateStoreConfig
from cognite.extractorutils.statestore import LocalStateStore, NoStateStore
Expand Down Expand Up @@ -126,3 +129,151 @@ def test_state_store_getter() -> None:
e5._load_state_store()

assert isinstance(Extractor.get_current_statestore(), LocalStateStore)


@patch("cognite.client.CogniteClient")
def test_report_success(
    get_client_mock: Callable[[], CogniteClient],
) -> None:
    """
    Check that ``_report_success`` creates exactly one "success" run carrying the given message.
    """
    print("Report success test")

    EXTRACTION_PIPELINE = "test_extraction_pipeline"
    MESSAGE = "test message"

    def validate_message(run: ExtractionPipelineRun) -> None:
        # Runs as the side effect of the mocked ``runs.create`` and inspects the reported run
        print(f"Validating message: {run.message}")
        assert run.extpipe_external_id == EXTRACTION_PIPELINE
        assert run.status == "success"
        assert run.message == MESSAGE, "Message does not match expected value"

    extractor = Extractor(
        name="extractor_test_report_success",
        description="description",
        config_class=ConfigWithoutStates,
    )
    extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
    extractor.cognite_client = get_client_mock()
    extractor.logger = logging.getLogger("test_logger")

    extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)
    create_run = Mock(side_effect=validate_message)
    extractor.cognite_client.extraction_pipelines.runs.create = create_run

    extractor._report_success(message=MESSAGE)

    # All assertions live in the side effect, so without this the test would pass even if no run was reported
    create_run.assert_called_once()
    extractor.extraction_pipeline = None


@patch("cognite.client.CogniteClient")
def test_report_failure(
    get_client_mock: Callable[[], CogniteClient],
) -> None:
    """
    Check that ``_report_failure`` creates exactly one "failure" run carrying the given message.
    """
    print("Report failure test")

    EXTRACTION_PIPELINE = "test_extraction_pipeline"
    MESSAGE = "test message"

    def validate_message(run: ExtractionPipelineRun) -> None:
        # Runs as the side effect of the mocked ``runs.create`` and inspects the reported run
        print(f"Validating message: {run.message}")
        assert run.extpipe_external_id == EXTRACTION_PIPELINE
        assert run.status == "failure"
        assert run.message == MESSAGE, "Message does not match expected value"

    extractor = Extractor(
        name="extractor_test_report_failure",
        description="description",
        config_class=ConfigWithoutStates,
    )
    extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
    extractor.cognite_client = get_client_mock()
    extractor.logger = logging.getLogger("test_logger")

    extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)
    create_run = Mock(side_effect=validate_message)
    extractor.cognite_client.extraction_pipelines.runs.create = create_run

    extractor._report_failure(message=MESSAGE)

    # All assertions live in the side effect, so without this the test would pass even if no run was reported
    create_run.assert_called_once()
    extractor.extraction_pipeline = None


@patch("cognite.client.CogniteClient")
def test_report_error(
    get_client_mock: Callable[[], CogniteClient],
) -> None:
    """
    Check that ``_report_error`` creates exactly one "failure" run whose message is
    ``"<exception type name>: <exception text>"``.
    """
    print("Report error test")

    EXTRACTION_PIPELINE = "test_extraction_pipeline"
    MESSAGE = "test exception"
    expected_message = f"Exception: {MESSAGE}"

    def validate_message(run: ExtractionPipelineRun) -> None:
        # Runs as the side effect of the mocked ``runs.create`` and inspects the reported run
        print(f"Validating message: {run.message}")
        assert run.extpipe_external_id == EXTRACTION_PIPELINE
        assert run.status == "failure"
        assert run.message == expected_message, "Message does not match expected value"

    extractor = Extractor(
        name="extractor_test_report_error",
        description="description",
        config_class=ConfigWithoutStates,
    )
    extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
    extractor.cognite_client = get_client_mock()
    extractor.logger = logging.getLogger("test_logger")

    extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)
    create_run = Mock(side_effect=validate_message)
    extractor.cognite_client.extraction_pipelines.runs.create = create_run

    exception = Exception(MESSAGE)

    extractor._report_error(exception=exception)

    # All assertions live in the side effect, so without this the test would pass even if no run was reported
    create_run.assert_called_once()
    extractor.extraction_pipeline = None


@patch("cognite.client.CogniteClient")
def test_report_run(get_client_mock: Callable[[], CogniteClient]) -> None:
    """
    Check that ``_report_run`` passes short messages through unchanged and shortens long
    messages to at most 1000 characters, and that a run is actually created in both cases.
    """
    print("Report run test")

    MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN = 1000

    EXTRACTION_PIPELINE = "test_extraction_pipeline"
    SHORT_MESSAGE = "hello world"
    LONG_MESSAGE = "x " * 1500

    expected_long_message = shorten(
        text=LONG_MESSAGE,
        width=MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN,
        placeholder="...",
    )

    # Side effects for the mocked ``runs.create``: they inspect the run that is being reported
    def validate_short_message(run: ExtractionPipelineRun) -> None:
        print(f"Validating short message: {run.message}")
        assert run.extpipe_external_id == EXTRACTION_PIPELINE
        assert len(run.message) <= MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN, (
            f"Short message length exceeds maximum allowed length: {MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN}"
        )
        assert run.message == SHORT_MESSAGE, "Short message does not match expected value"

    def validate_long_message(run: ExtractionPipelineRun) -> None:
        print(f"Validating long message: {run.message}")
        assert len(run.message) <= MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN, (
            f"Long message length exceeds maximum allowed length: {MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN}"
        )
        assert run.message == expected_long_message, "Long message does not match expected value"

    extractor = Extractor(
        name="extractor_test_report_run",
        description="description",
        config_class=ConfigWithoutStates,
    )
    extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
    extractor.cognite_client = get_client_mock()
    extractor.logger = logging.getLogger("test_logger")

    extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)

    create_short_run = Mock(side_effect=validate_short_message)
    extractor.cognite_client.extraction_pipelines.runs.create = create_short_run
    extractor._report_run(status="success", message=SHORT_MESSAGE)
    # The assertions above only execute if create() is invoked, so verify that it was
    create_short_run.assert_called_once()

    create_long_run = Mock(side_effect=validate_long_message)
    extractor.cognite_client.extraction_pipelines.runs.create = create_long_run
    extractor._report_run(status="success", message=LONG_MESSAGE)
    create_long_run.assert_called_once()

    extractor.extraction_pipeline = None
Loading