Skip to content

Commit 639c432

Browse files
authored
Merge pull request #5831 from opsmill/pog-branch-events
Refactor branch events with regards to impacted branch
2 parents 3d85c9a + eee243c commit 639c432

File tree

7 files changed

+114
-27
lines changed

7 files changed

+114
-27
lines changed

backend/infrahub/core/branch/tasks.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,9 @@ async def rebase_branch(branch: str, context: InfrahubContext, service: Infrahub
159159
# TODO Add account information
160160
await service.event.send(
161161
event=BranchRebasedEvent(
162-
branch_name=obj.name, branch_id=str(obj.uuid), meta=EventMeta(branch=obj, context=context)
162+
branch_name=obj.name,
163+
branch_id=str(obj.uuid),
164+
meta=EventMeta.from_context(context=context, branch=registry.get_global_branch()),
163165
)
164166
)
165167

@@ -174,7 +176,11 @@ async def merge_branch(branch: str, context: InfrahubContext, service: InfrahubS
174176
obj = await Branch.get_by_name(db=db, name=branch)
175177
default_branch = await registry.get_branch(db=db, branch=registry.default_branch)
176178
component_registry = get_component_registry()
177-
merge_event = BranchMergedEvent(meta=EventMeta.from_context(context=context, branch=obj))
179+
merge_event = BranchMergedEvent(
180+
branch_name=obj.name,
181+
branch_id=str(obj.get_uuid()),
182+
meta=EventMeta.from_context(context=context, branch=registry.get_global_branch()),
183+
)
178184

179185
merger: BranchMerger | None = None
180186
async with lock.registry.global_graph_lock():
@@ -280,7 +286,7 @@ async def delete_branch(branch: str, context: InfrahubContext, service: Infrahub
280286
branch_name=branch,
281287
branch_id=str(obj.uuid),
282288
sync_with_git=obj.sync_with_git,
283-
meta=EventMeta(branch=obj, context=context),
289+
meta=EventMeta.from_context(context=context, branch=registry.get_global_branch()),
284290
)
285291

286292
await service.workflow.submit_workflow(
@@ -348,9 +354,7 @@ async def create_branch(model: BranchCreateModel, context: InfrahubContext, serv
348354
branch_name=obj.name,
349355
branch_id=str(obj.uuid),
350356
sync_with_git=obj.sync_with_git,
351-
meta=EventMeta(
352-
branch=obj, account_id=context.account.account_id, initiator_id=WORKER_IDENTITY, context=context
353-
),
357+
meta=EventMeta.from_context(context=context, branch=registry.get_global_branch()),
354358
)
355359
await service.event.send(event=event)
356360

backend/infrahub/events/branch_action.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,14 @@ def event_name(self) -> str:
7373
class BranchMergedEvent(InfrahubEvent):
7474
"""Event generated when a branch has been merged"""
7575

76+
branch_name: str = Field(..., description="The name of the branch")
77+
branch_id: str = Field(..., description="The ID of the branch")
78+
7679
def get_resource(self) -> dict[str, str]:
7780
return {
78-
"prefect.resource.id": f"infrahub.branch.{self.meta.get_branch_id()}",
79-
"infrahub.node.kind": "Branch",
80-
"infrahub.node.id": self.meta.get_branch_id(),
81-
"infrahub.node.label": self.meta.context.branch.name,
81+
"prefect.resource.id": f"infrahub.branch.{self.branch_name}",
82+
"infrahub.branch.id": self.branch_id,
83+
"infrahub.branch.name": self.branch_name,
8284
}
8385

8486
def get_messages(self) -> list[InfrahubMessage]:
@@ -99,6 +101,7 @@ def get_resource(self) -> dict[str, str]:
99101
return {
100102
"prefect.resource.id": f"infrahub.branch.{self.branch_name}",
101103
"infrahub.branch.id": self.branch_id,
104+
"infrahub.branch.name": self.branch_name,
102105
}
103106

104107
def get_messages(self) -> list[InfrahubMessage]:

backend/infrahub/graphql/queries/event.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
from typing import TYPE_CHECKING, Any
44

5-
from graphene import Boolean, DateTime, Field, Int, List, NonNull, ObjectType, String
5+
from graphene import Argument, Boolean, DateTime, Field, Int, List, NonNull, ObjectType, String
66
from infrahub_sdk.utils import extract_fields_first_node
77

88
from infrahub.exceptions import ValidationError
9-
from infrahub.graphql.types.event import EventNodes
9+
from infrahub.graphql.types.event import EventNodes, EventTypeFilter
1010
from infrahub.task_manager.event import PrefectEvent
1111
from infrahub.task_manager.models import InfrahubEventFilter
1212

@@ -32,6 +32,7 @@ async def resolve(
3232
ids: list[str] | None = None,
3333
branches: list[str] | None = None,
3434
event_type: list[str] | None = None,
35+
event_type_filter: dict[str, Any] | None = None,
3536
related_node__ids: list[str] | None = None,
3637
primary_node__ids: list[str] | None = None,
3738
parent__ids: list[str] | None = None,
@@ -49,6 +50,7 @@ async def resolve(
4950
account__ids=account__ids,
5051
has_children=has_children,
5152
event_type=event_type,
53+
event_type_filter=event_type_filter,
5254
related_node__ids=related_node__ids,
5355
primary_node__ids=primary_node__ids,
5456
parent__ids=parent__ids,
@@ -93,6 +95,7 @@ async def query(
9395
level=Int(required=False),
9496
has_children=Boolean(required=False, description="Filter events based on if they can have children or not"),
9597
event_type=List(NonNull(String), description="Filter events that match a specific type"),
98+
event_type_filter=Argument(EventTypeFilter, required=False, description="Filters specific to a given event_type"),
9699
primary_node__ids=List(
97100
NonNull(String), description="Filter events where the primary node id is within indicated node ids"
98101
),

backend/infrahub/graphql/types/event.py

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from typing import TYPE_CHECKING, Any
44

5-
from graphene import Boolean, DateTime, Field, Int, Interface, List, NonNull, ObjectType, String
5+
from graphene import Boolean, DateTime, Field, InputObjectType, Int, Interface, List, NonNull, ObjectType, String
66
from graphene.types.generic import GenericScalar
77

88
from .common import RelatedNode
@@ -21,16 +21,25 @@ class InfrahubMutatedAttribute(ObjectType):
2121

2222

2323
class EventNodeInterface(Interface):
24-
id = String(required=True)
25-
event = String(required=True)
26-
branch = String(required=False)
27-
account_id = String(required=False)
28-
occurred_at = DateTime(required=True)
29-
level = Int(required=True)
30-
primary_node = Field(RelatedNode, required=False)
31-
related_nodes = List(NonNull(RelatedNode), required=True)
32-
has_children = Boolean(required=True)
33-
parent_id = String(required=False)
24+
id = String(required=True, description="The ID of the event.")
25+
event = String(required=True, description="The name of the event.")
26+
branch = String(required=False, description="The branch where the event occurred.")
27+
account_id = String(required=False, description="The account ID that triggered the event.")
28+
occurred_at = DateTime(required=True, description="The timestamp when the event occurred.")
29+
level = Int(
30+
required=True,
31+
description="The level of the event 0 is a root level event, the child events will have 1 and grand children 2.",
32+
)
33+
primary_node = Field(
34+
RelatedNode, required=False, description="The primary Infrahub node this event is associated with."
35+
)
36+
related_nodes = List(
37+
NonNull(RelatedNode), required=True, description="Related Infrahub nodes this event is associated with."
38+
)
39+
has_children = Boolean(
40+
required=True, description="Indicates if the event is expected to have child events under it"
41+
)
42+
parent_id = String(required=False, description="The event ID of the direct parent to this event.")
3443

3544
@classmethod
3645
def resolve_type(
@@ -47,27 +56,49 @@ class EventNodes(ObjectType):
4756
node = Field(EventNodeInterface)
4857

4958

59+
class BranchEventTypeFilter(InputObjectType):
60+
branches = List(NonNull(String), required=True, description="Name of impacted branches")
61+
62+
63+
class EventTypeFilter(InputObjectType):
64+
branch_merged = Field(
65+
BranchEventTypeFilter, required=False, description="Filters specific to infrahub.branch.merged events"
66+
)
67+
68+
5069
# ---------------------------------------
5170
# Branch events
5271
# ---------------------------------------
5372
class BranchCreatedEvent(ObjectType):
5473
class Meta:
5574
interfaces = (EventNodeInterface,)
5675

76+
created_branch = String(required=True, description="The name of the branch that was created")
5777
payload = Field(GenericScalar, required=True)
5878

5979

80+
class BranchMergedEvent(ObjectType):
81+
class Meta:
82+
interfaces = (EventNodeInterface,)
83+
84+
source_branch = String(required=True, description="The name of the branch that was merged into the default branch")
85+
86+
6087
class BranchRebasedEvent(ObjectType):
6188
class Meta:
6289
interfaces = (EventNodeInterface,)
6390

91+
rebased_branch = String(
92+
required=True, description="The name of the branch that was rebased and aligned with the default branch"
93+
)
6494
payload = Field(GenericScalar, required=True)
6595

6696

6797
class BranchDeletedEvent(ObjectType):
6898
class Meta:
6999
interfaces = (EventNodeInterface,)
70100

101+
deleted_branch = String(required=True, description="The name of the branch that was deleted")
71102
payload = Field(GenericScalar, required=True)
72103

73104

@@ -94,6 +125,7 @@ class Meta:
94125
"infrahub.node.updated": NodeMutatedEvent,
95126
"infrahub.node.deleted": NodeMutatedEvent,
96127
"infrahub.branch.created": BranchCreatedEvent,
128+
"infrahub.branch.merged": BranchMergedEvent,
97129
"infrahub.branch.rebased": BranchRebasedEvent,
98130
"infrahub.branch.deleted": BranchDeletedEvent,
99131
"undefined": StandardEvent,

backend/infrahub/task_manager/event.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from prefect.events.schemas.events import Event as PrefectEventModel
77
from pydantic import BaseModel, Field, TypeAdapter
88

9+
from infrahub.core.constants import GLOBAL_BRANCH_NAME
910
from infrahub.log import get_logger
1011
from infrahub.utils import get_nested_dict
1112

@@ -22,6 +23,8 @@ def get_branch(self) -> str | None:
2223
continue
2324
if "infrahub.resource.label" not in resource:
2425
continue
26+
if resource.get("infrahub.resource.label") == GLOBAL_BRANCH_NAME:
27+
return None
2528
return resource.get("infrahub.resource.label")
2629
return None
2730

@@ -107,10 +110,34 @@ def _return_node_mutation(self) -> dict[str, Any]:
107110

108111
return {"attributes": attributes}
109112

113+
def _get_branch_name_from_resource(self) -> str:
114+
return self.resource.get("infrahub.branch.name") or ""
115+
116+
def _return_branch_created(self) -> dict[str, Any]:
117+
return {"created_branch": self._get_branch_name_from_resource()}
118+
119+
def _return_branch_deleted(self) -> dict[str, Any]:
120+
return {"deleted_branch": self._get_branch_name_from_resource()}
121+
122+
def _return_branch_merged(self) -> dict[str, Any]:
123+
return {"source_branch": self._get_branch_name_from_resource()}
124+
125+
def _return_branch_rebased(self) -> dict[str, Any]:
126+
return {"rebased_branch": self._get_branch_name_from_resource()}
127+
110128
def _return_event_specifics(self) -> dict[str, Any]:
129+
"""Return event specific data based on the type of event being processed"""
111130
match self.event:
112131
case "infrahub.node.created" | "infrahub.node.updated" | "infrahub.node.deleted":
113132
return self._return_node_mutation()
133+
case "infrahub.branch.created":
134+
return self._return_branch_created()
135+
case "infrahub.branch.deleted":
136+
return self._return_branch_deleted()
137+
case "infrahub.branch.merged":
138+
return self._return_branch_merged()
139+
case "infrahub.branch.rebased":
140+
return self._return_branch_rebased()
114141

115142
return {}
116143

backend/infrahub/task_manager/models.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import uuid
44
from collections import defaultdict
5-
from typing import TYPE_CHECKING
5+
from typing import TYPE_CHECKING, Any
66
from uuid import UUID
77

88
from prefect.client.schemas.objects import Log as PrefectLog # noqa: TC002
@@ -122,7 +122,19 @@ def add_event_id_filter(self, ids: list[str] | None = None) -> None:
122122
if ids:
123123
self.id = EventIDFilter(id=[uuid.UUID(id) for id in ids])
124124

125-
def add_event_type_filter(self, event_type: list[str] | None = None) -> None:
125+
def add_event_type_filter(
126+
self, event_type: list[str] | None = None, event_type_filter: dict[str, Any] | None = None
127+
) -> None:
128+
event_type = event_type or []
129+
event_type_filter = event_type_filter or {}
130+
131+
if branch_merged := event_type_filter.get("branch_merged"):
132+
branches: list[str] = branch_merged.get("branches") or []
133+
if "infrahub.branch.created" not in event_type:
134+
event_type.append("infrahub.branch.merged")
135+
if branches:
136+
self.resource = EventResourceFilter(labels=ResourceSpecification({"infrahub.branch.name": branches}))
137+
126138
if event_type:
127139
self.event = EventNameFilter(name=event_type)
128140

@@ -159,6 +171,7 @@ def from_filters(
159171
parent__ids: list[str] | None = None,
160172
primary_node__ids: list[str] | None = None,
161173
event_type: list[str] | None = None,
174+
event_type_filter: dict[str, Any] | None = None,
162175
branches: list[str] | None = None,
163176
level: int | None = None,
164177
has_children: bool | None = None,
@@ -179,7 +192,7 @@ def from_filters(
179192

180193
filters.add_event_filter(level=level, has_children=has_children)
181194
filters.add_event_id_filter(ids=ids)
182-
filters.add_event_type_filter(event_type=event_type)
195+
filters.add_event_type_filter(event_type=event_type, event_type_filter=event_type_filter)
183196
filters.add_branch_filter(branches=branches)
184197
filters.add_account_filter(account__ids=account__ids)
185198
filters.add_parent_filter(parent__ids=parent__ids)

backend/tests/integration/proposed_change/test_proposed_change_conflict.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,10 @@ async def test_happy_pipeline(self, db: InfrahubDatabase, happy_data_branch: str
251251

252252
for _ in range(10):
253253
merge_event = await client.execute_graphql(
254-
query=QUERY_EVENT, variables={"branch": happy_data_branch, "event_type": "infrahub.branch.merged"}
254+
query=QUERY_EVENT,
255+
variables={
256+
"event_type_filter": {"branch_merged": {"branches": happy_data_branch}},
257+
},
255258
)
256259
if merge_event["InfrahubEvent"]["count"] == 1:
257260
break
@@ -322,11 +325,13 @@ async def test_connectivity(self, db: InfrahubDatabase, initial_dataset: str, cl
322325
$branch: [String!],
323326
$parent__ids: [String!],
324327
$event_type: [String!]
328+
$event_type_filter: EventTypeFilter
325329
) {
326330
InfrahubEvent(
327331
branches: $branch,
328332
parent__ids: $parent__ids
329333
event_type: $event_type
334+
event_type_filter: $event_type_filter
330335
) {
331336
count
332337
edges {

0 commit comments

Comments
 (0)