Skip to content

Commit 74d945a

Browse files
committed
Skip prefect when using BusSimulator
1 parent 3eb3c3e commit 74d945a

File tree

2 files changed

+9
-3
lines changed

2 files changed

+9
-3
lines changed

backend/infrahub/message_bus/operations/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Optional
22

33
import ujson
4+
from prefect import Flow
45

56
from infrahub.message_bus import RPCErrorResponse, messages
67
from infrahub.message_bus.operations import (
@@ -81,12 +82,17 @@
8182
}
8283

8384

84-
async def execute_message(routing_key: str, message_body: bytes, service: InfrahubServices) -> Optional[MessageTTL]:
85+
async def execute_message(
86+
routing_key: str, message_body: bytes, service: InfrahubServices, skip_flow: bool = False
87+
) -> Optional[MessageTTL]:
8588
message_data = ujson.loads(message_body)
8689
message = messages.MESSAGE_MAP[routing_key](**message_data)
8790
message.set_log_data(routing_key=routing_key)
8891
try:
89-
await COMMAND_MAP[routing_key](message=message, service=service)
92+
func = COMMAND_MAP[routing_key]
93+
if skip_flow and isinstance(func, Flow):
94+
func = func.fn
95+
await func(message=message, service=service)
9096
except Exception as exc: # pylint: disable=broad-except
9197
if message.reply_requested:
9298
response = RPCErrorResponse(errors=[str(exc)], initial_message=message.model_dump())

backend/tests/adapters/message_bus.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def publish(
5050
if routing_key not in self.messages_per_routing_key:
5151
self.messages_per_routing_key[routing_key] = []
5252
self.messages_per_routing_key[routing_key].append(message)
53-
await execute_message(routing_key=routing_key, message_body=message.body, service=self.service)
53+
await execute_message(routing_key=routing_key, message_body=message.body, service=self.service, skip_flow=True)
5454

5555
async def reply(self, message: InfrahubMessage, routing_key: str) -> None:
5656
correlation_id = message.meta.correlation_id or "default"

0 commit comments

Comments
 (0)