Skip to content

Commit 273ae56

Browse files
committed
Add tool for listing queues
1 parent 7db6626 commit 273ae56

File tree

4 files changed

+34
-9
lines changed

4 files changed

+34
-9
lines changed

src/mcp_server_rabbitmq/admin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,6 @@ def get_overview(self) -> Dict:
7878

7979
if __name__ == "__main__":
8080
# test
81-
admin = RabbitMQAdmin("localhost", 15672, "guest", "guest", False)
81+
admin = RabbitMQAdmin("b-9560b8e1-3d33-4d91-9488-a3dc4a61dfe7.mq.us-east-1.amazonaws.com", 15671, "admin", "admintestrabbit", True)
8282
print(admin.list_queues())
8383
print("Done")

src/mcp_server_rabbitmq/handlers.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from .connection import RabbitMQConnection
2+
from .admin import RabbitMQAdmin
3+
from typing import List
24

35
def handle_enqueue(rabbitmq: RabbitMQConnection, queue: str, message: str):
46
connection, channel = rabbitmq.get_channel()
@@ -10,4 +12,8 @@ def handle_fanout(rabbitmq: RabbitMQConnection, exchange: str, message: str):
1012
connection, channel = rabbitmq.get_channel()
1113
channel.exchange_declare(exchange=exchange, exchange_type="fanout")
1214
channel.basic_publish(exchange=exchange, routing_key="", body=message)
13-
connection.close()
15+
connection.close()
16+
17+
def handle_list_queues(rabbitmq_admin: RabbitMQAdmin) -> List[str]:
18+
result = rabbitmq_admin.list_queues()
19+
return [queue['name'] for queue in result]

src/mcp_server_rabbitmq/models.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,7 @@ class Enqueue(BaseModel):
77

88
class Fanout(BaseModel):
99
message: Annotated[str, Field(description="The message to publish")]
10-
exchange: Annotated[str, Field(description="The name of the exchange")]
10+
exchange: Annotated[str, Field(description="The name of the exchange")]
11+
12+
class ListQueues(BaseModel):
13+
pass

src/mcp_server_rabbitmq/server.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44
TextContent,
55
Tool,
66
)
7-
import pika
87
import ssl
9-
from .models import Enqueue, Fanout
8+
from .models import Enqueue, Fanout, ListQueues
109
from .logger import Logger, LOG_LEVEL
1110
from .connection import RabbitMQConnection, validate_rabbitmq_name
12-
from .handlers import handle_enqueue, handle_fanout
11+
from .handlers import handle_enqueue, handle_fanout, handle_list_queues
12+
from .admin import RabbitMQAdmin
1313

1414

15-
async def serve(rabbitmq_host: str, port: int, username: str, password: str, use_tls: bool, log_level: str = LOG_LEVEL.DEBUG.name) -> None:
15+
async def serve(rabbitmq_host: str, port: int, username: str, password: str, use_tls: bool, log_level: str = LOG_LEVEL.DEBUG.name, api_port: int = 15671) -> None:
1616
# Setup server
1717
server = Server("mcp-rabbitmq")
1818
# Setup logger
@@ -25,8 +25,6 @@ async def serve(rabbitmq_host: str, port: int, username: str, password: str, use
2525
logger = Logger("server.log", log_level)
2626
if is_log_level_exception:
2727
logger.warning("Wrong log_level received. Default to WARNING")
28-
# Setup RabbitMQ connection
29-
rabbitmq = RabbitMQConnection(rabbitmq_host, port, username, password, use_tls)
3028

3129
@server.list_tools()
3230
async def list_tools() -> list[Tool]:
@@ -40,6 +38,11 @@ async def list_tools() -> list[Tool]:
4038
name="fanout",
4139
description="""Publish a message to an exchange with fanout type""",
4240
inputSchema=Fanout.model_json_schema(),
41+
),
42+
Tool(
43+
name="list_queues",
44+
description="""List all the queues in the broker""",
45+
inputSchema=ListQueues.model_json_schema(),
4346
)
4447
]
4548

@@ -56,6 +59,8 @@ async def call_tool(
5659
validate_rabbitmq_name(queue, "Queue name")
5760

5861
try:
62+
# Setup RabbitMQ connection
63+
rabbitmq = RabbitMQConnection(rabbitmq_host, port, username, password, use_tls)
5964
handle_enqueue(rabbitmq, queue, message)
6065
return [TextContent(type="text", text=str("suceeded"))]
6166
except Exception as e:
@@ -69,11 +74,22 @@ async def call_tool(
6974
validate_rabbitmq_name(exchange, "Exchange name")
7075

7176
try:
77+
# Setup RabbitMQ connection
78+
rabbitmq = RabbitMQConnection(rabbitmq_host, port, username, password, use_tls)
7279
handle_fanout(rabbitmq, exchange, message)
7380
return [TextContent(type="text", text=str("suceeded"))]
7481
except Exception as e:
7582
logger.error(f"{e}")
7683
return [TextContent(type="text", text=str("failed"))]
84+
elif name == "list_queues":
85+
try:
86+
admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls)
87+
result = handle_list_queues(admin)
88+
return [TextContent(type="text", text=str(result))]
89+
except Exception as e:
90+
logger.error(f"{e}")
91+
return [TextContent(type="text", text=str("failed"))]
92+
return [TextContent(type="text", text=str("succeeded"))]
7793
raise ValueError(f"Tool not found: {name}")
7894

7995
options = server.create_initialization_options()

0 commit comments

Comments
 (0)