8
8
import ssl
9
9
from .models import Enqueue , Fanout
10
10
from .logger import Logger , LOG_LEVEL
11
+ from .connection import RabbitMQConnection , validate_rabbitmq_name
11
12
12
13
13
14
async def serve (rabbitmq_host : str , port : int , username : str , password : str , use_tls : bool , log_level : str = LOG_LEVEL .DEBUG .name ) -> None :
@@ -23,14 +24,8 @@ async def serve(rabbitmq_host: str, port: int, username: str, password: str, use
23
24
logger = Logger ("server.log" , log_level )
24
25
if is_log_level_exception :
25
26
logger .warning ("Wrong log_level received. Default to WARNING" )
26
- # Setup RabbitMQ connection metadata
27
- protocol = "amqps" if use_tls else "amqp"
28
- url = f"{ protocol } ://{ username } :{ password } @{ rabbitmq_host } :{ port } "
29
- parameters = pika .URLParameters (url )
30
- if use_tls :
31
- ssl_context = ssl .SSLContext (ssl .PROTOCOL_TLSv1_2 )
32
- ssl_context .set_ciphers ('ECDHE+AESGCM:!ECDSA' )
33
- parameters .ssl_options = pika .SSLOptions (context = ssl_context )
27
+ # Setup RabbitMQ connection
28
+ rabbitmq = RabbitMQConnection (rabbitmq_host , port , username , password , use_tls )
34
29
35
30
@server .list_tools ()
36
31
async def list_tools () -> list [Tool ]:
@@ -57,22 +52,13 @@ async def call_tool(
57
52
message = arguments ["message" ]
58
53
queue = arguments ["queue" ]
59
54
60
- if not message or not message .strip ():
61
- raise ValueError ("Message cannot be empty" )
62
- if not queue or not queue .strip ():
63
- raise ValueError ("Queue name cannot be empty" )
64
- # RabbitMQ queue names can only contain letters, digits, hyphen, underscore, period, or colon
65
- # and must be less than 255 characters
66
- if not all (c .isalnum () or c in '-_.:' for c in queue ):
67
- raise ValueError ("Queue name can only contain letters, digits, hyphen, underscore, period, or colon" )
68
- if len (queue ) > 255 :
69
- raise ValueError ("Queue name must be less than 255 characters" )
55
+ validate_rabbitmq_name (queue , "Queue name" )
70
56
71
57
try :
72
- connection = pika .BlockingConnection (parameters )
73
- channel = connection .channel ()
58
+ connection , channel = rabbitmq .get_channel ()
74
59
channel .queue_declare (queue )
75
60
channel .basic_publish (exchange = "" , routing_key = queue , body = message )
61
+ connection .close ()
76
62
return [TextContent (type = "text" , text = str ("suceeded" ))]
77
63
except Exception as e :
78
64
logger .error (f"{ e } " )
@@ -82,22 +68,13 @@ async def call_tool(
82
68
message = arguments ["message" ]
83
69
exchange = arguments ["exchange" ]
84
70
85
- if not message or not message .strip ():
86
- raise ValueError ("Message cannot be empty" )
87
- if not exchange or not exchange .strip ():
88
- raise ValueError ("Exchange name cannot be empty" )
89
- # RabbitMQ exchange names can only contain letters, digits, hyphen, underscore, period, or colon
90
- # and must be less than 255 characters
91
- if not all (c .isalnum () or c in '-_.:' for c in exchange ):
92
- raise ValueError ("Exchange name can only contain letters, digits, hyphen, underscore, period, or colon" )
93
- if len (exchange ) > 255 :
94
- raise ValueError ("Exchange name must be less than 255 characters" )
71
+ validate_rabbitmq_name (exchange , "Exchange name" )
95
72
96
73
try :
97
- connection = pika .BlockingConnection (parameters )
98
- channel = connection .channel ()
74
+ connection , channel = rabbitmq .get_channel ()
99
75
channel .exchange_declare (exchange = exchange , exchange_type = "fanout" )
100
76
channel .basic_publish (exchange = exchange , routing_key = "" , body = message )
77
+ connection .close ()
101
78
return [TextContent (type = "text" , text = str ("suceeded" ))]
102
79
except Exception as e :
103
80
logger .error (f"{ e } " )
0 commit comments