@@ -41,32 +41,32 @@ async def create_reply_queue(channel: AbstractChannel):
4141
4242 return queue .name
4343
44- async def create_reply_queue (channel : BlockingChannel ):
45- # channel: BlockingChannel = dependencies.get_rabbitmq()
46-
47- if (
48- config_entry := await ConfigEntryDB .find_one ({"key" : "instance_id" })
49- ) is not None :
50- instance_id = config_entry .value
51- else :
52- # If no ID has been generated for this instance, generate a 10-digit alphanumeric identifier
53- instance_id = "" .join (
54- random .choice (
55- string .ascii_uppercase + string .ascii_lowercase + string .digits
56- )
57- for _ in range (10 )
58- )
59- config_entry = ConfigEntryDB (key = "instance_id" , value = instance_id )
60- await config_entry .insert ()
61-
62- queue_name = "clowder.%s" % instance_id
63- channel .exchange_declare (exchange = "clowder" , durable = True )
64- result = channel .queue_declare (
65- queue = queue_name , durable = True , exclusive = False , auto_delete = False
66- )
67- queue_name = result .method .queue
68- channel .queue_bind (exchange = "clowder" , queue = queue_name )
69- return queue_name
44+ # async def create_reply_queue(channel: BlockingChannel):
45+ # # channel: BlockingChannel = dependencies.get_rabbitmq()
46+ #
47+ # if (
48+ # config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})
49+ # ) is not None:
50+ # instance_id = config_entry.value
51+ # else:
52+ # # If no ID has been generated for this instance, generate a 10-digit alphanumeric identifier
53+ # instance_id = "".join(
54+ # random.choice(
55+ # string.ascii_uppercase + string.ascii_lowercase + string.digits
56+ # )
57+ # for _ in range(10)
58+ # )
59+ # config_entry = ConfigEntryDB(key="instance_id", value=instance_id)
60+ # await config_entry.insert()
61+ #
62+ # queue_name = "clowder.%s" % instance_id
63+ # channel.exchange_declare(exchange="clowder", durable=True)
64+ # result = channel.queue_declare(
65+ # queue=queue_name, durable=True, exclusive=False, auto_delete=False
66+ # )
67+ # queue_name = result.method.queue
68+ # channel.queue_bind(exchange="clowder", queue=queue_name)
69+ # return queue_name
7070
7171
7272async def submit_file_job (
@@ -76,10 +76,7 @@ async def submit_file_job(
7676 user : UserOut ,
7777 rabbitmq_client : AbstractChannel ,
7878):
79- # print(f"DEBUG submit_file_job: Got client of type: {type(rabbitmq_client)}")
80- # if not isinstance(rabbitmq_client, BlockingChannel):
81- # raise TypeError(f"Expected BlockingChannel, got {type(rabbitmq_client)}. This confirms a mixing issue.")
82-
79+ print (f"DEBUG submit_file_job: Got client of type: { type (rabbitmq_client )} " )
8380 # Create an entry in job history with unique ID
8481 job = EventListenerJobDB (
8582 listener_id = routing_key ,
@@ -108,7 +105,7 @@ async def submit_file_job(
108105 exchange = await rabbitmq_client .get_exchange ("clowder" )
109106 reply_to = await create_reply_queue (rabbitmq_client )
110107 print ("RABBITMQ_CLIENT: " + str (rabbitmq_client ))
111- await exchange .publish (
108+ await rabbitmq_client . default_exchange .publish (
112109 aio_pika .Message (
113110 body = json .dumps (msg_body .dict (), ensure_ascii = False ).encode ('utf-8' ),
114111 content_type = "application/json" ,
0 commit comments