Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## 7.5.15

### Added

* Added a common method `_report_run` that can be called from any extractors using this library to report a extractor run into Extractor Pipelines. This automatically handles truncating the message to less than 1000 characters if required.

### Fixed

* Fixed `_report_success` calls failing if the `success_message` was longer than 1000 characters

## 7.5.14

### Added
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.5.14"
__version__ = "7.5.15"
from .base import Extractor

__all__ = ["Extractor"]
58 changes: 38 additions & 20 deletions cognite/extractorutils/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from collections.abc import Callable
from dataclasses import is_dataclass
from enum import Enum
from textwrap import shorten
from threading import Thread
from types import TracebackType
from typing import Any, Generic, TypeVar
Expand All @@ -26,6 +27,7 @@

from cognite.client import CogniteClient
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun
from cognite.client.exceptions import CogniteAPIError
from cognite.extractorutils.configtools import BaseConfig, ConfigResolver, StateStoreConfig
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.metrics import BaseMetrics
Expand Down Expand Up @@ -210,37 +212,53 @@ def recursive_find_state_store(d: dict[str, Any]) -> StateStoreConfig | None:

Extractor._statestore_singleton = self.state_store

def _report_success(self) -> None:
"""
Called on a successful exit of the extractor
"""
def _report_run(self, status: str, message: str) -> None:
MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN = 1000
if self.extraction_pipeline:
self.logger.info("Reporting new successful run")
self.cognite_client.extraction_pipelines.runs.create(
ExtractionPipelineRun(
try:
message = message or ""
shortened_message = shorten(
message,
width=MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN,
placeholder="...",
)

run = ExtractionPipelineRun(
extpipe_external_id=self.extraction_pipeline.external_id,
status="success",
message=self.success_message,
status=status,
message=shortened_message,
)
)

def _report_error(self, exception: BaseException) -> None:
self.logger.info(f"Reporting new {status} run: {message}")
self.cognite_client.extraction_pipelines.runs.create(run)
except CogniteAPIError as e:
self.logger.exception(
f"error while reporting run - status {status} - message {message} . Error: {str(e)}"
)

def _report_success(self, message: str | None = None) -> None:
"""
Called on a successful exit of the extractor
"""
message = message or self.success_message
self._report_run("success", message)

def _report_error(self, exception: BaseException, message: str | None = None) -> None:
"""
Called on an unsuccessful exit of the extractor

Args:
exception: Exception object that caused the extractor to fail
"""
self.logger.error("Unexpected error during extraction", exc_info=exception)
if self.extraction_pipeline:
message = f"{type(exception).__name__}: {str(exception)}"[:1000]

self.logger.info(f"Reporting new failed run: {message}")
self.cognite_client.extraction_pipelines.runs.create(
ExtractionPipelineRun(
extpipe_external_id=self.extraction_pipeline.external_id, status="failure", message=message
)
)
if message is None and exception is None:
self.logger.exception("Failed to report error to Extraction Pipelines. No message or exception provided.")
return None

if message is None and exception is not None:
message = f"{type(exception).__name__}: {str(exception)}"

self._report_run("failure", message)

def __enter__(self) -> "Extractor":
"""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "cognite-extractor-utils"
version = "7.5.14"
version = "7.5.15"
description = "Utilities for easier development of extractors for CDF"
authors = [
{name = "Mathias Lohne", email = "mathias.lohne@cognite.com"}
Expand Down
57 changes: 56 additions & 1 deletion tests/tests_unit/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from unittest.mock import patch
from textwrap import shorten
from unittest.mock import Mock, patch

import pytest

from cognite.client import CogniteClient
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun
from cognite.extractorutils import Extractor
from cognite.extractorutils.configtools import BaseConfig, StateStoreConfig
from cognite.extractorutils.statestore import LocalStateStore, NoStateStore
Expand Down Expand Up @@ -126,3 +129,55 @@ def test_state_store_getter() -> None:
e5._load_state_store()

assert isinstance(Extractor.get_current_statestore(), LocalStateStore)


@patch("cognite.client.CogniteClient")
def test_report_run(get_client_mock: Callable[[], CogniteClient]):
print("Report run test")

MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN = 1000

EXTRACTION_PIPELINE = "test_extraction_pipeline"
SHORT_MESSAGE = "hello world"
LONG_MESSAGE = "x " * 1500

expected_long_message = shorten(
text=LONG_MESSAGE,
width=MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN,
placeholder="...",
)

# Mock method for reporting run
def validate_short_message(run: ExtractionPipelineRun):
print(f"Validating short message: {run.message}")
assert run.extpipe_external_id == EXTRACTION_PIPELINE
assert (
len(run.message) <= MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN
), f"Short message length exceeds maximum allowed length: {MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN}"
assert run.message == SHORT_MESSAGE, "Short message does not match expected value"

def validate_long_message(run: ExtractionPipelineRun):
print(f"Validating long message: {run.message}")
assert (
len(run.message) <= MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN
), f"Long message length exceeds maximum allowed length: {MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN}"
assert run.message == expected_long_message, "Long message does not match expected value"

extractor = Extractor(
name="extractor_test_report_run",
description="description",
config_class=ConfigWithoutStates,
)
extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
extractor.cognite_client = get_client_mock()
extractor.logger = logging.getLogger("test_logger")

extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)
extractor.cognite_client.extraction_pipelines.runs.create = Mock(side_effect=validate_short_message)
extractor._report_run(status="success", message=SHORT_MESSAGE)

extractor.cognite_client.extraction_pipelines.runs.create = Mock(side_effect=validate_long_message)
extractor._report_run(status="success", message=LONG_MESSAGE)

extractor.extraction_pipeline = None
# assert False
Loading