Skip to content

Commit 5f7b27d

Browse files
nithinbNithin Bodanapu
andauthored
Shorten the messages written to extractor pipeline if its more than 1000 characters (#440)
* Define extraction pipeline reporting messages here that can be re-used in other extractors * Added test case for report_run * Version changes to trigger a release * Ruff Formatting changes after resolving merge conflicts --------- Co-authored-by: Nithin Bodanapu <nithin.bodanapu@cognite.com>
1 parent 6ec693b commit 5f7b27d

File tree

5 files changed

+237
-28
lines changed

5 files changed

+237
-28
lines changed

CHANGELOG.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,21 @@ Changes are grouped as follows
1212
- `Fixed` for any bug fixes.
1313
- `Security` in case of vulnerabilities.
1414

15-
## Next
15+
## 7.6.0
16+
17+
### Added
18+
19+
* Added a common method `_report_run` that can be called from any extractors using this library to report a extractor run into Extractor Pipelines. This automatically handles truncating the message to less than 1000 characters if required.
20+
21+
### Fixed
22+
23+
* Fixed `_report_success` calls failing if the `success_message` was longer than 1000 characters
1624

1725
### Changed
1826

1927
* In the `unstable` package: `-l` / `--local-override` is renamed to `-f` / `--force-local-config`
2028

29+
2130
## 7.5.14
2231

2332
### Added

cognite/extractorutils/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
Cognite extractor utils is a Python package that simplifies the development of new extractors.
1717
"""
1818

19-
__version__ = "7.5.14"
19+
__version__ = "7.6.0"
2020
from .base import Extractor
2121

2222
__all__ = ["Extractor"]

cognite/extractorutils/base.py

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""
22
Module containing the base class for extractors.
33
"""
4+
45
# Copyright 2021 Cognite AS
56
#
67
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,20 +23,32 @@
2223
from collections.abc import Callable
2324
from dataclasses import is_dataclass
2425
from enum import Enum
26+
from textwrap import shorten
2527
from threading import Thread
2628
from types import TracebackType
27-
from typing import Any, Generic, TypeVar
29+
from typing import Any, Generic, Literal, TypeVar
2830

2931
from dotenv import load_dotenv
3032

3133
from cognite.client import CogniteClient
3234
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun
33-
from cognite.extractorutils.configtools import BaseConfig, ConfigResolver, StateStoreConfig
35+
from cognite.client.exceptions import CogniteAPIError
36+
from cognite.extractorutils.configtools import (
37+
BaseConfig,
38+
ConfigResolver,
39+
StateStoreConfig,
40+
)
3441
from cognite.extractorutils.exceptions import InvalidConfigError
3542
from cognite.extractorutils.metrics import BaseMetrics
36-
from cognite.extractorutils.statestore import AbstractStateStore, LocalStateStore, NoStateStore
43+
from cognite.extractorutils.statestore import (
44+
AbstractStateStore,
45+
LocalStateStore,
46+
NoStateStore,
47+
)
3748
from cognite.extractorutils.threading import CancellationToken
3849

50+
ReportStatus = Literal["success", "failure", "seen"]
51+
3952

4053
class ReloadConfigAction(Enum):
4154
"""
@@ -227,19 +240,53 @@ def recursive_find_state_store(d: dict[str, Any]) -> StateStoreConfig | None:
227240

228241
Extractor._statestore_singleton = self.state_store
229242

230-
def _report_success(self) -> None:
243+
def _report_run(self, status: ReportStatus, message: str) -> None:
231244
"""
232-
Called on a successful exit of the extractor.
245+
Report the status of the extractor run to the extraction pipeline.
246+
247+
Args:
248+
status: Status of the run, either success or failure or seen
249+
message: Message to report to the extraction pipeline
233250
"""
251+
MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN = 1000
234252
if self.extraction_pipeline:
235-
self.logger.info("Reporting new successful run")
236-
self.cognite_client.extraction_pipelines.runs.create(
237-
ExtractionPipelineRun(
253+
try:
254+
message = message or ""
255+
shortened_message = shorten(
256+
message,
257+
width=MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN,
258+
placeholder="...",
259+
)
260+
261+
run = ExtractionPipelineRun(
238262
extpipe_external_id=self.extraction_pipeline.external_id,
239-
status="success",
240-
message=self.success_message,
263+
status=status,
264+
message=shortened_message,
241265
)
242-
)
266+
267+
self.logger.info(f"Reporting new {status} run: {message}")
268+
self.cognite_client.extraction_pipelines.runs.create(run)
269+
except CogniteAPIError as e:
270+
self.logger.exception(f"Error while reporting run - status {status} - message {message} . Error: {e!s}")
271+
272+
def _report_success(self, message: str | None = None) -> None:
273+
"""
274+
Called on a successful exit of the extractor.
275+
276+
Args:
277+
message: Message to report to the extraction pipeline. If not provided, Extractor.success_message is taken.
278+
"""
279+
message = message or self.success_message
280+
self._report_run("success", message)
281+
282+
def _report_failure(self, message: str) -> None:
283+
"""
284+
Called on an unsuccessful exit of the extractor.
285+
286+
Args:
287+
message: Message to report to the extraction pipeline
288+
"""
289+
self._report_run("failure", message)
243290

244291
def _report_error(self, exception: BaseException) -> None:
245292
"""
@@ -248,16 +295,8 @@ def _report_error(self, exception: BaseException) -> None:
248295
Args:
249296
exception: Exception object that caused the extractor to fail
250297
"""
251-
self.logger.error("Unexpected error during extraction", exc_info=exception)
252-
if self.extraction_pipeline:
253-
message = f"{type(exception).__name__}: {exception!s}"[:1000]
254-
255-
self.logger.info(f"Reporting new failed run: {message}")
256-
self.cognite_client.extraction_pipelines.runs.create(
257-
ExtractionPipelineRun(
258-
extpipe_external_id=self.extraction_pipeline.external_id, status="failure", message=message
259-
)
260-
)
298+
message = f"{type(exception).__name__}: {exception!s}"
299+
self._report_run("failure", message)
261300

262301
def __enter__(self) -> "Extractor":
263302
"""
@@ -279,7 +318,9 @@ def __enter__(self) -> "Extractor":
279318
try:
280319
self._initial_load_config(override_path=self.config_file_path)
281320
except InvalidConfigError as e:
282-
print("Critical error: Could not read config file", file=sys.stderr) # noqa: T201
321+
print( # noqa: T201
322+
"Critical error: Could not read config file", file=sys.stderr
323+
)
283324
print(str(e), file=sys.stderr) # noqa: T201
284325
sys.exit(1)
285326

@@ -338,7 +379,10 @@ def heartbeat_loop() -> None:
338379
return self
339380

340381
def __exit__(
341-
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
382+
self,
383+
exc_type: type[BaseException] | None,
384+
exc_val: BaseException | None,
385+
exc_tb: TracebackType | None,
342386
) -> bool:
343387
"""
344388
Shuts down the extractor. Makes sure states are preserved, that all uploads of data and metrics are done, etc.
@@ -381,7 +425,12 @@ def run(self) -> None:
381425
if not self.started:
382426
raise ValueError("You must run the extractor in a context manager")
383427
if self.run_handle:
384-
self.run_handle(self.cognite_client, self.state_store, self.config, self.cancellation_token)
428+
self.run_handle(
429+
self.cognite_client,
430+
self.state_store,
431+
self.config,
432+
self.cancellation_token,
433+
)
385434
else:
386435
raise ValueError("No run_handle defined")
387436

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "cognite-extractor-utils"
3-
version = "7.5.14"
3+
version = "7.6.0"
44
description = "Utilities for easier development of extractors for CDF"
55
authors = [
66
{name = "Mathias Lohne", email = "mathias.lohne@cognite.com"}

tests/tests_unit/test_base.py

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import logging
1415
from collections.abc import Callable
1516
from dataclasses import dataclass, field
16-
from unittest.mock import patch
17+
from textwrap import shorten
18+
from unittest.mock import Mock, patch
1719

1820
import pytest
1921

2022
from cognite.client import CogniteClient
23+
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun
2124
from cognite.extractorutils import Extractor
2225
from cognite.extractorutils.configtools import BaseConfig, StateStoreConfig
2326
from cognite.extractorutils.statestore import LocalStateStore, NoStateStore
@@ -126,3 +129,151 @@ def test_state_store_getter() -> None:
126129
e5._load_state_store()
127130

128131
assert isinstance(Extractor.get_current_statestore(), LocalStateStore)
132+
133+
134+
@patch("cognite.client.CogniteClient")
135+
def test_report_success(
136+
get_client_mock: Callable[[], CogniteClient],
137+
) -> None:
138+
print("Report success test")
139+
140+
EXTRACTION_PIPELINE = "test_extraction_pipeline"
141+
MESSAGE = "test message"
142+
143+
def validate_message(run: ExtractionPipelineRun):
144+
print(f"Validating message: {run.message}")
145+
assert run.extpipe_external_id == EXTRACTION_PIPELINE
146+
assert run.status == "success"
147+
assert run.message == MESSAGE, "Message does not match expected value"
148+
149+
extractor = Extractor(
150+
name="extractor_test_report_success",
151+
description="description",
152+
config_class=ConfigWithoutStates,
153+
)
154+
extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
155+
extractor.cognite_client = get_client_mock()
156+
extractor.logger = logging.getLogger("test_logger")
157+
158+
extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)
159+
extractor.cognite_client.extraction_pipelines.runs.create = Mock(side_effect=validate_message)
160+
161+
extractor._report_success(message=MESSAGE)
162+
extractor.extraction_pipeline = None
163+
164+
165+
@patch("cognite.client.CogniteClient")
166+
def test_report_failure(
167+
get_client_mock: Callable[[], CogniteClient],
168+
) -> None:
169+
print("Report success test")
170+
171+
EXTRACTION_PIPELINE = "test_extraction_pipeline"
172+
MESSAGE = "test message"
173+
174+
def validate_message(run: ExtractionPipelineRun):
175+
print(f"Validating message: {run.message}")
176+
assert run.extpipe_external_id == EXTRACTION_PIPELINE
177+
assert run.status == "failure"
178+
assert run.message == MESSAGE, "Message does not match expected value"
179+
180+
extractor = Extractor(
181+
name="extractor_test_report_failure",
182+
description="description",
183+
config_class=ConfigWithoutStates,
184+
)
185+
extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
186+
extractor.cognite_client = get_client_mock()
187+
extractor.logger = logging.getLogger("test_logger")
188+
189+
extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)
190+
extractor.cognite_client.extraction_pipelines.runs.create = Mock(side_effect=validate_message)
191+
192+
extractor._report_failure(message=MESSAGE)
193+
extractor.extraction_pipeline = None
194+
195+
196+
@patch("cognite.client.CogniteClient")
197+
def test_report_error(
198+
get_client_mock: Callable[[], CogniteClient],
199+
) -> None:
200+
print("Report error test")
201+
202+
EXTRACTION_PIPELINE = "test_extraction_pipeline"
203+
MESSAGE = "test exception"
204+
expected_message = f"Exception: {MESSAGE}"
205+
206+
def validate_message(run: ExtractionPipelineRun):
207+
print(f"Validating message: {run.message}")
208+
assert run.extpipe_external_id == EXTRACTION_PIPELINE
209+
assert run.status == "failure"
210+
assert run.message == expected_message, "Message does not match expected value"
211+
212+
extractor = Extractor(
213+
name="extractor_test_report_error",
214+
description="description",
215+
config_class=ConfigWithoutStates,
216+
)
217+
extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
218+
extractor.cognite_client = get_client_mock()
219+
extractor.logger = logging.getLogger("test_logger")
220+
221+
extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)
222+
extractor.cognite_client.extraction_pipelines.runs.create = Mock(side_effect=validate_message)
223+
224+
exception = Exception(MESSAGE)
225+
226+
extractor._report_error(exception=exception)
227+
extractor.extraction_pipeline = None
228+
229+
230+
@patch("cognite.client.CogniteClient")
231+
def test_report_run(get_client_mock: Callable[[], CogniteClient]):
232+
print("Report run test")
233+
234+
MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN = 1000
235+
236+
EXTRACTION_PIPELINE = "test_extraction_pipeline"
237+
SHORT_MESSAGE = "hello world"
238+
LONG_MESSAGE = "x " * 1500
239+
240+
expected_long_message = shorten(
241+
text=LONG_MESSAGE,
242+
width=MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN,
243+
placeholder="...",
244+
)
245+
246+
# Mock method for reporting run
247+
def validate_short_message(run: ExtractionPipelineRun):
248+
print(f"Validating short message: {run.message}")
249+
assert run.extpipe_external_id == EXTRACTION_PIPELINE
250+
assert len(run.message) <= MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN, (
251+
f"Short message length exceeds maximum allowed length: {MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN}"
252+
)
253+
assert run.message == SHORT_MESSAGE, "Short message does not match expected value"
254+
255+
def validate_long_message(run: ExtractionPipelineRun):
256+
print(f"Validating long message: {run.message}")
257+
assert len(run.message) <= MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN, (
258+
f"Long message length exceeds maximum allowed length: {MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN}"
259+
)
260+
assert run.message == expected_long_message, "Long message does not match expected value"
261+
262+
extractor = Extractor(
263+
name="extractor_test_report_run",
264+
description="description",
265+
config_class=ConfigWithoutStates,
266+
)
267+
extractor._initial_load_config("tests/tests_unit/dummyconfig.yaml")
268+
extractor.cognite_client = get_client_mock()
269+
extractor.logger = logging.getLogger("test_logger")
270+
271+
extractor.extraction_pipeline = ExtractionPipeline(external_id=EXTRACTION_PIPELINE, name=EXTRACTION_PIPELINE)
272+
extractor.cognite_client.extraction_pipelines.runs.create = Mock(side_effect=validate_short_message)
273+
extractor._report_run(status="success", message=SHORT_MESSAGE)
274+
275+
extractor.cognite_client.extraction_pipelines.runs.create = Mock(side_effect=validate_long_message)
276+
extractor._report_run(status="success", message=LONG_MESSAGE)
277+
278+
extractor.extraction_pipeline = None
279+
# assert False

0 commit comments

Comments
 (0)