Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
98a4ee5
Add CDM Timeseries Upload Queue
devendra-lohar May 8, 2025
03907c7
codestyle check fixes
devendra-lohar May 9, 2025
9498de6
test: testing
toondaey May 9, 2025
88bc282
test: print test logs
toondaey May 9, 2025
3a0c592
test: remove changes
toondaey May 9, 2025
399c989
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar May 13, 2025
6d1a1c9
Add tests for CDM Timeseries upload queue
devendra-lohar May 14, 2025
184e47b
adding common datapoints sanitize method
devendra-lohar May 15, 2025
ee2e2cc
review comment fixes
devendra-lohar May 19, 2025
284ad8a
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
toondaey May 20, 2025
726a0aa
Add BaseTimeSeriesUploadQueue class
devendra-lohar May 21, 2025
77e8f3a
BaseTimeSeriesUploadQueue changes
devendra-lohar May 23, 2025
728102f
Add CDMTimeSeriesUploadQueue changes to uploader_extractor
devendra-lohar May 23, 2025
2de87f9
Refactor: unify __enter__ in the Base class
devendra-lohar May 23, 2025
8443844
Regular review changes
devendra-lohar May 29, 2025
8acafd7
removing unnecessary comment
devendra-lohar May 30, 2025
be6f791
Align cdm timeseries creation logic with legacy
devendra-lohar Jun 4, 2025
94f7738
[Modify] Align consistency with legacy missing factory
devendra-lohar Jun 9, 2025
50576f4
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar Jun 9, 2025
06eb57e
ruff formatting changes
devendra-lohar Jun 11, 2025
ed59c8b
[modify] regular formatting changes
devendra-lohar Jun 11, 2025
ab76970
ruff formatting change
devendra-lohar Jun 11, 2025
8432554
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar Jun 12, 2025
dea4ef4
fixing docstring
devendra-lohar Jun 12, 2025
ecdade5
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar Jun 24, 2025
8e63e36
fix type for insert datapoints api in metrics.py
devendra-lohar Jun 24, 2025
a709828
add more robust testcases to cover edge cases
devendra-lohar Jun 24, 2025
f024e28
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 134 additions & 10 deletions cognite/extractorutils/uploader/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
StatusCode,
TimeSeries,
)
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorTimeSeriesApply
from cognite.client.exceptions import CogniteDuplicatedError, CogniteNotFoundError
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
Expand Down Expand Up @@ -171,6 +173,20 @@ def _is_datapoint_valid(
else:
return True

def _sanitize_datapoints(self, datapoints: DataPointList | None) -> DataPointList:
    """
    Filter out invalid datapoints, keeping only those accepted by ``_is_datapoint_valid``.

    Discarded points are logged and counted in the
    ``TIMESERIES_UPLOADER_POINTS_DISCARDED`` metric.

    Args:
        datapoints: Datapoints to sanitize. ``None`` is treated as an empty list.

    Returns:
        The surviving (valid) datapoints as a list.
    """
    incoming = list(datapoints) if datapoints else []
    kept = [point for point in incoming if self._is_datapoint_valid(point)]

    discarded = len(incoming) - len(kept)
    if discarded > 0:
        self.logger.warning(f"Discarding {discarded} datapoints due to bad timestamp or value")
        TIMESERIES_UPLOADER_POINTS_DISCARDED.inc(discarded)

    return kept

def add_to_upload_queue(
self, *, id: int | None = None, external_id: str | None = None, datapoints: DataPointList | None = None
) -> None:
Expand All @@ -183,16 +199,7 @@ def add_to_upload_queue(
external_id: External ID of time series. Either this or external_id must be set.
datapoints: list of data points to add
"""
datapoints = datapoints or []
old_len = len(datapoints)
datapoints = list(filter(self._is_datapoint_valid, datapoints))

new_len = len(datapoints)

if old_len > new_len:
diff = old_len - new_len
self.logger.warning(f"Discarding {diff} datapoints due to bad timestamp or value")
TIMESERIES_UPLOADER_POINTS_DISCARDED.inc(diff)
datapoints = self._sanitize_datapoints(datapoints)

either_id = EitherId(id=id, external_id=external_id)

Expand Down Expand Up @@ -340,6 +347,123 @@ def __len__(self) -> int:
return self.upload_queue_size


class CDMTimeSeriesUploadQueue(TimeSeriesUploadQueue):
    """
    Upload queue for datapoints on CDM (data modeling) time series, where each series is
    addressed by an instance ``NodeId`` instead of an internal/external ID.

    Args:
        cdf_client: Cognite Data Fusion client to use
        post_upload_function: A function that will be called after each upload. The function will be given one argument:
            A list of dicts containing the datapoints that were uploaded (on the same format as the kwargs in
            datapoints upload in the Cognite SDK).
        max_queue_size: Maximum size of upload queue. Defaults to no max size.
        max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
            methods).
        trigger_log_level: Log level to log upload triggers to.
        thread_name: Thread name of uploader thread.
        create_missing: Passed through to TimeSeriesUploadQueue unchanged.
            NOTE(review): creation of missing classic time series presumably does not apply to
            instance-id based uploads — confirm.
        data_set_id: Passed through to TimeSeriesUploadQueue unchanged.
        cancellation_token: Token used to stop the background upload thread and abort retries.
    """

    def __init__(
        self,
        cdf_client: CogniteClient,
        post_upload_function: Callable[[list[dict[str, str | DataPointList]]], None] | None = None,
        max_queue_size: int | None = None,
        max_upload_interval: int | None = None,
        trigger_log_level: str = "DEBUG",
        thread_name: str | None = None,
        create_missing: Callable[[str, DataPointList], TimeSeries] | bool = False,
        data_set_id: int | None = None,
        cancellation_token: CancellationToken | None = None,
    ):
        # All behavior is inherited; this subclass only adds instance-id based entry points.
        super().__init__(
            cdf_client,
            post_upload_function,
            max_queue_size,
            max_upload_interval,
            trigger_log_level,
            thread_name,
            create_missing,
            data_set_id,
            cancellation_token,
        )

    def _apply_cognite_timeseries(self, timeseries_apply: CogniteExtractorTimeSeriesApply) -> NodeId:
        """Upsert the time series instance via the data modeling API and return its NodeId."""
        instance_result = self.cdf_client.data_modeling.instances.apply(timeseries_apply)
        node = instance_result.nodes[0]
        return node.as_id()

    def add_apply_to_upload_queue(
        self, *, timeseries_apply: CogniteExtractorTimeSeriesApply, datapoints: DataPointList | None = None
    ) -> None:
        """
        Add data points to upload queue. The queue will be uploaded if the queue size is larger than the threshold
        specified in the __init__.

        Note that this performs a synchronous ``instances.apply`` call to create/update the
        time series node before the datapoints are queued.

        Args:
            timeseries_apply: CogniteExtractorTimeSeriesApply object for which the node is to be created.
            datapoints: list of data points to add
        """
        datapoints = self._sanitize_datapoints(datapoints)

        instance_id = self._apply_cognite_timeseries(timeseries_apply)
        either_id = EitherId(instance_id=instance_id)

        with self.lock:
            if either_id not in self.upload_queue:
                self.upload_queue[either_id] = []

            self.upload_queue[either_id].extend(datapoints)
            self.points_queued.inc(len(datapoints))
            self.upload_queue_size += len(datapoints)
            self.queue_size.set(self.upload_queue_size)

            self._check_triggers()

    def upload(self) -> None:
        """
        Trigger an upload of the queue, clears queue afterwards
        """

        # Retries transient CDF errors with exponential backoff; aborts early if the
        # queue's cancellation token is triggered.
        @retry(
            exceptions=cognite_exceptions(),
            cancellation_token=self.cancellation_token,
            tries=RETRIES,
            delay=RETRY_DELAY,
            max_delay=RETRY_MAX_DELAY,
            backoff=RETRY_BACKOFF_FACTOR,
        )
        def _upload_batch(upload_this: list[dict]) -> list[dict]:
            if len(upload_this) > 0:
                self.cdf_client.time_series.data.insert_multiple(upload_this)

            return upload_this

        # Cheap pre-check outside the lock; the actual snapshot is taken under the lock below.
        if len(self.upload_queue) == 0:
            return

        with self.lock:
            # One request item per time series: {"instanceId"/"id"/"externalId": ..., "datapoints": [...]}.
            # Entries with no datapoints are skipped.
            upload_this = _upload_batch(
                [
                    {either_id.type(): either_id.content(), "datapoints": list(datapoints)}
                    for either_id, datapoints in self.upload_queue.items()
                    if len(datapoints) > 0
                ]
            )

            for _either_id, datapoints in self.upload_queue.items():
                self.points_written.inc(len(datapoints))

            try:
                self._post_upload(upload_this)
            except Exception as e:
                self.logger.error("Error in upload callback: %s", str(e))

            # Log before resetting the size counter so the count reflects what was uploaded.
            self.upload_queue.clear()
            self.logger.info(f"Uploaded {self.upload_queue_size} datapoints")
            self.upload_queue_size = 0
            self.queue_size.set(self.upload_queue_size)


class SequenceUploadQueue(AbstractUploadQueue):
def __init__(
self,
Expand Down
44 changes: 30 additions & 14 deletions cognite/extractorutils/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from cognite.client import CogniteClient
from cognite.client.data_classes import Asset, ExtractionPipelineRun, TimeSeries
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.exceptions import CogniteAPIError, CogniteException, CogniteFileUploadError, CogniteNotFoundError
from cognite.extractorutils.threading import CancellationToken

Expand Down Expand Up @@ -79,35 +80,41 @@ def ensure_assets(cdf_client: CogniteClient, assets: Iterable[Asset]) -> None:

class EitherId:
"""
Class representing an ID in CDF, which can either be an external or internal ID. An EitherId can only hold one ID
type, not both.
Class representing an ID in CDF, which can either be an external ID, internal ID or instance ID.
An EitherId can only hold one ID type, not more than one.

Args:
id: Internal ID
external_id: external ID. It can be `external_id` or `externalId`
instance_id: Instance ID. It can be `instance_id` or `instanceId`

Raises:
TypeError: If none of both of id types are set.
TypeError: If none or more than one of the id types are set.
"""

def __init__(self, **kwargs: int | str | None):
def __init__(self, **kwargs: int | str | NodeId | None):
internal_id = kwargs.get("id")
external_id = kwargs.get("externalId") or kwargs.get("external_id")
instance_id = kwargs.get("instanceId") or kwargs.get("instance_id")

if internal_id is None and external_id is None:
raise TypeError("Either id or external_id must be set")
identifiers = [internal_id, external_id, instance_id]
provided_ids = [id_val for id_val in identifiers if id_val is not None]

if internal_id is not None and external_id is not None:
raise TypeError("Only one of id and external_id can be set")
if not provided_ids:
raise TypeError("Either id, external_id, or instance_id must be set")
if len(provided_ids) > 1:
raise TypeError("Only one of id, external_id, or instance_id can be set")

if internal_id is not None and not isinstance(internal_id, int):
raise TypeError("Internal IDs must be integers")

if external_id is not None and not isinstance(external_id, str):
raise TypeError("External IDs must be strings")
if instance_id is not None and not isinstance(instance_id, NodeId):
raise TypeError("Instance IDs must be NodeId objects")

self.internal_id: int | None = internal_id
self.external_id: str | None = external_id
self.instance_id: NodeId | None = instance_id

def type(self) -> str:
    """
    Get the type of the ID.

    Returns:
        'id' if the EitherId represents an internal ID, 'instanceId' if it represents an
        instance ID, and 'externalId' if it represents an external ID.
    """
    if self.internal_id is not None:
        return "id"
    elif self.instance_id is not None:
        return "instanceId"
    else:
        return "externalId"

def content(self) -> int | str:
def content(self) -> int | str | NodeId:
"""
Get the value of the ID

Returns:
The ID
"""
return self.internal_id or self.external_id # type: ignore # checked to be not None in init
return self.internal_id or self.external_id or self.instance_id # type: ignore

def __eq__(self, other: Any) -> bool:
    """
    Compare this EitherId with another object.

    Returns:
        True if the other object is an EitherId holding the same identifier fields.
    """
    if not isinstance(other, EitherId):
        return False

    mine = (self.internal_id, self.external_id, self.instance_id)
    theirs = (other.internal_id, other.external_id, other.instance_id)
    return mine == theirs

def __hash__(self) -> int:
    """
    Compute a hash of the ID, consistent with __eq__.

    Returns:
        Hash code of ID
    """
    identity = (self.internal_id, self.external_id, self.instance_id)
    return hash(identity)

def __str__(self) -> str:
"""
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

class ETestType(Enum):
TIME_SERIES = "time_series"
CDM_TIME_SERIES = "cdm_time_series"
FILES = "files"
RAW = "raw"
ASSETS = "assets"
Expand Down Expand Up @@ -66,6 +67,10 @@ def set_client() -> CogniteClient:
def clean_test(client: CogniteClient, test_parameter: ParamTest) -> None:
if test_parameter.test_type.value == ETestType.TIME_SERIES.value:
client.time_series.delete(external_id=test_parameter.external_ids, ignore_unknown_ids=True)
if test_parameter.test_type.value == ETestType.CDM_TIME_SERIES.value:
client.data_modeling.instances.delete(
nodes=[NodeId("ExtractorUtilsTests", i) for i in test_parameter.external_ids]
)
elif test_parameter.test_type.value == ETestType.EVENTS.value:
client.events.delete(external_id=test_parameter.external_ids, ignore_unknown_ids=True)
elif test_parameter.test_type.value == ETestType.ASSETS.value:
Expand Down
Loading
Loading