Skip to content

Commit 6c15b0a

Browse files
committed
Add fanout tool
1 parent 8e93a51 commit 6c15b0a

File tree

2 files changed

+24
-3
lines changed

2 files changed

+24
-3
lines changed

src/mcp_server_rabbitmq/models.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,8 @@
33

44
class Enqueue(BaseModel):
55
message: Annotated[str, Field(description="The message to publish")]
6-
queue: Annotated[str, Field(description="The name of the queue")]
6+
queue: Annotated[str, Field(description="The name of the queue")]
7+
8+
class Fanout(BaseModel):
9+
message: Annotated[str, Field(description="The message to publish")]
10+
exchange: Annotated[str, Field(description="The name of the exchange")]

src/mcp_server_rabbitmq/server.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
)
77
import pika
88
import ssl
9-
from .models import Enqueue
9+
from .models import Enqueue, Fanout
1010
from .logger import Logger, LOG_LEVEL
1111

1212

@@ -39,6 +39,11 @@ async def list_tools() -> list[Tool]:
3939
name="enqueue",
4040
description="""Enqueue a message to a queue hosted on RabbitMQ""",
4141
inputSchema=Enqueue.model_json_schema(),
42+
),
43+
Tool(
44+
name="fanout",
45+
description="""Publish a message to an exchange with fanout type""",
46+
inputSchema=Fanout.model_json_schema(),
4247
)
4348
]
4449

@@ -51,7 +56,6 @@ async def call_tool(
5156
logger.debug("Executing enqueue tool")
5257
message = arguments["message"]
5358
queue = arguments["queue"]
54-
# Send to RabbitMQ host
5559
try:
5660
connection = pika.BlockingConnection(parameters)
5761
channel = connection.channel()
@@ -61,6 +65,19 @@ async def call_tool(
6165
except Exception as e:
6266
logger.error(f"{e}")
6367
return [TextContent(type="text", text=str("failed"))]
68+
elif name == "fanout":
69+
logger.debug("Executing fanout tool")
70+
exchange = arguments["exchange"]
71+
message = arguments["message"]
72+
try:
73+
connection = pika.BlockingConnection(parameters)
74+
channel = connection.channel()
75+
channel.exchange_declare(exchange=exchange, exchange_type="fanout")
76+
channel.basic_publish(exchange=exchange, routing_key="", body=message)
77+
return [TextContent(type="text", text=str("suceeded"))]
78+
except Exception as e:
79+
logger.error(f"{e}")
80+
return [TextContent(type="text", text=str("failed"))]
6481
raise ValueError(f"Tool not found: {name}")
6582

6683
options = server.create_initialization_options()

0 commit comments

Comments
 (0)