Skip to content

Commit 97ae477

Browse files
committed
Add "handle_purge_queue" "handle_delete_exchange" and "handle_get_exchange_info"
1 parent c0b37c3 commit 97ae477

File tree

3 files changed

+84
-2
lines changed

3 files changed

+84
-2
lines changed

src/mcp_server_rabbitmq/handlers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,12 @@ def handle_get_queue_info(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str
2727

2828
def handle_delete_queue(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str = "/") -> None:
2929
rabbitmq_admin.delete_queue(queue, vhost)
30+
31+
def handle_purge_queue(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str = "/") -> None:
32+
rabbitmq_admin.purge_queue(queue, vhost)
33+
34+
def handle_delete_exchange(rabbitmq_admin: RabbitMQAdmin, exchange: str, vhost: str = "/") -> None:
35+
rabbitmq_admin.delete_exchange(exchange, vhost)
36+
37+
def handle_get_exchange_info(rabbitmq_admin: RabbitMQAdmin, exchange: str, vhost: str = "/") -> dict:
38+
return rabbitmq_admin.get_exchange_info(exchange, vhost)

src/mcp_server_rabbitmq/models.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,15 @@ class GetQueueInfo(BaseModel):
2222
class DeleteQueue(BaseModel):
2323
queue: Annotated[str, Field(description="The name of the queue to delete")]
2424
vhost: Annotated[str, Field(description="The virtual host where the queue exists")] = "/"
25+
26+
class PurgeQueue(BaseModel):
27+
queue: Annotated[str, Field(description="The name of the queue to purge")]
28+
vhost: Annotated[str, Field(description="The virtual host where the queue exists")] = "/"
29+
30+
class DeleteExchange(BaseModel):
31+
exchange: Annotated[str, Field(description="The name of the exchange to delete")]
32+
vhost: Annotated[str, Field(description="The virtual host where the exchange exists")] = "/"
33+
34+
class GetExchangeInfo(BaseModel):
35+
exchange: Annotated[str, Field(description="The name of the exchange to get info about")]
36+
vhost: Annotated[str, Field(description="The virtual host where the exchange exists")] = "/"

src/mcp_server_rabbitmq/server.py

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,17 @@
55
Tool,
66
)
77
import ssl
8-
from .models import Enqueue, Fanout, ListQueues, ListExchanges, GetQueueInfo, DeleteQueue
8+
from .models import (
9+
Enqueue,
10+
Fanout,
11+
ListQueues,
12+
ListExchanges,
13+
GetQueueInfo,
14+
DeleteQueue,
15+
PurgeQueue,
16+
DeleteExchange,
17+
GetExchangeInfo
18+
)
919
from .logger import Logger, LOG_LEVEL
1020
from .connection import RabbitMQConnection, validate_rabbitmq_name
1121
from .handlers import (
@@ -14,7 +24,10 @@
1424
handle_list_queues,
1525
handle_list_exchanges,
1626
handle_get_queue_info,
17-
handle_delete_queue
27+
handle_delete_queue,
28+
handle_purge_queue,
29+
handle_delete_exchange,
30+
handle_get_exchange_info
1831
)
1932
from .admin import RabbitMQAdmin
2033

@@ -65,6 +78,21 @@ async def list_tools() -> list[Tool]:
6578
name="delete_queue",
6679
description="""Delete a specific queue""",
6780
inputSchema=DeleteQueue.model_json_schema(),
81+
),
82+
Tool(
83+
name="purge_queue",
84+
description="""Remove all messages from a specific queue""",
85+
inputSchema=PurgeQueue.model_json_schema(),
86+
),
87+
Tool(
88+
name="delete_exchange",
89+
description="""Delete a specific exchange""",
90+
inputSchema=DeleteExchange.model_json_schema(),
91+
),
92+
Tool(
93+
name="get_exchange_info",
94+
description="""Get detailed information about a specific exchange""",
95+
inputSchema=GetExchangeInfo.model_json_schema(),
6896
)
6997
]
7098

@@ -142,6 +170,39 @@ async def call_tool(
142170
except Exception as e:
143171
logger.error(f"{e}")
144172
return [TextContent(type="text", text=str("failed"))]
173+
elif name == "purge_queue":
174+
try:
175+
admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls)
176+
queue = arguments["queue"]
177+
vhost = arguments.get("vhost", "/")
178+
validate_rabbitmq_name(queue, "Queue name")
179+
handle_purge_queue(admin, queue, vhost)
180+
return [TextContent(type="text", text=str("succeeded"))]
181+
except Exception as e:
182+
logger.error(f"{e}")
183+
return [TextContent(type="text", text=str("failed"))]
184+
elif name == "delete_exchange":
185+
try:
186+
admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls)
187+
exchange = arguments["exchange"]
188+
vhost = arguments.get("vhost", "/")
189+
validate_rabbitmq_name(exchange, "Exchange name")
190+
handle_delete_exchange(admin, exchange, vhost)
191+
return [TextContent(type="text", text=str("succeeded"))]
192+
except Exception as e:
193+
logger.error(f"{e}")
194+
return [TextContent(type="text", text=str("failed"))]
195+
elif name == "get_exchange_info":
196+
try:
197+
admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls)
198+
exchange = arguments["exchange"]
199+
vhost = arguments.get("vhost", "/")
200+
validate_rabbitmq_name(exchange, "Exchange name")
201+
result = handle_get_exchange_info(admin, exchange, vhost)
202+
return [TextContent(type="text", text=str(result))]
203+
except Exception as e:
204+
logger.error(f"{e}")
205+
return [TextContent(type="text", text=str("failed"))]
145206
raise ValueError(f"Tool not found: {name}")
146207

147208
options = server.create_initialization_options()

0 commit comments

Comments
 (0)