Skip to content

Commit 983eaf1

Browse files
Magssch and Magnus Schjølberg authored
[CDF-26937] Events-to-records migration part 2 (#2767)
# Description Prepares the migration infrastructure to support targeting Records/Streams in addition to FDM instances. Generalizes `create_properties()` to accept `ContainerPropertyDefinition` alongside `ViewResponseProperty`, and makes the `container_id` parameter optional in `_prepare_asset_centric_arguments`, since it will not be used for records. I also reversed my earlier decision and renamed `iter_last_updated_time_windows` to `last_updated_time_windows` (iterator → list), since I realized the result would have needed to be wrapped in `list()` anyway. ## Bump - [ ] Patch - [x] Skip --------- Co-authored-by: Magnus Schjølberg <magnus.schjolberg@cognitedata.com>
1 parent 6ddcd81 commit 983eaf1

File tree

6 files changed

+118
-48
lines changed

6 files changed

+118
-48
lines changed

cognite_toolkit/_cdf_tk/apps/_migrate_app.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
MigrateDataSetSelector,
4646
MigrationCSVFileSelector,
4747
)
48+
from cognite_toolkit._cdf_tk.exceptions import ToolkitValueError
4849
from cognite_toolkit._cdf_tk.feature_flags import Flags
4950
from cognite_toolkit._cdf_tk.storageio import CanvasIO, ChartIO, InstanceIO
5051
from cognite_toolkit._cdf_tk.storageio.selectors import (
@@ -410,7 +411,7 @@ def _prepare_asset_centric_arguments(
410411
skip_existing: bool,
411412
kind: AssetCentricKind,
412413
resource_type: str,
413-
container_id: ContainerId,
414+
container_id: ContainerId | None = None,
414415
) -> tuple[AssetCentricMigrationSelector, bool, bool, bool]:
415416
if data_set_id is not None and mapping_file is not None:
416417
raise typer.BadParameter("Cannot specify both data_set_id and mapping_file")
@@ -444,6 +445,11 @@ def _prepare_asset_centric_arguments(
444445
)
445446
else:
446447
# Interactive selection of data set.
448+
if container_id is None:
449+
raise ToolkitValueError(
450+
"container_id is required when neither mapping_file nor data_set_id is provided "
451+
"(interactive migration)."
452+
)
447453
selector = AssetInteractiveSelect(client, "migrate")
448454
selected_data_set_id = selector.select_data_set(allow_empty=False)
449455
asset_mapping = ResourceViewMappingInteractiveSelect(client, "migrate").select_resource_view_mapping(

cognite_toolkit/_cdf_tk/commands/_migrate/conversion.py

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,17 @@
88
from pydantic import JsonValue
99

1010
from cognite_toolkit._cdf_tk.client import ToolkitClient
11-
from cognite_toolkit._cdf_tk.client.identifiers import AssetCentricExternalId, EdgeTypeId, ExternalId, InternalId
11+
from cognite_toolkit._cdf_tk.client.identifiers import (
12+
AssetCentricExternalId,
13+
ContainerId,
14+
EdgeTypeId,
15+
ExternalId,
16+
InternalId,
17+
)
1218
from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse, AssetLinkData, FileLinkData
1319
from cognite_toolkit._cdf_tk.client.resource_classes.asset import AssetResponse
1420
from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import (
21+
ContainerPropertyDefinition,
1522
DirectNodeRelation,
1623
EdgeId,
1724
EdgeProperty,
@@ -318,50 +325,64 @@ def _lookup_resource_type(resource_type: AssetCentricResourceExtended) -> AssetC
318325

319326
def create_properties(
320327
dumped: dict[str, Any],
321-
view_properties: dict[str, ViewResponseProperty],
328+
properties: dict[str, ViewResponseProperty] | dict[str, ContainerPropertyDefinition],
322329
property_mapping: dict[str, str],
323330
resource_type: AssetCentricTypeExtended,
324331
issue: ConversionIssue,
325332
direct_relation_cache: DirectRelationCache,
333+
container_id: ContainerId | None = None,
326334
) -> dict[str, JsonValue | NodeId | list[NodeId]]:
327335
"""
328-
Create properties for a data model instance from an asset-centric resource.
336+
Create properties for a data model instance or record from an asset-centric resource.
337+
338+
When ``container_id`` is provided the values in ``properties`` are treated as
339+
``ContainerPropertyDefinition`` objects (records/streams path). Otherwise they must be
340+
``ViewResponseProperty`` objects (FDM/view path).
329341
330342
Args:
331343
dumped: Dict representation of the asset-centric resource.
332-
view_properties: Defined properties referenced in the view source mapping.
333-
property_mapping: Mapping from asset-centric property IDs to data model property IDs.
344+
properties: Defined properties referenced in the mapping.
345+
property_mapping: Mapping from asset-centric property JSON-paths to target property IDs.
334346
resource_type: The type of the asset-centric resource (e.g., "asset", "timeseries").
335347
issue: ConversionIssue object to log any issues encountered during conversion.
336-
direct_relation_cache: Cache for direct relation references to look up target of direct relations.
348+
direct_relation_cache: Cache for direct relation references.
349+
container_id: Target container ID, only used if container properties are being populated
337350
338351
Returns:
339-
Dict of property IDs to PropertyValueWrite objects.
352+
Dict of property IDs to property values.
340353
341354
"""
342355
flatten_dump = flatten_dict_json_path(dumped, keep_structured=set(property_mapping.keys()))
343-
properties: dict[str, JsonValue | NodeId | list[NodeId]] = {}
356+
result: dict[str, JsonValue | NodeId | list[NodeId]] = {}
344357
ignored_asset_centric_properties: set[str] = set()
345358
for prop_json_path, prop_id in property_mapping.items():
346359
if prop_json_path not in flatten_dump:
347360
continue
348-
if prop_id not in view_properties:
361+
if prop_id not in properties:
349362
continue
350-
if prop_id in properties:
363+
if prop_id in result:
351364
ignored_asset_centric_properties.add(prop_json_path)
352365
continue
353-
dm_prop = view_properties[prop_id]
354-
if not isinstance(dm_prop, ViewCorePropertyResponse):
366+
prop_def = properties[prop_id]
367+
if isinstance(prop_def, ContainerPropertyDefinition):
368+
if container_id is None:
369+
raise ValueError("Cannot create mapping directly to container property without providing Container ID")
370+
destination_container_property = (container_id, prop_id)
371+
elif isinstance(prop_def, ViewCorePropertyResponse):
372+
destination_container_property = (prop_def.container, prop_def.container_property_identifier)
373+
else:
355374
issue.invalid_instance_property_types.append(
356375
InvalidPropertyDataType(property_id=prop_id, expected_type=ViewCorePropertyResponse.__name__)
357376
)
358377
continue
378+
data_type = prop_def.type
379+
nullable = prop_def.nullable or False
359380
try:
360381
value = asset_centric_convert_to_primary_property(
361382
flatten_dump[prop_json_path],
362-
dm_prop.type,
363-
dm_prop.nullable or False,
364-
destination_container_property=(dm_prop.container, dm_prop.container_property_identifier),
383+
data_type,
384+
nullable,
385+
destination_container_property=destination_container_property,
365386
source_property=(resource_type, prop_json_path),
366387
direct_relation_lookup=direct_relation_cache.get_cache(resource_type, prop_json_path),
367388
)
@@ -372,12 +393,12 @@ def create_properties(
372393
continue
373394
if isinstance(value, datetime):
374395
# Convert datetime to ISO format string, as the data model expects datetimes as strings in ISO format.
375-
properties[prop_id] = value.isoformat(timespec="milliseconds")
396+
result[prop_id] = value.isoformat(timespec="milliseconds")
376397
elif isinstance(value, date):
377398
# Convert date to ISO format string, as the data model expects dates as strings in ISO format.
378-
properties[prop_id] = value.isoformat()
399+
result[prop_id] = value.isoformat()
379400
else:
380-
properties[prop_id] = value
401+
result[prop_id] = value
381402

382403
issue.ignored_asset_centric_properties = sorted(
383404
(set(flatten_dump.keys()) - set(property_mapping.keys())) | ignored_asset_centric_properties
@@ -390,9 +411,9 @@ def create_properties(
390411
for prop_id in property_mapping.values()
391412
if not (prop_id.startswith("edge.") or prop_id.startswith("node."))
392413
}
393-
- set(view_properties.keys())
414+
- set(properties.keys())
394415
)
395-
return properties
416+
return result
396417

397418

398419
def create_edge_properties(

cognite_toolkit/_cdf_tk/cruds/_resource_cruds/streams.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import time
2-
from collections.abc import Hashable, Iterable, Iterator, Sequence
2+
from collections.abc import Hashable, Iterable, Sequence
33
from datetime import timedelta
44
from typing import Any, Literal, final
55

@@ -87,34 +87,34 @@ def _iterate(
8787
all_streams = self.client.streams.list()
8888
return iter(all_streams)
8989

90-
def iter_last_updated_time_windows(
90+
def last_updated_time_windows(
9191
self, stream_external_id: str, start_ms: int | None = None
92-
) -> Iterator[dict[str, int] | None]:
93-
"""Yield lastUpdatedTime filter dicts to use in record queries.
92+
) -> list[dict[str, int] | None]:
93+
"""Return lastUpdatedTime filter dicts to use in record queries.
9494
95-
Each yielded dict is {"gte": ..., "lt": ...} representing one query window.
96-
None is yielded for Mutable streams with no start_ms — meaning no time filter is needed.
97-
Yields nothing if the stream does not exist.
95+
Each dict is {"gte": ..., "lt": ...} representing one query window.
96+
None is returned for Mutable streams with no start_ms — meaning no time filter is needed.
97+
Returns an empty list if the stream does not exist.
9898
9999
Immutable streams enforce a maxFilteringInterval per request, so the range
100100
[start_ms (or stream.createdTime), now) is split into consecutive windows.
101101
Mutable streams have no such constraint and are covered in a single pass.
102102
"""
103103
streams = self.retrieve(ExternalId.from_external_ids([stream_external_id]))
104104
if not streams:
105-
return
105+
return []
106106
stream = streams[0]
107107
now_ms = int(time.time() * 1000)
108108
if stream.type == "Mutable":
109109
if start_ms is None:
110-
yield None
111-
else:
112-
yield {"gte": start_ms, "lt": now_ms}
113-
return
110+
return [None]
111+
return [{"gte": start_ms, "lt": now_ms}]
114112
effective_start_ms = start_ms if start_ms is not None else stream.created_time
115113
max_interval_ms: int | None = None
116114
if stream.settings and stream.settings.limits.max_filtering_interval:
117115
td = _TIMEDELTA_ADAPTER.validate_python(stream.settings.limits.max_filtering_interval)
118116
max_interval_ms = int(td.total_seconds() * 1000)
119-
for window_start, window_end in time_windows_ms(effective_start_ms, now_ms, max_interval_ms):
120-
yield {"gte": window_start, "lt": window_end}
117+
return [
118+
{"gte": window_start, "lt": window_end}
119+
for window_start, window_end in time_windows_ms(effective_start_ms, now_ms, max_interval_ms)
120+
]

cognite_toolkit/_cdf_tk/storageio/_records.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@ def count(self, selector: RecordContainerSelector) -> int | None:
5858
start_ms = timestamp_to_ms(selector.initialize_cursor)
5959
total = 0
6060
stream_crud = StreamCRUD.create_loader(self.client)
61-
for last_updated_time in stream_crud.iter_last_updated_time_windows(
62-
selector.stream.external_id, start_ms=start_ms
63-
):
61+
for last_updated_time in stream_crud.last_updated_time_windows(selector.stream.external_id, start_ms=start_ms):
6462
body: dict[str, object] = {
6563
"filter": sync_filter,
6664
"aggregates": {"total": {"count": {}}},

tests/test_unit/test_cdf_tk/test_commands/test_migration_cmd/test_conversion.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
BooleanProperty,
1616
ConstraintOrIndexState,
1717
ContainerId,
18+
ContainerPropertyDefinition,
1819
DateProperty,
1920
DirectNodeRelation,
2021
EdgeId,
@@ -119,7 +120,7 @@ class TestCreateProperties:
119120
EVENT_CENTRIC_ID = AssetCentricId(resource_type="event", id_=456)
120121

121122
@pytest.mark.parametrize(
122-
"dumped,view_properties,property_mapping,expected_properties,expected_issue",
123+
"dumped,property_definitions,property_mapping,expected_properties,expected_issue,container_id",
123124
[
124125
pytest.param(
125126
{"name": "MyAsset", "description": "An asset,", "metadata": {"categoryNo": "1"}},
@@ -146,8 +147,37 @@ class TestCreateProperties:
146147
{"name": "nameId", "description": "descriptionId", "metadata.categoryNo": "categoryNoId"},
147148
{"nameId": "MyAsset", "descriptionId": "An asset,", "categoryNoId": 1},
148149
ConversionIssue(id=str(ASSET_CENTRIC_ID), instance_id=INSTANCE_ID, asset_centric_id=ASSET_CENTRIC_ID),
150+
None,
149151
id="Basic property mapping with integer conversion and no issues",
150152
),
153+
pytest.param(
154+
{"name": "MyAsset", "description": "An asset,", "metadata": {"categoryNo": "1"}},
155+
{
156+
"nameId": ContainerPropertyDefinition(
157+
type=TextProperty(),
158+
nullable=True,
159+
immutable=False,
160+
auto_increment=False,
161+
),
162+
"descriptionId": ContainerPropertyDefinition(
163+
type=TextProperty(),
164+
nullable=True,
165+
immutable=False,
166+
auto_increment=False,
167+
),
168+
"categoryNoId": ContainerPropertyDefinition(
169+
type=Int64Property(),
170+
nullable=True,
171+
immutable=False,
172+
auto_increment=False,
173+
),
174+
},
175+
{"name": "nameId", "description": "descriptionId", "metadata.categoryNo": "categoryNoId"},
176+
{"nameId": "MyAsset", "descriptionId": "An asset,", "categoryNoId": 1},
177+
ConversionIssue(id=str(ASSET_CENTRIC_ID), instance_id=INSTANCE_ID, asset_centric_id=ASSET_CENTRIC_ID),
178+
CONTAINER_ID,
179+
id="Basic property mapping with container property definitions",
180+
),
151181
pytest.param(
152182
{"name": "MyAsset", "created": "2023-01-01T12:00:00Z", "active": True},
153183
{
@@ -177,6 +207,7 @@ class TestCreateProperties:
177207
"activeId": True,
178208
},
179209
ConversionIssue(id=str(ASSET_CENTRIC_ID), asset_centric_id=ASSET_CENTRIC_ID, instance_id=INSTANCE_ID),
210+
None,
180211
id="Multiple data types conversion",
181212
),
182213
pytest.param(
@@ -203,6 +234,7 @@ class TestCreateProperties:
203234
instance_id=INSTANCE_ID,
204235
missing_asset_centric_properties=["description"],
205236
),
237+
None,
206238
id="Missing property in flattened dump",
207239
),
208240
pytest.param(
@@ -223,6 +255,7 @@ class TestCreateProperties:
223255
instance_id=INSTANCE_ID,
224256
missing_instance_properties=["descriptionId"],
225257
),
258+
None,
226259
id="Missing property in view properties",
227260
),
228261
pytest.param(
@@ -247,6 +280,7 @@ class TestCreateProperties:
247280
InvalidPropertyDataType(property_id="nameId", expected_type="ViewCorePropertyResponse")
248281
],
249282
),
283+
None,
250284
id="Invalid property type",
251285
),
252286
pytest.param(
@@ -271,6 +305,7 @@ class TestCreateProperties:
271305
)
272306
],
273307
),
308+
None,
274309
id="Conversion error",
275310
),
276311
pytest.param(
@@ -291,6 +326,7 @@ class TestCreateProperties:
291326
instance_id=INSTANCE_ID,
292327
missing_asset_centric_properties=["name"],
293328
),
329+
None,
294330
id="Empty dictionary",
295331
),
296332
pytest.param(
@@ -316,6 +352,7 @@ class TestCreateProperties:
316352
asset_centric_id=ASSET_CENTRIC_ID,
317353
instance_id=INSTANCE_ID,
318354
),
355+
None,
319356
id="List of simple types (labels to list of strings)",
320357
),
321358
pytest.param(
@@ -352,6 +389,7 @@ class TestCreateProperties:
352389
instance_id=INSTANCE_ID,
353390
ignored_asset_centric_properties=["labels[1].externalId"],
354391
),
392+
None,
355393
id="Mapping the first label and entire metadata.",
356394
),
357395
pytest.param(
@@ -382,6 +420,7 @@ class TestCreateProperties:
382420
instance_id=INSTANCE_ID,
383421
ignored_asset_centric_properties=["metadata.category"],
384422
),
423+
None,
385424
id="Duplicated mapping target",
386425
),
387426
pytest.param(
@@ -410,24 +449,34 @@ class TestCreateProperties:
410449
asset_centric_id=ASSET_CENTRIC_ID,
411450
instance_id=INSTANCE_ID,
412451
),
452+
None,
413453
id="Japanese characters in property names and values",
414454
),
415455
],
416456
)
417457
def test_create_properties(
418458
self,
419459
dumped: dict[str, Any],
420-
view_properties: dict[str, ViewResponseProperty],
460+
property_definitions: dict[str, ViewResponseProperty] | dict[str, ContainerPropertyDefinition],
421461
property_mapping: dict[str, str],
422462
expected_properties: dict[str, JsonValue],
423463
expected_issue: ConversionIssue,
464+
container_id: ContainerId | None,
424465
direct_relation_cache: DirectRelationCache,
425466
) -> None:
426467
issue = ConversionIssue(
427468
asset_centric_id=self.ASSET_CENTRIC_ID, instance_id=self.INSTANCE_ID, id=str(self.ASSET_CENTRIC_ID)
428469
)
429470

430-
properties = create_properties(dumped, view_properties, property_mapping, "asset", issue, direct_relation_cache)
471+
properties = create_properties(
472+
dumped,
473+
property_definitions,
474+
property_mapping,
475+
"asset",
476+
issue,
477+
direct_relation_cache,
478+
container_id,
479+
)
431480

432481
assert properties == expected_properties
433482

0 commit comments

Comments
 (0)