Skip to content

Commit d3bea4f

Browse files
committed
Merge branch 'master' into ann-rule
2 parents 3993530 + cf88b54 commit d3bea4f

File tree

21 files changed

+1267
-143
lines changed

21 files changed

+1267
-143
lines changed

CHANGELOG.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,32 @@ Changes are grouped as follows
1212
- `Fixed` for any bug fixes.
1313
- `Security` in case of vulnerabilities.
1414

15-
## Next
15+
## 7.7.0
16+
17+
### Added
18+
19+
* Added the `CDMTimeSeriesUploadQueue` class to upload time series datapoints to instances in the Cognite Data Model (CDM). This new uploader uses `NodeId` for identification and can automatically create `CogniteExtractorTimeSeriesApply` instances if they are missing.
20+
21+
### Changed
22+
23+
* Refactored the time series uploaders for better code reuse and maintainability. A new generic base class, `BaseTimeSeriesUploadQueue`, has been introduced to contain common logic for datapoint validation and sanitization. Both the existing `TimeSeriesUploadQueue` and the new `CDMTimeSeriesUploadQueue` now inherit from this base class.
24+
25+
26+
## 7.6.0
27+
28+
### Added
29+
30+
* Added a common method `_report_run` that can be called from any extractors using this library to report an extractor run into Extractor Pipelines. This automatically handles truncating the message to less than 1000 characters if required.
31+
32+
### Fixed
33+
34+
* Fixed `_report_success` calls failing if the `success_message` was longer than 1000 characters.
1635

1736
### Changed
1837

1938
* In the `unstable` package: `-l` / `--local-override` is renamed to `-f` / `--force-local-config`
2039

40+
2141
## 7.5.14
2242

2343
### Added

cognite/extractorutils/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
Cognite extractor utils is a Python package that simplifies the development of new extractors.
1717
"""
1818

19-
__version__ = "7.5.14"
19+
__version__ = "7.7.0"
2020
from .base import Extractor
2121

2222
__all__ = ["Extractor"]

cognite/extractorutils/base.py

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""
22
Module containing the base class for extractors.
33
"""
4+
45
# Copyright 2021 Cognite AS
56
#
67
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,20 +23,32 @@
2223
from collections.abc import Callable
2324
from dataclasses import is_dataclass
2425
from enum import Enum
26+
from textwrap import shorten
2527
from threading import Thread
2628
from types import TracebackType
27-
from typing import Any, Generic, TypeVar
29+
from typing import Any, Generic, Literal, TypeVar
2830

2931
from dotenv import load_dotenv
3032

3133
from cognite.client import CogniteClient
3234
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun
33-
from cognite.extractorutils.configtools import BaseConfig, ConfigResolver, StateStoreConfig
35+
from cognite.client.exceptions import CogniteAPIError
36+
from cognite.extractorutils.configtools import (
37+
BaseConfig,
38+
ConfigResolver,
39+
StateStoreConfig,
40+
)
3441
from cognite.extractorutils.exceptions import InvalidConfigError
3542
from cognite.extractorutils.metrics import BaseMetrics
36-
from cognite.extractorutils.statestore import AbstractStateStore, LocalStateStore, NoStateStore
43+
from cognite.extractorutils.statestore import (
44+
AbstractStateStore,
45+
LocalStateStore,
46+
NoStateStore,
47+
)
3748
from cognite.extractorutils.threading import CancellationToken
3849

50+
ReportStatus = Literal["success", "failure", "seen"]
51+
3952

4053
class ReloadConfigAction(Enum):
4154
"""
@@ -227,19 +240,53 @@ def recursive_find_state_store(d: dict[str, Any]) -> StateStoreConfig | None:
227240

228241
Extractor._statestore_singleton = self.state_store
229242

230-
def _report_success(self) -> None:
243+
def _report_run(self, status: ReportStatus, message: str) -> None:
231244
"""
232-
Called on a successful exit of the extractor.
245+
Report the status of the extractor run to the extraction pipeline.
246+
247+
Args:
248+
status: Status of the run, either success or failure or seen
249+
message: Message to report to the extraction pipeline
233250
"""
251+
MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN = 1000
234252
if self.extraction_pipeline:
235-
self.logger.info("Reporting new successful run")
236-
self.cognite_client.extraction_pipelines.runs.create(
237-
ExtractionPipelineRun(
253+
try:
254+
message = message or ""
255+
shortened_message = shorten(
256+
message,
257+
width=MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN,
258+
placeholder="...",
259+
)
260+
261+
run = ExtractionPipelineRun(
238262
extpipe_external_id=self.extraction_pipeline.external_id,
239-
status="success",
240-
message=self.success_message,
263+
status=status,
264+
message=shortened_message,
241265
)
242-
)
266+
267+
self.logger.info(f"Reporting new {status} run: {message}")
268+
self.cognite_client.extraction_pipelines.runs.create(run)
269+
except CogniteAPIError as e:
270+
self.logger.exception(f"Error while reporting run - status {status} - message {message} . Error: {e!s}")
271+
272+
def _report_success(self, message: str | None = None) -> None:
273+
"""
274+
Called on a successful exit of the extractor.
275+
276+
Args:
277+
message: Message to report to the extraction pipeline. If not provided, Extractor.success_message is taken.
278+
"""
279+
message = message or self.success_message
280+
self._report_run("success", message)
281+
282+
def _report_failure(self, message: str) -> None:
283+
"""
284+
Called on an unsuccessful exit of the extractor.
285+
286+
Args:
287+
message: Message to report to the extraction pipeline
288+
"""
289+
self._report_run("failure", message)
243290

244291
def _report_error(self, exception: BaseException) -> None:
245292
"""
@@ -248,16 +295,8 @@ def _report_error(self, exception: BaseException) -> None:
248295
Args:
249296
exception: Exception object that caused the extractor to fail
250297
"""
251-
self.logger.error("Unexpected error during extraction", exc_info=exception)
252-
if self.extraction_pipeline:
253-
message = f"{type(exception).__name__}: {exception!s}"[:1000]
254-
255-
self.logger.info(f"Reporting new failed run: {message}")
256-
self.cognite_client.extraction_pipelines.runs.create(
257-
ExtractionPipelineRun(
258-
extpipe_external_id=self.extraction_pipeline.external_id, status="failure", message=message
259-
)
260-
)
298+
message = f"{type(exception).__name__}: {exception!s}"
299+
self._report_run("failure", message)
261300

262301
def __enter__(self) -> "Extractor":
263302
"""
@@ -279,7 +318,9 @@ def __enter__(self) -> "Extractor":
279318
try:
280319
self._initial_load_config(override_path=self.config_file_path)
281320
except InvalidConfigError as e:
282-
print("Critical error: Could not read config file", file=sys.stderr) # noqa: T201
321+
print( # noqa: T201
322+
"Critical error: Could not read config file", file=sys.stderr
323+
)
283324
print(str(e), file=sys.stderr) # noqa: T201
284325
sys.exit(1)
285326

@@ -338,7 +379,10 @@ def heartbeat_loop() -> None:
338379
return self
339380

340381
def __exit__(
341-
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
382+
self,
383+
exc_type: type[BaseException] | None,
384+
exc_val: BaseException | None,
385+
exc_tb: TracebackType | None,
342386
) -> bool:
343387
"""
344388
Shuts down the extractor. Makes sure states are preserved, that all uploads of data and metrics are done, etc.
@@ -381,7 +425,12 @@ def run(self) -> None:
381425
if not self.started:
382426
raise ValueError("You must run the extractor in a context manager")
383427
if self.run_handle:
384-
self.run_handle(self.cognite_client, self.state_store, self.config, self.cancellation_token)
428+
self.run_handle(
429+
self.cognite_client,
430+
self.state_store,
431+
self.config,
432+
self.cancellation_token,
433+
)
385434
else:
386435
raise ValueError("No run_handle defined")
387436

cognite/extractorutils/metrics.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def __init__(self):
5454

5555
from cognite.client import CogniteClient
5656
from cognite.client.data_classes import Asset, Datapoints, DatapointsArray, TimeSeries
57+
from cognite.client.data_classes.data_modeling import NodeId
5758
from cognite.client.exceptions import CogniteDuplicatedError
5859
from cognite.extractorutils.threading import CancellationToken
5960
from cognite.extractorutils.util import EitherId
@@ -413,7 +414,7 @@ def _push_to_server(self) -> None:
413414
"""
414415
timestamp = int(arrow.get().float_timestamp * 1000)
415416

416-
datapoints: list[dict[str, str | int | list[Any] | Datapoints | DatapointsArray]] = []
417+
datapoints: list[dict[str, str | int | list[Any] | Datapoints | DatapointsArray | NodeId]] = []
417418

418419
for metric in REGISTRY.collect():
419420
if isinstance(metric, Metric) and metric.type in ["gauge", "counter"]:

cognite/extractorutils/unstable/configuration/models.py

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import os
66
import re
7+
from collections.abc import Iterator
78
from datetime import timedelta
89
from enum import Enum
910
from pathlib import Path
@@ -53,23 +54,44 @@ class ConfigModel(BaseModel):
5354
)
5455

5556

56-
class _ClientCredentialsConfig(ConfigModel):
57-
type: Literal["client-credentials"]
57+
class Scopes(str):
    """
    A space-separated list of authentication scopes that still behaves like a plain string.

    The raw value is stored by ``str`` itself. In addition, the value is split on single spaces so that
    iterating over a ``Scopes`` object yields the individual scopes instead of single characters.

    Args:
        scopes: The scopes as one string, separated by single spaces.
    """

    def __init__(self, scopes: str) -> None:
        # str.__new__ has already captured the raw value; keep the split form for iteration and comparison.
        self._scopes = scopes.split(" ")

    @classmethod
    def __get_pydantic_core_schema__(cls, source_type: Any, handler: "GetCoreSchemaHandler") -> "CoreSchema":
        """
        Build the pydantic schema: validate the input as a ``str``, then wrap the result in ``Scopes``.
        """
        return core_schema.no_info_after_validator_function(cls, handler(str))

    def __eq__(self, other: object) -> bool:
        """
        Compare two ``Scopes`` objects by their list of scopes.

        Returns ``NotImplemented`` for any other type so Python can fall back to the other operand's comparison.
        """
        if not isinstance(other, Scopes):
            return NotImplemented
        return self._scopes == other._scopes

    def __hash__(self) -> int:
        """
        Hash the underlying string value.
        """
        # self._scopes is a list and therefore unhashable, so hashing it raised TypeError.
        # Two Scopes split into equal lists exactly when their underlying strings are equal, so the string
        # hash agrees with __eq__ and also matches the hash of an equal plain str.
        return str.__hash__(self)

    def __iter__(self) -> Iterator[str]:
        """
        Iterate over the individual scopes rather than over the characters of the string.
        """
        return iter(self._scopes)
75+
76+
77+
class BaseCredentialsConfig(ConfigModel):
5878
client_id: str
79+
scopes: Scopes
80+
81+
82+
class _ClientCredentialsConfig(BaseCredentialsConfig):
83+
type: Literal["client-credentials"]
5984
client_secret: str
6085
token_url: str
61-
scopes: list[str]
6286
resource: str | None = None
6387
audience: str | None = None
6488

6589

66-
class _ClientCertificateConfig(ConfigModel):
90+
class _ClientCertificateConfig(BaseCredentialsConfig):
6791
type: Literal["client-certificate"]
68-
client_id: str
6992
path: Path
7093
password: str | None = None
7194
authority_url: str
72-
scopes: list[str]
7395

7496

7597
AuthenticationConfig = Annotated[_ClientCredentialsConfig | _ClientCertificateConfig, Field(discriminator="type")]
@@ -191,18 +213,26 @@ def __repr__(self) -> str:
191213
return self._expression
192214

193215

194-
class _ConnectionParameters(ConfigModel):
195-
gzip_compression: bool = False
196-
status_forcelist: list[int] = Field(default_factory=lambda: [429, 502, 503, 504])
197-
max_retries: int = 10
198-
max_retries_connect: int = 3
199-
max_retry_backoff: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))
200-
max_connection_pool_size: int = 50
201-
ssl_verify: bool = True
202-
proxies: dict[str, str] = Field(default_factory=dict)
216+
class RetriesConfig(ConfigModel):
217+
max_retries: int = Field(default=10, ge=-1)
218+
max_backoff: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))
203219
timeout: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))
204220

205221

222+
class SslCertificatesConfig(ConfigModel):
223+
verify: bool = True
224+
allow_list: list[str] | None = None
225+
226+
227+
class ConnectionParameters(ConfigModel):
228+
retries: RetriesConfig = Field(default_factory=RetriesConfig)
229+
ssl_certificates: SslCertificatesConfig = Field(default_factory=SslCertificatesConfig)
230+
231+
232+
class IntegrationConfig(ConfigModel):
233+
external_id: str
234+
235+
206236
class ConnectionConfig(ConfigModel):
207237
"""
208238
Configuration for connecting to a Cognite Data Fusion project.
@@ -216,11 +246,11 @@ class ConnectionConfig(ConfigModel):
216246
project: str
217247
base_url: str
218248

219-
integration: str
249+
integration: IntegrationConfig
220250

221251
authentication: AuthenticationConfig
222252

223-
connection: _ConnectionParameters = Field(default_factory=_ConnectionParameters)
253+
connection: ConnectionParameters = Field(default_factory=ConnectionParameters)
224254

225255
def get_cognite_client(self, client_name: str) -> CogniteClient:
226256
"""
@@ -235,14 +265,9 @@ def get_cognite_client(self, client_name: str) -> CogniteClient:
235265
from cognite.client.config import global_config
236266

237267
global_config.disable_pypi_version_check = True
238-
global_config.disable_gzip = not self.connection.gzip_compression
239-
global_config.status_forcelist = set(self.connection.status_forcelist)
240-
global_config.max_retries = self.connection.max_retries
241-
global_config.max_retries_connect = self.connection.max_retries_connect
242-
global_config.max_retry_backoff = self.connection.max_retry_backoff.seconds
243-
global_config.max_connection_pool_size = self.connection.max_connection_pool_size
244-
global_config.disable_ssl = not self.connection.ssl_verify
245-
global_config.proxies = self.connection.proxies
268+
global_config.max_retries = self.connection.retries.max_retries
269+
global_config.max_retry_backoff = self.connection.retries.max_backoff.seconds
270+
global_config.disable_ssl = not self.connection.ssl_certificates.verify
246271

247272
credential_provider: CredentialProvider
248273
match self.authentication:
@@ -270,7 +295,7 @@ def get_cognite_client(self, client_name: str) -> CogniteClient:
270295
client_id=client_certificate.client_id,
271296
cert_thumbprint=str(thumbprint),
272297
certificate=str(key),
273-
scopes=client_certificate.scopes,
298+
scopes=list(client_certificate.scopes),
274299
)
275300

276301
case _:
@@ -280,7 +305,7 @@ def get_cognite_client(self, client_name: str) -> CogniteClient:
280305
project=self.project,
281306
base_url=self.base_url,
282307
client_name=client_name,
283-
timeout=self.connection.timeout.seconds,
308+
timeout=self.connection.retries.timeout.seconds,
284309
credentials=credential_provider,
285310
)
286311

@@ -315,7 +340,9 @@ def from_environment(cls) -> "ConnectionConfig":
315340
client_id=os.environ["COGNITE_CLIENT_ID"],
316341
client_secret=os.environ["COGNITE_CLIENT_SECRET"],
317342
token_url=os.environ["COGNITE_TOKEN_URL"],
318-
scopes=os.environ["COGNITE_TOKEN_SCOPES"].split(","),
343+
scopes=Scopes(
344+
os.environ["COGNITE_TOKEN_SCOPES"],
345+
),
319346
)
320347
elif "COGNITE_CLIENT_CERTIFICATE_PATH" in os.environ:
321348
auth = _ClientCertificateConfig(
@@ -324,15 +351,17 @@ def from_environment(cls) -> "ConnectionConfig":
324351
path=Path(os.environ["COGNITE_CLIENT_CERTIFICATE_PATH"]),
325352
password=os.environ.get("COGNITE_CLIENT_CERTIFICATE_PATH"),
326353
authority_url=os.environ["COGNITE_AUTHORITY_URL"],
327-
scopes=os.environ["COGNITE_TOKEN_SCOPES"].split(","),
354+
scopes=Scopes(
355+
os.environ["COGNITE_TOKEN_SCOPES"],
356+
),
328357
)
329358
else:
330359
raise KeyError("Missing auth, either COGNITE_CLIENT_SECRET or COGNITE_CLIENT_CERTIFICATE_PATH must be set")
331360

332361
return ConnectionConfig(
333362
project=os.environ["COGNITE_PROJECT"],
334363
base_url=os.environ["COGNITE_BASE_URL"],
335-
integration=os.environ["COGNITE_INTEGRATION"],
364+
integration=IntegrationConfig(external_id=os.environ["COGNITE_INTEGRATION"]),
336365
authentication=auth,
337366
)
338367

0 commit comments

Comments
 (0)