Skip to content

Commit d195a67

Browse files
committed
Add option to have parent events and send secondary events
1 parent 4700a53 commit d195a67

File tree

10 files changed

+180
-39
lines changed

10 files changed

+180
-39
lines changed

backend/infrahub/core/changelog/models.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99

1010
if TYPE_CHECKING:
1111
from infrahub.core.attribute import BaseAttribute
12+
from infrahub.core.branch import Branch
1213
from infrahub.core.manager import RelationshipSchema
1314
from infrahub.core.query.relationship import RelationshipPeerData
1415
from infrahub.core.relationship.model import Relationship
16+
from infrahub.database import InfrahubDatabase
1517

1618

1719
class PropertyChangelog(BaseModel):
@@ -221,6 +223,11 @@ class NodeChangelog(BaseModel):
221223
def parent(self) -> ChangelogNodeParent | None:
222224
return self._parent
223225

226+
@property
227+
def updated_fields(self) -> list[str]:
228+
"""Return a list of update fields i.e. attributes and relationships"""
229+
return list(self.relationships.keys()) + list(self.attributes.keys())
230+
224231
@property
225232
def root_node_id(self) -> str:
226233
"""Return the top level node_id"""
@@ -404,3 +411,50 @@ def changelog(self) -> RelationshipCardinalityOneChangelog | RelationshipCardina
404411
return self.cardinality_one_relationship
405412
case RelationshipCardinality.MANY:
406413
return self.cardinality_many_relationship
414+
415+
416+
class RelationshipChangelogGetter:
417+
def __init__(self, db: InfrahubDatabase, branch: Branch) -> None:
418+
self._db = db
419+
self._branch = branch
420+
421+
async def get_changelogs(self, primary_changelog: NodeChangelog) -> list[NodeChangelog]:
422+
"""Return secondary changelogs based on this update
423+
424+
These will typically include updates to relationships on other nodes.
425+
"""
426+
schema_branch = self._db.schema.get_schema_branch(name=self._branch.name)
427+
node_schema = schema_branch.get(name=primary_changelog.node_kind)
428+
secondaries: list[NodeChangelog] = []
429+
430+
for relationship in primary_changelog.relationships.values():
431+
rel_schema = node_schema.get_relationship(name=relationship.name)
432+
if isinstance(relationship, RelationshipCardinalityOneChangelog):
433+
# For now this code only looks at the scenario when a cardinality=one relationship
434+
# is added to a node and it has a cardinality=many relationship coming back from
435+
# another node, it will be expanded to include all variations.
436+
if relationship.peer_status == DiffAction.ADDED:
437+
peer_schema = schema_branch.get(name=str(relationship.peer_kind))
438+
peer_relation = peer_schema.get_relationship_by_identifier(
439+
id=str(rel_schema.identifier), raise_on_error=False
440+
)
441+
if peer_relation:
442+
node_changelog = NodeChangelog(
443+
node_id=str(relationship.peer_id),
444+
node_kind=str(relationship.peer_kind),
445+
display_label="n/a",
446+
)
447+
if peer_relation.cardinality == RelationshipCardinality.MANY:
448+
node_changelog.relationships[peer_relation.name] = RelationshipCardinalityManyChangelog(
449+
name=peer_relation.name,
450+
peers=[
451+
RelationshipPeerChangelog(
452+
peer_id=primary_changelog.node_id,
453+
peer_kind=primary_changelog.node_kind,
454+
peer_status=DiffAction.ADDED,
455+
)
456+
],
457+
)
458+
secondaries.append(node_changelog)
459+
460+
return secondaries

backend/infrahub/core/constants/__init__.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -289,18 +289,6 @@ class AttributeDBNodeType(InfrahubStringEnum):
289289
IPNETWORK = "ipnetwork"
290290

291291

292-
class EventLevel(InfrahubStringEnum):
293-
ZERO = "zero"
294-
ONE = "one"
295-
296-
def to_int(self) -> int:
297-
match self:
298-
case EventLevel.ZERO:
299-
return 0
300-
case EventLevel.ONE:
301-
return 1
302-
303-
304292
RESTRICTED_NAMESPACES: list[str] = [
305293
"Account",
306294
"Branch",

backend/infrahub/events/models.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
from infrahub import __version__
99
from infrahub.core.branch import Branch # noqa: TC001
10-
from infrahub.core.constants import EventLevel
1110
from infrahub.message_bus import InfrahubMessage, Meta
1211
from infrahub.worker import WORKER_IDENTITY
1312

@@ -27,10 +26,30 @@ class EventMeta(BaseModel):
2726
default=WORKER_IDENTITY, description="The worker identity of the initial sender of this message"
2827
)
2928
context: list[dict] = Field(default_factory=list)
30-
level: EventLevel = Field(default=EventLevel.ZERO)
29+
level: int = Field(default=0)
30+
has_children: bool = Field(
31+
default=False, description="Indicates if this event might potentially have child events under it."
32+
)
33+
34+
id: UUID = Field(
35+
default_factory=uuid4,
36+
description="UUID of the event",
37+
)
38+
39+
parent: UUID | None = Field(default=None, description="The UUID of the parent event if applicable")
40+
41+
def get_id(self) -> str:
42+
return str(self.id)
3143

3244
def get_related(self) -> list[dict[str, str]]:
33-
related: list[dict[str, str]] = []
45+
related: list[dict[str, str]] = [
46+
{"prefect.resource.id": __version__, "prefect.resource.role": "infrahub.version"},
47+
{
48+
"prefect.resource.id": self.get_id(),
49+
"prefect.resource.role": "infrahub.event",
50+
"infrahub.event.has_children": str(self.has_children).lower(),
51+
},
52+
]
3453
if self.account_id:
3554
related.append(
3655
{
@@ -50,25 +69,43 @@ def get_related(self) -> list[dict[str, str]]:
5069
}
5170
)
5271

53-
related.append({"prefect.resource.id": __version__, "prefect.resource.role": "infrahub.version"})
72+
if self.parent:
73+
related.append(
74+
{
75+
"prefect.resource.id": self.get_id(),
76+
"prefect.resource.role": "infrahub.child_event",
77+
"infrahub.event_parent.id": str(self.parent),
78+
}
79+
)
5480

5581
return related
5682

5783
@classmethod
5884
def default(cls) -> EventMeta:
5985
return cls()
6086

87+
@classmethod
88+
def from_parent(cls, parent: InfrahubEvent) -> EventMeta:
89+
"""Create the metadata from an existing event
90+
91+
Note that this action will modify the existing event to indicate that children might be attached to the event
92+
"""
93+
parent.meta.has_children = True
94+
return cls(
95+
parent=parent.meta.id,
96+
branch=parent.meta.branch,
97+
request_id=parent.meta.request_id,
98+
initiator_id=parent.meta.initiator_id,
99+
account_id=parent.meta.account_id,
100+
level=parent.meta.level + 1,
101+
)
102+
61103

62104
class InfrahubEvent(BaseModel):
63105
meta: EventMeta = Field(default_factory=EventMeta.default)
64106

65-
id: UUID = Field(
66-
default_factory=uuid4,
67-
description="UUID of the event",
68-
)
69-
70107
def get_id(self) -> str:
71-
return str(self.id)
108+
return self.meta.get_id()
72109

73110
def get_event_namespace(self) -> str:
74111
return EVENT_NAMESPACE

backend/infrahub/graphql/mutations/main.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from infrahub import config, lock
1111
from infrahub.core import registry
12+
from infrahub.core.changelog.models import RelationshipChangelogGetter
1213
from infrahub.core.constants import InfrahubKind, MutationAction
1314
from infrahub.core.constraint.node.runner import NodeConstraintRunner
1415
from infrahub.core.manager import NodeManager
@@ -100,21 +101,39 @@ async def mutate(cls, root: dict, info: GraphQLResolveInfo, data: InputObjectTyp
100101
if graphql_context.account_session:
101102
account_id = graphql_context.account_session.account_id
102103

103-
event = NodeMutatedEvent(
104+
meta = EventMeta(
105+
account_id=account_id,
106+
initiator_id=WORKER_IDENTITY,
107+
request_id=request_id,
108+
branch=graphql_context.branch,
109+
)
110+
main_event = NodeMutatedEvent(
104111
kind=obj._schema.kind,
105112
node_id=obj.id,
106113
data=obj.node_changelog,
107114
action=action,
108115
fields=_get_data_fields(data),
109-
meta=EventMeta(
110-
account_id=account_id,
111-
initiator_id=WORKER_IDENTITY,
112-
request_id=request_id,
113-
branch=graphql_context.branch,
114-
),
116+
meta=meta,
115117
)
116-
117-
graphql_context.background.add_task(graphql_context.active_service.event.send, event)
118+
relationship_changelogs = RelationshipChangelogGetter(db=graphql_context.db, branch=graphql_context.branch)
119+
node_changelogs = await relationship_changelogs.get_changelogs(primary_changelog=obj.node_changelog)
120+
121+
events = [main_event]
122+
123+
for node_changelog in node_changelogs:
124+
meta = EventMeta.from_parent(parent=main_event)
125+
event = NodeMutatedEvent(
126+
kind=node_changelog.node_kind,
127+
node_id=node_changelog.node_id,
128+
data=node_changelog,
129+
action=MutationAction.UPDATED,
130+
fields=node_changelog.updated_fields,
131+
meta=meta,
132+
)
133+
events.append(event)
134+
135+
for event in events:
136+
graphql_context.background.add_task(graphql_context.active_service.event.send, event)
118137

119138
return mutation
120139

backend/infrahub/graphql/types/event.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from typing import TYPE_CHECKING, Any
44

5-
from graphene import Field, Int, Interface, List, NonNull, ObjectType, String
5+
from graphene import Boolean, Field, Int, Interface, List, NonNull, ObjectType, String
66
from graphene.types.generic import GenericScalar
77

88
from .common import RelatedNode
@@ -28,6 +28,8 @@ class EventNodeInterface(Interface):
2828
occurred_at = String(required=True)
2929
level = Int(required=True)
3030
primary_node = Field(RelatedNode, required=False)
31+
has_children = Boolean(required=True)
32+
parent_id = String(required=False)
3133

3234
@classmethod
3335
def resolve_type(

backend/infrahub/task_manager/event.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from prefect.events.schemas.events import ResourceSpecification
88
from pydantic import BaseModel, Field, TypeAdapter
99

10-
from infrahub.core.constants import EventLevel
1110
from infrahub.log import get_logger
1211
from infrahub.utils import get_nested_dict
1312

@@ -33,10 +32,20 @@ def get_level(self) -> int:
3332
level = resource.get("infrahub.event.level")
3433
if level is None:
3534
continue
36-
return EventLevel(level).to_int()
35+
try:
36+
return int(level)
37+
except ValueError:
38+
return 0
3739

3840
return 0
3941

42+
def get_parent(self) -> str | None:
43+
for resource in self.related:
44+
if resource.get("prefect.resource.role") != "infrahub.child_event":
45+
continue
46+
return resource.get("infrahub.event_parent.id")
47+
return None
48+
4049
def get_primary_node(self) -> dict[str, str] | None:
4150
node_id = self.resource.get("infrahub.node.id")
4251
node_kind = self.resource.get("infrahub.node.kind")
@@ -52,6 +61,15 @@ def get_account_id(self) -> str | None:
5261
return resource.get("infrahub.resource.id")
5362
return None
5463

64+
def has_children(self) -> bool:
65+
for resource in self.related:
66+
if resource.get("prefect.resource.role") != "infrahub.event":
67+
continue
68+
if resource.get("infrahub.event.has_children") == "true":
69+
return True
70+
return False
71+
return False
72+
5573
def _return_node_mutation(self) -> dict[str, Any]:
5674
attributes = []
5775

@@ -89,9 +107,11 @@ def to_graphql(self) -> dict[str, Any]:
89107
"branch": self.get_branch(),
90108
"account_id": self.get_account_id(),
91109
"occurred_at": self.occurred.to_iso8601_string(),
110+
"has_children": self.has_children(),
92111
"payload": self.payload,
93112
"level": self.get_level(),
94113
"primary_node": self.get_primary_node(),
114+
"parent_id": self.get_parent(),
95115
}
96116
response.update(self._return_event_specifics())
97117
return response

backend/tests/helpers/events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
async def send_events(client: PrefectClient, events: list[InfrahubEvent]) -> list[Event]:
1313
events_data = [
1414
Event(
15-
id=event.id,
15+
id=event.meta.id,
1616
event=event.get_name(),
1717
payload=event.get_payload(),
1818
related=[RelatedResource(item) for item in event.get_related()],

backend/tests/unit/core/test_changelog.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
PropertyChangelog,
55
RelationshipCardinalityManyChangelog,
66
RelationshipCardinalityOneChangelog,
7+
RelationshipChangelogGetter,
78
RelationshipPeerChangelog,
89
)
910
from infrahub.core.constants import DiffAction
@@ -132,6 +133,26 @@ async def test_node_changelog_creation(db: InfrahubDatabase, default_branch, ani
132133
)
133134
assert not dog1.node_changelog.parent
134135

136+
relationship_changelogs = RelationshipChangelogGetter(db=db, branch=default_branch)
137+
secondary_changelogs = await relationship_changelogs.get_changelogs(primary_changelog=dog1.node_changelog)
138+
assert len(secondary_changelogs) == 1
139+
140+
second = secondary_changelogs[0]
141+
assert second.node_id == person1.id
142+
assert second.node_kind == person1.get_kind()
143+
assert list(second.relationships.keys()) == ["animals"]
144+
assert second.relationships["animals"] == RelationshipCardinalityManyChangelog(
145+
name="animals",
146+
peers=[
147+
RelationshipPeerChangelog(
148+
peer_id=dog1.id,
149+
peer_kind=dog1.get_kind(),
150+
peer_status=DiffAction.ADDED,
151+
properties={},
152+
)
153+
],
154+
)
155+
135156

136157
async def test_node_changelog_update_with_cardinality_one_relationship(
137158
db: InfrahubDatabase, default_branch, animal_person_schema

backend/tests/unit/graphql/queries/test_event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ async def events_data(
206206

207207
@pytest.fixture
208208
async def event_ids_inscope(events_data: dict[str, InfrahubEvent]) -> list[str]:
209-
return [str(event.id) for event in events_data.values()]
209+
return [str(event.meta.id) for event in events_data.values()]
210210

211211

212212
def filter_outofscope_events(result_data: dict, in_scope_ids: list[str]):

backend/tests/unit/task_manager/test_event.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
def filter_outofscope_events(events: dict, in_scope_ids: list[str]):
2828
"""
29-
Because we can't garantee that Prefect is empty at the start of the test easily
29+
Because we can't guarantee that Prefect is empty at the start of the test easily
3030
we need to exclude all events not created by this test suite.
3131
"""
3232
filtered_events = [event for event in events["edges"] if event["node"]["id"] in in_scope_ids]
@@ -58,13 +58,13 @@ async def events_data(prefect_client: PrefectClient, branch1_id, branch2_id) ->
5858
"branch2_rebased": BranchRebasedEvent(branch_name="branch2", branch_id=branch2_id),
5959
}
6060

61-
await send_events(client=prefect_client, events=items.values())
61+
await send_events(client=prefect_client, events=list(items.values()))
6262
return items
6363

6464

6565
@pytest.fixture(scope="module")
6666
async def event_ids_inscope(events_data: dict[str, InfrahubEvent]) -> list[str]:
67-
return [str(event.id) for event in events_data.values()]
67+
return [str(event.meta.id) for event in events_data.values()]
6868

6969

7070
async def test_query_no_filters(event_ids_inscope):

0 commit comments

Comments
 (0)