Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions cognite_toolkit/_cdf_tk/commands/_migrate/migration_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
ItemsResultList,
ItemsSuccessResponse,
)
from cognite_toolkit._cdf_tk.client.identifiers import InternalId
from cognite_toolkit._cdf_tk.client.identifiers import InternalId, SpaceId
from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse
from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import EdgeId, InstanceRequest, NodeId
from cognite_toolkit._cdf_tk.client.resource_classes.migration import SpaceSource
from cognite_toolkit._cdf_tk.client.resource_classes.pending_instance_id import PendingInstanceId
from cognite_toolkit._cdf_tk.client.resource_classes.three_d import (
AssetMappingClassicResponse,
AssetMappingDMRequestId,
ThreeDModelClassicResponse,
)
from cognite_toolkit._cdf_tk.commands._migrate.data_classes import ThreeDMigrationRequest
from cognite_toolkit._cdf_tk.constants import MISSING_EXTERNAL_ID, MISSING_INSTANCE_SPACE
from cognite_toolkit._cdf_tk.constants import MISSING_EXTERNAL_ID
from cognite_toolkit._cdf_tk.exceptions import ToolkitNotImplementedError, ToolkitValueError
from cognite_toolkit._cdf_tk.storageio import (
AnnotationIO,
Expand All @@ -41,7 +42,7 @@
ThreeDSelector,
)
from cognite_toolkit._cdf_tk.tk_warnings import MediumSeverityWarning
from cognite_toolkit._cdf_tk.utils.collection import chunker_sequence
from cognite_toolkit._cdf_tk.utils.collection import chunker_sequence, humanize_collection
from cognite_toolkit._cdf_tk.utils.useful_types import (
AssetCentricKindExtended,
AssetCentricType,
Expand Down Expand Up @@ -89,12 +90,28 @@ def stream_data(
bookmark: Bookmark | None = None,
) -> Iterator[Page]:
file_location = bookmark if isinstance(bookmark, FileBookmark) else None

if isinstance(selector, MigrationCSVFileSelector):
instance_spaces = list({SpaceId(space=item.instance_id.space) for item in selector.items})
iterator = self._stream_from_csv(selector, limit, file_location)
elif isinstance(selector, MigrateDataSetSelector):
iterator = self._stream_given_dataset(selector, limit)
space_source = self.client.migration.space_source.retrieve(
data_set_external_id=selector.data_set_external_id
)
if space_source is None:
raise ToolkitValueError(
f"Missing instance space that maps to {selector.data_set_external_id!r}. Have you run `cdf migrate data-sets`?"
)
instance_spaces = [SpaceId(space=space_source.space)]
iterator = self._stream_given_dataset(selector, space_source, limit)
else:
raise ToolkitNotImplementedError(f"Selector {type(selector)} is not supported for stream_data")
existing = self.client.tool.spaces.retrieve(instance_spaces)
if missing := set(instance_spaces).difference({item.as_id() for item in existing}):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This line, and line 94 (list({SpaceId(...)})), rely on SpaceId being hashable to be used in a set. However, the SpaceId class does not appear to have a __hash__ method defined, unlike other similar identifier classes in the project (e.g., ViewNoVersionId). This could lead to a TypeError at runtime if SpaceId is not hashable by default in your Pydantic configuration. Please verify that SpaceId is hashable. If not, you should implement __hash__ on it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is frozen and thus hashable.

raise ToolkitValueError(
f"The following instance spaces do not exist in CDF: {humanize_collection(missing)}. Please create these spaces before running the migration."
)

yield from (
Page(
worker_id="main",
Expand Down Expand Up @@ -132,11 +149,10 @@ def count(self, selector: AssetCentricMigrationSelector) -> int | None:
raise ToolkitNotImplementedError(f"Selector {type(selector)} is not supported for count")

def _stream_given_dataset(
self, selector: MigrateDataSetSelector, limit: int | None = None
self, selector: MigrateDataSetSelector, space_source: SpaceSource, limit: int | None = None
) -> Iterator[Sequence[AssetCentricMapping[T_AssetCentricResource]]]:
asset_centric_selector = selector.as_asset_centric_selector()
space_source = self.client.migration.space_source.retrieve(data_set_external_id=selector.data_set_external_id)
instance_space = space_source.instance_space if space_source else MISSING_INSTANCE_SPACE
instance_space = space_source.instance_space
for data_chunk in self.hierarchy.stream_data(asset_centric_selector, limit):
mapping_list: list[AssetCentricMapping[T_AssetCentricResource]] = []
for data_item in data_chunk.items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
)
from cognite_toolkit._cdf_tk.client.resource_classes.chart import ChartResponse
from cognite_toolkit._cdf_tk.client.resource_classes.charts_data import ChartData, ChartSource, ChartTimeseries
from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import InstanceSource, NodeRequest, ViewId
from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import (
InstanceSource,
NodeRequest,
SpaceResponse,
ViewId,
)
from cognite_toolkit._cdf_tk.client.resource_classes.migration import InstanceSource as LegacyInstanceSource
from cognite_toolkit._cdf_tk.client.testing import monkeypatch_toolkit_client
from cognite_toolkit._cdf_tk.commands._migrate.command import MigrationCommand
Expand Down Expand Up @@ -203,6 +208,18 @@ def test_migrate_assets(
+ "\n".join(f"{1000 + i},{space},asset_{i},{ASSET_ID},cdf_cdm,CogniteAsset,v1" for i in range(len(assets)))
)

# Retrieve space
respx.post(
config.create_api_url("/models/spaces/byids"),
).mock(
return_value=httpx.Response(
status_code=200,
json={
"items": [SpaceResponse(space=space, created_time=1, last_updated_time=1, is_global=False).dump()]
},
)
)

# Asset retrieve ids
respx.post(
config.create_api_url("/assets/byids"),
Expand Down Expand Up @@ -318,6 +335,17 @@ def test_migrate_resume(
"id,space,externalId,ingestionView,consumerViewSpace,consumerViewExternalId,consumerViewVersion\n"
f"{assets[0].id},{space},{assets[0].external_id},{ASSET_ID},cdf_cdm,CogniteAsset,v1"
)
# Retrieve space
respx.post(
config.create_api_url("/models/spaces/byids"),
).mock(
return_value=httpx.Response(
status_code=200,
json={
"items": [SpaceResponse(space=space, created_time=1, last_updated_time=1, is_global=False).dump()]
},
)
)

# Asset retrieve ids
respx.post(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from pathlib import Path
from unittest.mock import MagicMock

import httpx
import pytest
import respx
from httpx import Response

from cognite_toolkit._cdf_tk.client import ToolkitClient, ToolkitClientConfig
from cognite_toolkit._cdf_tk.client.http_client import HTTPClient
from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse
from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling import SpaceResponse
from cognite_toolkit._cdf_tk.commands._migrate.migration_io import (
AnnotationMigrationIO,
AssetCentricMigrationIO,
Expand Down Expand Up @@ -39,6 +41,19 @@ def test_download(self, toolkit_client: ToolkitClient, respx_mock: respx.MockRou
}
for i in range(N)
]
respx.post(
config.create_api_url("/models/spaces/byids"),
).mock(
return_value=httpx.Response(
status_code=200,
json={
"items": [
SpaceResponse(space="mySpace", created_time=1, last_updated_time=1, is_global=False).dump()
]
},
)
)

respx_mock.post(config.create_api_url("/assets/byids")).mock(
side_effect=[
Response(status_code=200, json={"items": items[: AssetIO.CHUNK_SIZE]}),
Expand Down
Loading