Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/+convert-object-type.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `convert_object_type` method to allow converting an object to another type.
63 changes: 63 additions & 0 deletions infrahub_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from .config import Config
from .constants import InfrahubClientMode
from .convert_object_type import CONVERT_OBJECT_MUTATION, ConversionFieldInput
from .data import RepositoryBranchInfo, RepositoryData
from .diff import NodeDiff, diff_tree_node_to_node_diff, get_diff_summary_query
from .exceptions import (
Expand Down Expand Up @@ -1670,6 +1671,37 @@ async def __aexit__(

self.mode = InfrahubClientMode.DEFAULT

async def convert_object_type(
self,
node_id: str,
target_kind: str,
branch: str | None = None,
fields_mapping: dict[str, ConversionFieldInput] | None = None,
) -> InfrahubNode:
"""
Convert a given node to another kind on a given branch. `fields_mapping` keys are target fields names
and its values indicate how to fill in these fields. Any mandatory field not having an equivalent field
in the source kind should be specified in this mapping. See https://docs.infrahub.app/guides/object-convert-type
for more information.
"""

if fields_mapping is None:
mapping_dict = {}
else:
mapping_dict = {field_name: model.model_dump(mode="json") for field_name, model in fields_mapping.items()}

branch_name = branch or self.default_branch
response = await self.execute_graphql(
query=CONVERT_OBJECT_MUTATION,
variables={
"node_id": node_id,
"fields_mapping": mapping_dict,
"target_kind": target_kind,
},
branch_name=branch_name,
)
return await InfrahubNode.from_graphql(client=self, branch=branch_name, data=response["ConvertObjectType"])


class InfrahubClientSync(BaseClient):
schema: InfrahubSchemaSync
Expand Down Expand Up @@ -2984,3 +3016,34 @@ def __exit__(
self.group_context.update_group()

self.mode = InfrahubClientMode.DEFAULT

def convert_object_type(
self,
node_id: str,
target_kind: str,
branch: str | None = None,
fields_mapping: dict[str, ConversionFieldInput] | None = None,
) -> InfrahubNodeSync:
"""
Convert a given node to another kind on a given branch. `fields_mapping` keys are target fields names
and its values indicate how to fill in these fields. Any mandatory field not having an equivalent field
in the source kind should be specified in this mapping. See https://docs.infrahub.app/guides/object-convert-type
for more information.
"""

if fields_mapping is None:
mapping_dict = {}
else:
mapping_dict = {field_name: model.model_dump(mode="json") for field_name, model in fields_mapping.items()}

branch_name = branch or self.default_branch
response = self.execute_graphql(
query=CONVERT_OBJECT_MUTATION,
variables={
"node_id": node_id,
"fields_mapping": mapping_dict,
"target_kind": target_kind,
},
branch_name=branch_name,
)
return InfrahubNodeSync.from_graphql(client=self, branch=branch_name, data=response["ConvertObjectType"])
60 changes: 60 additions & 0 deletions infrahub_sdk/convert_object_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from typing import Any

from pydantic import BaseModel, model_validator

CONVERT_OBJECT_MUTATION = """
mutation($node_id: String!, $target_kind: String!, $fields_mapping: GenericScalar!) {
ConvertObjectType(data: {
node_id: $node_id,
target_kind: $target_kind,
fields_mapping: $fields_mapping
}) {
ok
node
}
}
"""


class ConversionFieldValue(BaseModel): # Only one of these fields can be not None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make a follow up PR infrahub side to use these classes from the SDK instead of having them duplicated

"""
Holds the new value of the destination field during an object conversion.
Use `attribute_value` to specify the new raw value of an attribute.
Use `peer_id` to specify new peer of a cardinality one relationship.
Use `peer_ids` to specify new peers of a cardinality many relationship.
Only one of `attribute_value`, `peer_id` and `peers_ids` can be specified.
"""

attribute_value: Any | None = None
peer_id: str | None = None
peers_ids: list[str] | None = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the docstring says peer_ids, but the code says peers_ids
I think peer_ids is better, but, either way, they should match

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going for peers_ids as it what the server expects


@model_validator(mode="after")
def check_only_one_field(self) -> ConversionFieldValue:
fields = [self.attribute_value, self.peer_id, self.peers_ids]
set_fields = [f for f in fields if f is not None]
if len(set_fields) != 1:
raise ValueError("Exactly one of attribute_value, peer_id, or peers_ids must be set")
return self


class ConversionFieldInput(BaseModel):
"""
Indicates how to fill in the value of the destination field during an object conversion.
Use `source_field` to reuse the value of the corresponding field of the object being converted.
Use `data` to specify the new value for the field.
Only one of `source_field` or `data` can be specified.
"""

source_field: str | None = None
data: ConversionFieldValue | None = None

@model_validator(mode="after")
def check_only_one_field(self) -> ConversionFieldInput:
if self.source_field is not None and self.data is not None:
raise ValueError("Only one of source_field or data can be set")
if self.source_field is None and self.data is None:
raise ValueError("Either source_field or data must be set")
return self
3 changes: 3 additions & 0 deletions tests/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CLIENT_TYPE_ASYNC = "standard"
CLIENT_TYPE_SYNC = "sync"
CLIENT_TYPES = [CLIENT_TYPE_ASYNC, CLIENT_TYPE_SYNC]
100 changes: 100 additions & 0 deletions tests/integration/test_convert_object_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from __future__ import annotations

import uuid
from typing import Any

import pytest

from infrahub_sdk.convert_object_type import ConversionFieldInput, ConversionFieldValue
from infrahub_sdk.testing.docker import TestInfrahubDockerClient
from tests.constants import CLIENT_TYPE_ASYNC, CLIENT_TYPES

SCHEMA: dict[str, Any] = {
"version": "1.0",
"generics": [
{
"name": "PersonGeneric",
"namespace": "Testconv",
"human_friendly_id": ["name__value"],
"attributes": [
{"name": "name", "kind": "Text", "unique": True},
],
},
],
"nodes": [
{
"name": "Person1",
"namespace": "Testconv",
"inherit_from": ["TestconvPersonGeneric"],
},
{
"name": "Person2",
"namespace": "Testconv",
"inherit_from": ["TestconvPersonGeneric"],
"attributes": [
{"name": "age", "kind": "Number"},
],
"relationships": [
{
"name": "my_car",
"peer": "TestconvCar",
"cardinality": "one",
"identifier": "person__mandatory_owner",
},
{
"name": "fastest_cars",
"peer": "TestconvCar",
"cardinality": "many",
"identifier": "person__fastest_cars",
},
],
},
{
"name": "Car",
"namespace": "Testconv",
"human_friendly_id": ["name__value"],
"attributes": [
{"name": "name", "kind": "Text"},
],
},
],
}


class TestConvertObjectType(TestInfrahubDockerClient):
@pytest.mark.parametrize("client_type", CLIENT_TYPES)
async def test_convert_object_type(self, client, client_sync, client_type) -> None:
resp = await client.schema.load(schemas=[SCHEMA], wait_until_converged=True)
assert not resp.errors

person_1 = await client.create(kind="TestconvPerson1", name=f"person_{uuid.uuid4()}")
await person_1.save()
car_1 = await client.create(kind="TestconvCar", name=f"car_{uuid.uuid4()}")
await car_1.save()

new_age = 25
fields_mapping = {
"name": ConversionFieldInput(source_field="name"),
"age": ConversionFieldInput(data=ConversionFieldValue(attribute_value=new_age)),
"worst_car": ConversionFieldInput(data=ConversionFieldValue(peer_id=car_1.id)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this in here to test that it is ignored?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I updated schema to replace my_car by worst_car

"fastest_cars": ConversionFieldInput(data=ConversionFieldValue(peers_ids=[car_1.id])),
}

if client_type == CLIENT_TYPE_ASYNC:
person_2 = await client.convert_object_type(
node_id=person_1.id,
target_kind="TestconvPerson2",
branch=client.default_branch,
fields_mapping=fields_mapping,
)
else:
person_2 = client_sync.convert_object_type(
node_id=person_1.id,
target_kind="TestconvPerson2",
branch=client.default_branch,
fields_mapping=fields_mapping,
)

assert person_2.get_kind() == "TestconvPerson2"
assert person_2.name.value == person_1.name.value
assert person_2.age.value == new_age
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe worth testing that fastest_cars is correctly updated too