-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathdata_modeling.py
More file actions
152 lines (131 loc) · 5.01 KB
/
data_modeling.py
File metadata and controls
152 lines (131 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
Module for uploading data modeling instances to CDF.
"""
from collections.abc import Callable
from types import TracebackType
from typing import Any
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import EdgeApply, NodeApply
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
RETRIES,
RETRY_BACKOFF_FACTOR,
RETRY_DELAY,
RETRY_MAX_DELAY,
AbstractUploadQueue,
)
from cognite.extractorutils.util import cognite_exceptions, retry
class InstanceUploadQueue(AbstractUploadQueue):
    """
    Upload queue for data modeling instances (nodes and edges).

    Nodes and edges are buffered in separate lists and flushed together in a
    single ``instances.apply`` call, so that edges can reference nodes queued
    in the same batch.

    Args:
        cdf_client: Cognite Data Fusion client to use.
        post_upload_function: A function that will be called after each upload. The function will be given one argument:
            A list of the nodes and edges that were uploaded.
        max_queue_size: Maximum size of upload queue. Defaults to no max size.
        max_upload_interval: Automatically trigger an upload on an interval when run as a thread (use start/stop
            methods). Unit is seconds.
        trigger_log_level: Log level to log upload triggers to.
        thread_name: Thread name of uploader thread.
        cancellation_token: Cancellation token for managing thread cancellation.
        auto_create_start_nodes: Automatically create start nodes if they do not exist.
        auto_create_end_nodes: Automatically create end nodes if they do not exist.
        auto_create_direct_relations: Automatically create direct relations if they do not exist.
    """

    def __init__(
        self,
        cdf_client: CogniteClient,
        post_upload_function: Callable[[list[Any]], None] | None = None,
        max_queue_size: int | None = None,
        max_upload_interval: int | None = None,
        trigger_log_level: str = "DEBUG",
        thread_name: str | None = None,
        cancellation_token: CancellationToken | None = None,
        auto_create_start_nodes: bool = True,
        auto_create_end_nodes: bool = True,
        auto_create_direct_relations: bool = True,
    ) -> None:
        super().__init__(
            cdf_client,
            post_upload_function,
            max_queue_size,
            max_upload_interval,
            trigger_log_level,
            thread_name,
            cancellation_token,
        )
        # Flags forwarded verbatim to ``instances.apply`` on each upload.
        self.auto_create_start_nodes = auto_create_start_nodes
        self.auto_create_end_nodes = auto_create_end_nodes
        self.auto_create_direct_relations = auto_create_direct_relations
        # Nodes and edges are buffered separately; both count towards
        # ``upload_queue_size`` (maintained by the base class' triggers).
        self.node_queue: list[NodeApply] = []
        self.edge_queue: list[EdgeApply] = []

    def add_to_upload_queue(
        self,
        *,
        node_data: list[NodeApply] | None = None,
        edge_data: list[EdgeApply] | None = None,
    ) -> None:
        """
        Add instances to the upload queue.

        The queue will be uploaded if the queue size is larger than the threshold specified in the ``__init__``.

        Args:
            node_data: List of nodes to add to the upload queue.
            edge_data: List of edges to add to the upload queue.
        """
        # Single critical section: extending both queues, updating the size
        # counter, and checking triggers must be atomic with respect to the
        # uploader thread, otherwise it can observe a size that does not match
        # the queue contents.
        with self.lock:
            if node_data:
                self.node_queue.extend(node_data)
                self.upload_queue_size += len(node_data)
            if edge_data:
                self.edge_queue.extend(edge_data)
                self.upload_queue_size += len(edge_data)
            self._check_triggers()

    def upload(self) -> None:
        """
        Trigger an upload of the queue, clears queue afterwards.

        Nodes and edges are sent in a single ``instances.apply`` call, retried
        on transient CDF errors. The queues are only cleared after a
        successful apply, so a failed upload keeps the data for the next
        attempt. If both queues are empty, no API call is made.
        """

        @retry(
            exceptions=cognite_exceptions(),
            cancellation_token=self.cancellation_token,
            tries=RETRIES,
            delay=RETRY_DELAY,
            max_delay=RETRY_MAX_DELAY,
            backoff=RETRY_BACKOFF_FACTOR,
        )
        def upload_batch() -> None:
            self.cdf_client.data_modeling.instances.apply(
                nodes=self.node_queue,
                edges=self.edge_queue,
                auto_create_start_nodes=self.auto_create_start_nodes,
                auto_create_end_nodes=self.auto_create_end_nodes,
                auto_create_direct_relations=self.auto_create_direct_relations,
            )
            # Clear only after a successful apply so retries (and the next
            # upload after a failure) still see the buffered instances.
            self.node_queue.clear()
            self.edge_queue.clear()
            self.upload_queue_size = 0

        with self.lock:
            # Skip the API round-trip entirely when there is nothing queued
            # (e.g. a timer-driven upload firing on an idle queue).
            if not self.node_queue and not self.edge_queue:
                return
            upload_batch()

    def __enter__(self) -> "InstanceUploadQueue":
        """
        Wraps around start method, for use as context manager.

        Returns:
            self
        """
        self.start()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """
        Wraps around stop method, for use as context manager.

        Args:
            exc_type: Exception type
            exc_val: Exception value
            exc_tb: Traceback
        """
        self.stop()