1717from app .routers .users import get_user_job_key
1818from fastapi import Depends
1919from pika .adapters .blocking_connection import BlockingChannel
20+ import aio_pika
21+ from aio_pika .abc import AbstractChannel
2022
2123
22- async def create_reply_queue ():
23- channel : BlockingChannel = dependencies .get_rabbitmq ()
24+ async def create_reply_queue (channel : AbstractChannel ):
25+ if (config_entry := await ConfigEntryDB .find_one ({"key" : "instance_id" })) is not None :
26+ instance_id = config_entry .value
27+ else :
28+ instance_id = "" .join (
29+ random .choice (string .ascii_uppercase + string .ascii_lowercase + string .digits )
30+ for _ in range (10 )
31+ )
32+ config_entry = ConfigEntryDB (key = "instance_id" , value = instance_id )
33+ await config_entry .insert ()
34+
35+ queue_name = f"clowder.{ instance_id } "
36+
37+ # Use aio_pika methods instead of pika methods
38+ exchange = await channel .declare_exchange ("clowder" , durable = True )
39+ queue = await channel .declare_queue (queue_name , durable = True , exclusive = False , auto_delete = False )
40+ await queue .bind (exchange )
41+
42+ return queue .name
43+
44+ async def create_reply_queue (channel : BlockingChannel ):
45+ # channel: BlockingChannel = dependencies.get_rabbitmq()
2446
2547 if (
2648 config_entry := await ConfigEntryDB .find_one ({"key" : "instance_id" })
@@ -52,8 +74,12 @@ async def submit_file_job(
5274 routing_key : str ,
5375 parameters : dict ,
5476 user : UserOut ,
55- rabbitmq_client : BlockingChannel ,
77+ rabbitmq_client : AbstractChannel ,
5678):
79+ # print(f"DEBUG submit_file_job: Got client of type: {type(rabbitmq_client)}")
80+ # if not isinstance(rabbitmq_client, BlockingChannel):
81+ # raise TypeError(f"Expected BlockingChannel, got {type(rabbitmq_client)}. This confirms a mixing issue.")
82+
5783 # Create an entry in job history with unique ID
5884 job = EventListenerJobDB (
5985 listener_id = routing_key ,
@@ -65,6 +91,7 @@ async def submit_file_job(
6591 )
6692 await job .insert ()
6793
94+
6895 current_secretKey = await get_user_job_key (user .email )
6996 msg_body = EventListenerJobMessage (
7097 filename = file_out .name ,
@@ -75,15 +102,20 @@ async def submit_file_job(
75102 job_id = str (job .id ),
76103 parameters = parameters ,
77104 )
78- reply_to = await create_reply_queue ()
105+
106+ # Use aio_pika publishing
107+ # Get the existing clowder exchange
108+ exchange = await rabbitmq_client .get_exchange ("clowder" )
109+ reply_to = await create_reply_queue (rabbitmq_client )
79110 print ("RABBITMQ_CLIENT: " + str (rabbitmq_client ))
80- rabbitmq_client . basic_publish (
81- exchange = "" ,
82- routing_key = routing_key ,
83- body = json . dumps ( msg_body . dict (), ensure_ascii = False ) ,
84- properties = pika . BasicProperties (
85- content_type = "application/json" , delivery_mode = 1 , reply_to = reply_to
111+ await exchange . publish (
112+ aio_pika . Message (
113+ body = json . dumps ( msg_body . dict (), ensure_ascii = False ). encode ( 'utf-8' ) ,
114+ content_type = "application/json" ,
115+ delivery_mode = aio_pika . DeliveryMode . PERSISTENT ,
116+ reply_to = reply_to ,
86117 ),
118+ routing_key = routing_key ,
87119 )
88120 return str (job .id )
89121
0 commit comments