Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
98a4ee5
Add CDM Timeseries Upload Queue
devendra-lohar May 8, 2025
03907c7
codestyle check fixes
devendra-lohar May 9, 2025
9498de6
test: testing
toondaey May 9, 2025
88bc282
test: print test logs
toondaey May 9, 2025
3a0c592
test: remove changes
toondaey May 9, 2025
399c989
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar May 13, 2025
6d1a1c9
Add tests for CDM Timeseries upload queue
devendra-lohar May 14, 2025
184e47b
adding common datapoints sanitize method
devendra-lohar May 15, 2025
ee2e2cc
review comment fixes
devendra-lohar May 19, 2025
284ad8a
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
toondaey May 20, 2025
726a0aa
Add BaseTimeSeriesUploadQueue class
devendra-lohar May 21, 2025
77e8f3a
BaseTimeSeriesUploadQueue changes
devendra-lohar May 23, 2025
728102f
Add CDMTimeSeriesUploadQueue changes to uploader_extractor
devendra-lohar May 23, 2025
2de87f9
Refactor: unify __enter__ in the Base class
devendra-lohar May 23, 2025
8443844
Regular review changes
devendra-lohar May 29, 2025
8acafd7
removing unnecessary comment
devendra-lohar May 30, 2025
be6f791
Align cdm timeseries creation logic with legacy
devendra-lohar Jun 4, 2025
94f7738
[Modify] Align consistency with legacy missing factory
devendra-lohar Jun 9, 2025
50576f4
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar Jun 9, 2025
06eb57e
ruff formatting changes
devendra-lohar Jun 11, 2025
ed59c8b
[modify] regular formatting changes
devendra-lohar Jun 11, 2025
ab76970
ruff formatting change
devendra-lohar Jun 11, 2025
8432554
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar Jun 12, 2025
dea4ef4
fixing docstring
devendra-lohar Jun 12, 2025
ecdade5
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar Jun 24, 2025
8e63e36
fix type for insert datapoints api in metrics.py
devendra-lohar Jun 24, 2025
a709828
add more robust testcases to cover edge cases
devendra-lohar Jun 24, 2025
f024e28
Merge branch 'master' into DOG-5255-CDM-Timeseries-db-extractor
devendra-lohar Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cognite/extractorutils/uploader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
from .files import BytesUploadQueue, FileUploadQueue, IOFileUploadQueue
from .raw import RawUploadQueue
from .time_series import (
CDMTimeSeriesUploadQueue,
DataPoint,
DataPointList,
SequenceUploadQueue,
Expand All @@ -87,5 +88,6 @@
"DataPointList",
"SequenceUploadQueue",
"TimeSeriesUploadQueue",
"CDMTimeSeriesUploadQueue",
"default_time_series_factory",
]
265 changes: 213 additions & 52 deletions cognite/extractorutils/uploader/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections.abc import Callable
from datetime import datetime
from types import TracebackType
from typing import Any
from typing import Any, TypeVar

from cognite.client import CogniteClient
from cognite.client.data_classes import (
Expand All @@ -26,6 +26,8 @@
StatusCode,
TimeSeries,
)
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorTimeSeriesApply
from cognite.client.exceptions import CogniteDuplicatedError, CogniteNotFoundError
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
Expand Down Expand Up @@ -59,6 +61,8 @@
DataPoint = DataPointWithoutStatus | DataPointWithStatus
DataPointList = list[DataPoint]

TQueue = TypeVar("TQueue", bound="BaseTimeSeriesUploadQueue")


def default_time_series_factory(external_id: str, datapoints: DataPointList) -> TimeSeries:
"""
Expand All @@ -79,9 +83,9 @@ def default_time_series_factory(external_id: str, datapoints: DataPointList) ->
return TimeSeries(external_id=external_id, is_string=is_string)


class TimeSeriesUploadQueue(AbstractUploadQueue):
class BaseTimeSeriesUploadQueue(AbstractUploadQueue):
"""
Upload queue for time series
Abstract base upload queue for time series

Args:
cdf_client: Cognite Data Fusion client to use
Expand All @@ -93,12 +97,6 @@ class TimeSeriesUploadQueue(AbstractUploadQueue):
methods).
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
create_missing: Create missing time series if possible (ie, if external id is used). Either given as a boolean
(True would auto-create a time series with nothing but an external ID), or as a factory function taking an
external ID and a list of datapoints about to be inserted and returning a TimeSeries object.
data_set_id: Data set id passed to create_missing. Does nothing if create_missing is False.
If a custom timeseries creation method is set in create_missing, this is used as fallback if
that method does not set data set id on its own.
"""

def __init__(
Expand All @@ -109,8 +107,6 @@ def __init__(
max_upload_interval: int | None = None,
trigger_log_level: str = "DEBUG",
thread_name: str | None = None,
create_missing: Callable[[str, DataPointList], TimeSeries] | bool = False,
data_set_id: int | None = None,
cancellation_token: CancellationToken | None = None,
):
# Super sets post_upload and threshold
Expand All @@ -124,21 +120,11 @@ def __init__(
cancellation_token,
)

self.missing_factory: Callable[[str, DataPointList], TimeSeries]

if isinstance(create_missing, bool):
self.create_missing = create_missing
self.missing_factory = default_time_series_factory
else:
self.create_missing = True
self.missing_factory = create_missing

self.upload_queue: dict[EitherId, DataPointList] = {}

self.points_queued = TIMESERIES_UPLOADER_POINTS_QUEUED
self.points_written = TIMESERIES_UPLOADER_POINTS_WRITTEN
self.queue_size = TIMESERIES_UPLOADER_QUEUE_SIZE
self.data_set_id = data_set_id

def _verify_datapoint_time(self, time: int | float | datetime | str) -> bool:
if isinstance(time, int) or isinstance(time, float):
Expand Down Expand Up @@ -171,6 +157,109 @@ def _is_datapoint_valid(
else:
return True

def _sanitize_datapoints(self, datapoints: DataPointList | None) -> DataPointList:
datapoints = datapoints or []
old_len = len(datapoints)
datapoints = list(filter(self._is_datapoint_valid, datapoints))

new_len = len(datapoints)

if old_len > new_len:
diff = old_len - new_len
self.logger.warning(f"Discarding {diff} datapoints due to bad timestamp or value")
TIMESERIES_UPLOADER_POINTS_DISCARDED.inc(diff)

return datapoints

def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
"""
Wraps around stop method, for use as context manager

Args:
exc_type: Exception type
exc_val: Exception value
exc_tb: Traceback
"""
self.stop()

def __len__(self) -> int:
"""
The size of the upload queue

Returns:
Number of data points in queue
"""
return self.upload_queue_size

def __enter__(self: TQueue) -> TQueue:
"""
Wraps around start method, for use as context manager

Returns:
self
"""
self.start()
return self


class TimeSeriesUploadQueue(BaseTimeSeriesUploadQueue):
"""
Upload queue for time series

Args:
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of dicts containing the datapoints that were uploaded (on the same format as the kwargs in
datapoints upload in the Cognite SDK).
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
create_missing: Create missing time series if possible (ie, if external id is used). Either given as a boolean
(True would auto-create a time series with nothing but an external ID), or as a factory function taking an
external ID and a list of datapoints about to be inserted and returning a TimeSeries object.
data_set_id: Data set id passed to create_missing. Does nothing if create_missing is False.
If a custom timeseries creation method is set in create_missing, this is used as fallback if
that method does not set data set id on its own.
"""

def __init__(
self,
cdf_client: CogniteClient,
post_upload_function: Callable[[list[dict[str, str | DataPointList]]], None] | None = None,
max_queue_size: int | None = None,
max_upload_interval: int | None = None,
trigger_log_level: str = "DEBUG",
thread_name: str | None = None,
create_missing: Callable[[str, DataPointList], TimeSeries] | bool = False,
data_set_id: int | None = None,
cancellation_token: CancellationToken | None = None,
):
# Super sets post_upload and threshold
super().__init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
trigger_log_level,
thread_name,
cancellation_token,
)

self.missing_factory: Callable[[str, DataPointList], TimeSeries]

if isinstance(create_missing, bool):
self.create_missing = create_missing
self.missing_factory = default_time_series_factory
else:
self.create_missing = True
self.missing_factory = create_missing

self.data_set_id = data_set_id

def add_to_upload_queue(
self, *, id: int | None = None, external_id: str | None = None, datapoints: DataPointList | None = None
) -> None:
Expand All @@ -183,16 +272,7 @@ def add_to_upload_queue(
external_id: External ID of time series. Either this or external_id must be set.
datapoints: list of data points to add
"""
datapoints = datapoints or []
old_len = len(datapoints)
datapoints = list(filter(self._is_datapoint_valid, datapoints))

new_len = len(datapoints)

if old_len > new_len:
diff = old_len - new_len
self.logger.warning(f"Discarding {diff} datapoints due to bad timestamp or value")
TIMESERIES_UPLOADER_POINTS_DISCARDED.inc(diff)
datapoints = self._sanitize_datapoints(datapoints)

either_id = EitherId(id=id, external_id=external_id)

Expand Down Expand Up @@ -307,37 +387,118 @@ def _upload_batch(upload_this: list[dict], retries: int = 5) -> list[dict]:
self.upload_queue_size = 0
self.queue_size.set(self.upload_queue_size)

def __enter__(self) -> "TimeSeriesUploadQueue":
"""
Wraps around start method, for use as context manager

Returns:
self
"""
self.start()
return self
class CDMTimeSeriesUploadQueue(BaseTimeSeriesUploadQueue):
"""
Upload queue for CDM time series

def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
Args:
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of dicts containing the datapoints that were uploaded (on the same format as the kwargs in
datapoints upload in the Cognite SDK).
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
"""

def __init__(
self,
cdf_client: CogniteClient,
post_upload_function: Callable[[list[dict[str, str | DataPointList]]], None] | None = None,
max_queue_size: int | None = None,
max_upload_interval: int | None = None,
trigger_log_level: str = "DEBUG",
thread_name: str | None = None,
cancellation_token: CancellationToken | None = None,
):
super().__init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
trigger_log_level,
thread_name,
cancellation_token,
)

def _apply_cognite_timeseries(self, timeseries_apply: CogniteExtractorTimeSeriesApply) -> NodeId:
instance_result = self.cdf_client.data_modeling.instances.apply(timeseries_apply)
node = instance_result.nodes[0]
return node.as_id()

def add_to_upload_queue(
self, *, timeseries_apply: CogniteExtractorTimeSeriesApply, datapoints: DataPointList | None = None
) -> None:
"""
Wraps around stop method, for use as context manager
Add data points to upload queue. The queue will be uploaded if the queue size is larger than the threshold
specified in the __init__.

Args:
exc_type: Exception type
exc_val: Exception value
exc_tb: Traceback
timeseries_apply: CogniteExtractorTimeSeriesApply object for which the node is to be created.
datapoints: list of data points to add
"""
self.stop()
datapoints = self._sanitize_datapoints(datapoints)

def __len__(self) -> int:
"""
The size of the upload queue
instance_id = self._apply_cognite_timeseries(timeseries_apply)
either_id = EitherId(instance_id=instance_id)

Returns:
Number of data points in queue
with self.lock:
if either_id not in self.upload_queue:
self.upload_queue[either_id] = []

self.upload_queue[either_id].extend(datapoints)
self.points_queued.inc(len(datapoints))
self.upload_queue_size += len(datapoints)
self.queue_size.set(self.upload_queue_size)

self._check_triggers()

    def upload(self) -> None:
        """
        Trigger an upload of the queue, clears queue afterwards
        """

        # Inner batch uploader wrapped with retry on Cognite API errors,
        # using the module-level retry tuning constants.
        @retry(
            exceptions=cognite_exceptions(),
            cancellation_token=self.cancellation_token,
            tries=RETRIES,
            delay=RETRY_DELAY,
            max_delay=RETRY_MAX_DELAY,
            backoff=RETRY_BACKOFF_FACTOR,
        )
        def _upload_batch(upload_this: list[dict]) -> list[dict]:
            # Skip the API call entirely when there is nothing to send
            if len(upload_this) > 0:
                self.cdf_client.time_series.data.insert_multiple(upload_this)

            return upload_this

        # Fast path: nothing queued, nothing to do.
        # NOTE(review): this check happens outside the lock, so a concurrent add
        # between here and acquiring the lock is simply picked up by the next upload.
        if len(self.upload_queue) == 0:
            return

        with self.lock:
            # Build one insert payload per time series id; entries with no
            # datapoints are dropped from the request
            upload_this = _upload_batch(
                [
                    {either_id.type(): either_id.content(), "datapoints": list(datapoints)}
                    for either_id, datapoints in self.upload_queue.items()
                    if len(datapoints) > 0
                ]
            )

            # Count every queued datapoint as written (empty lists add zero)
            for _either_id, datapoints in self.upload_queue.items():
                self.points_written.inc(len(datapoints))

            # The post-upload callback is user-supplied: log failures but never
            # let them abort the upload or leave the queue un-cleared
            try:
                self._post_upload(upload_this)
            except Exception as e:
                self.logger.error("Error in upload callback: %s", str(e))

            # Reset queue state and metrics; upload_queue_size is logged before
            # it is zeroed so the message reports the batch that was just sent
            self.upload_queue.clear()
            self.logger.info(f"Uploaded {self.upload_queue_size} datapoints")
            self.upload_queue_size = 0
            self.queue_size.set(self.upload_queue_size)


class SequenceUploadQueue(AbstractUploadQueue):
Expand Down
Loading
Loading