Commit 0017661

rabbitmq is now abstract channel
1 parent c48d8fd commit 0017661

3 files changed: 62 additions & 66 deletions

backend/app/dependencies.py

Lines changed: 18 additions & 8 deletions
@@ -3,6 +3,9 @@

 import boto3
 import pika
+import aio_pika
+from aio_pika.abc import AbstractChannel
+
 from app.config import settings
 from app.search.connect import connect_elasticsearch
 from minio import Minio
@@ -74,20 +77,27 @@ async def get_external_fs() -> AsyncGenerator[Minio, None]:
     yield file_system


-def get_rabbitmq() -> BlockingChannel:
+async def get_rabbitmq() -> AbstractChannel:
     """Client to connect to RabbitMQ for listeners/extractors interactions."""
-    credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)
-    parameters = pika.ConnectionParameters(
-        settings.RABBITMQ_HOST, credentials=credentials
-    )
+    RABBITMQ_URL = f"amqp://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASS}@{settings.RABBITMQ_HOST}/"
+
     logger.debug("Connecting to rabbitmq at %s", settings.RABBITMQ_HOST)
-    connection = pika.BlockingConnection(parameters)
-    channel = connection.channel()
+    connection = await aio_pika.connect_robust(RABBITMQ_URL)
+    channel = await connection.channel()
+
     print(f"DEBUG: get_rabbitmq() called. Returning channel of type: {type(channel)}")
-    print(f"DEBUG: Channel object: {channel}")
     return channel


+# Keep the old function for compatibility if needed
+def get_blocking_rabbitmq() -> BlockingChannel:
+    """Legacy blocking RabbitMQ client (for extractors that need it)."""
+    credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)
+    parameters = pika.ConnectionParameters(settings.RABBITMQ_HOST, credentials=credentials)
+    connection = pika.BlockingConnection(parameters)
+    return connection.channel()
+
+
 async def get_elasticsearchclient():
     es = await connect_elasticsearch()
     return es
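
Since get_rabbitmq() is now a coroutine, FastAPI awaits it automatically wherever it is wired in with Depends. A minimal sketch of how a route might consume the new dependency (the route path and handler below are hypothetical, for illustration only, not part of this commit):

    from fastapi import APIRouter, Depends
    from aio_pika.abc import AbstractChannel

    from app import dependencies

    router = APIRouter()


    @router.get("/rabbitmq/health")  # hypothetical route, not in this commit
    async def rabbitmq_health(
        channel: AbstractChannel = Depends(dependencies.get_rabbitmq),
    ):
        # AbstractChannel exposes is_closed, so this works as a cheap liveness probe
        return {"channel_open": not channel.is_closed}

Note that each call to get_rabbitmq() opens a fresh robust connection and channel and nothing closes them when the request ends; a yield-based dependency or an application-level shared connection may be worth considering.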

backend/app/rabbitmq/listeners.py

Lines changed: 9 additions & 36 deletions
@@ -41,33 +41,6 @@ async def create_reply_queue(channel: AbstractChannel):

     return queue.name

-# async def create_reply_queue(channel: BlockingChannel):
-#     # channel: BlockingChannel = dependencies.get_rabbitmq()
-#
-#     if (
-#         config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})
-#     ) is not None:
-#         instance_id = config_entry.value
-#     else:
-#         # If no ID has been generated for this instance, generate a 10-digit alphanumeric identifier
-#         instance_id = "".join(
-#             random.choice(
-#                 string.ascii_uppercase + string.ascii_lowercase + string.digits
-#             )
-#             for _ in range(10)
-#         )
-#         config_entry = ConfigEntryDB(key="instance_id", value=instance_id)
-#         await config_entry.insert()
-#
-#     queue_name = "clowder.%s" % instance_id
-#     channel.exchange_declare(exchange="clowder", durable=True)
-#     result = channel.queue_declare(
-#         queue=queue_name, durable=True, exclusive=False, auto_delete=False
-#     )
-#     queue_name = result.method.queue
-#     channel.queue_bind(exchange="clowder", queue=queue_name)
-#     return queue_name
-

 async def submit_file_job(
     file_out: FileOut,
@@ -102,7 +75,6 @@ async def submit_file_job(

     # Use aio_pika publishing
     # Get the existing clowder exchange
-    exchange = await rabbitmq_client.get_exchange("clowder")
     reply_to = await create_reply_queue(rabbitmq_client)
     print("RABBITMQ_CLIENT: " + str(rabbitmq_client))
     await rabbitmq_client.default_exchange.publish(
@@ -122,7 +94,7 @@ async def submit_dataset_job(
     routing_key: str,
     parameters: dict,
     user: UserOut,
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel,
 ):
     # Create an entry in job history with unique ID
     job = EventListenerJobDB(
@@ -142,13 +114,14 @@ async def submit_dataset_job(
         job_id=str(job.id),
         parameters=parameters,
     )
-    reply_to = await create_reply_queue()
-    rabbitmq_client.basic_publish(
-        exchange="",
-        routing_key=routing_key,
-        body=json.dumps(msg_body.dict(), ensure_ascii=False),
-        properties=pika.BasicProperties(
-            content_type="application/json", delivery_mode=1, reply_to=reply_to
+    reply_to = await create_reply_queue(rabbitmq_client)
+    await rabbitmq_client.default_exchange.publish(
+        aio_pika.Message(
+            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode('utf-8'),
+            content_type="application/json",
+            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
+            reply_to=reply_to,
+        ),
+        routing_key=routing_key,
     )
     return str(job.id)
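
Both submit paths now publish through aio_pika's default exchange, which routes a message directly to the queue named by routing_key, with the reply queue carried in the reply_to property; note also that submit_dataset_job dropped its Depends default, so callers must now pass the channel explicitly. A minimal sketch of an extractor-side consumer for such a queue (the URL and queue name below are placeholders, not part of this commit):

    import asyncio
    import json

    import aio_pika


    async def consume(rabbitmq_url: str, queue_name: str) -> None:
        connection = await aio_pika.connect_robust(rabbitmq_url)
        async with connection:
            channel = await connection.channel()
            # declare_queue is idempotent; durable matches the queues used above
            queue = await channel.declare_queue(queue_name, durable=True)
            async with queue.iterator() as messages:
                async for message in messages:
                    async with message.process():  # acks on success, rejects on exception
                        job = json.loads(message.body.decode("utf-8"))
                        print(job.get("job_id"), "reply to:", message.reply_to)


    if __name__ == "__main__":
        asyncio.run(consume("amqp://guest:guest@localhost/", "my.extractor.queue"))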

backend/app/routers/files.py

Lines changed: 35 additions & 22 deletions
@@ -35,6 +35,8 @@
 from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
 from minio import Minio
 from pika.adapters.blocking_connection import BlockingChannel
+import aio_pika
+from aio_pika.abc import AbstractChannel

 router = APIRouter()
 security = HTTPBearer()
@@ -49,7 +51,7 @@ def default(self, obj):

 async def _resubmit_file_extractors(
     file: FileOut,
-    rabbitmq_client: BlockingChannel,
+    rabbitmq_client: AbstractChannel,
     user: UserOut,
     credentials: HTTPAuthorizationCredentials = Security(security),
 ):
@@ -94,7 +96,7 @@ async def add_file_entry(
     user: UserOut,
     fs: Minio,
     es: Elasticsearch,
-    rabbitmq_client: BlockingChannel,
+    rabbitmq_client: AbstractChannel,
     file: Optional[io.BytesIO] = None,
     content_type: Optional[str] = None,
     public: bool = False,
@@ -146,22 +148,28 @@ async def add_file_entry(

     # Publish a message when indexing is complete

+
+    # FIXED: Use aio_pika publishing
     message_body = {
         "event_type": "file_indexed",
         "file_data": json.loads(new_file.json()),
-        "user": json.loads(user.json()),  # This handles ObjectID serialization
+        "user": json.loads(user.json()),
         "timestamp": datetime.now().isoformat()
     }

-    rabbitmq_client.basic_publish(
-        exchange='clowder',
-        routing_key='file_indexed_events',
-        body=json.dumps(message_body).encode('utf-8')
+    # Get the exchange first
+    exchange = await rabbitmq_client.get_exchange("clowder")
+
+    # Use aio_pika publish method
+    await exchange.publish(
+        aio_pika.Message(
+            body=json.dumps(message_body).encode('utf-8'),
+            content_type="application/json",
+            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
+        ),
+        routing_key="file_indexed_events",
     )

-    # TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
-    time.sleep(1)
-
     # Submit file job to any qualifying feeds
     # await check_feed_listeners(
     #     es,
@@ -175,7 +183,7 @@ async def add_local_file_entry(
     new_file: FileDB,
     user: UserOut,
     es: Elasticsearch,
-    rabbitmq_client: BlockingChannel,
+    rabbitmq_client: AbstractChannel,
     content_type: Optional[str] = None,
 ):
     """Insert FileDB object into MongoDB (makes Clowder ID). Bytes are not stored in DB and versioning not supported
@@ -188,22 +196,27 @@ async def add_local_file_entry(
     # Add entry to the file index
     await index_file(es, FileOut(**new_file.dict()))
     # Publish a message when indexing is complete
+
     message_body = {
         "event_type": "file_indexed",
         "file_data": json.loads(new_file.json()),
-        "user": json.loads(user.json()),  # This handles ObjectID serialization
+        "user": json.loads(user.json()),
         "timestamp": datetime.now().isoformat()
     }

-    rabbitmq_client.basic_publish(
-        exchange='clowder',
-        routing_key='file_indexed_events',
-        body=json.dumps(message_body).encode('utf-8')
+    # Get the exchange first
+    exchange = await rabbitmq_client.get_exchange("clowder")
+
+    # Use aio_pika publish method
+    await exchange.publish(
+        aio_pika.Message(
+            body=json.dumps(message_body).encode('utf-8'),
+            content_type="application/json",
+            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
+        ),
+        routing_key="file_indexed_events",
     )

-    # TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
-    time.sleep(1)
-
     # Submit file job to any qualifying feeds
     # await check_feed_listeners(
     #     es,
@@ -255,7 +268,7 @@ async def update_file(
     file: UploadFile = File(...),
     es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
     credentials: HTTPAuthorizationCredentials = Security(security),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(FileAuthorization("uploader")),
 ):
     # Check all connection and abort if any one of them is not available
@@ -593,7 +606,7 @@ async def post_file_extract(
     parameters: dict = None,
     user=Depends(get_current_user),
     credentials: HTTPAuthorizationCredentials = Security(security),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(FileAuthorization("uploader")),
 ):
     if extractorName is None:
@@ -620,7 +633,7 @@ async def resubmit_file_extractions(
     file_id: str,
     user=Depends(get_current_user),
     credentials: HTTPAuthorizationCredentials = Security(security),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(FileAuthorization("editor")),
 ):
     """This route will check metadata. We get the extractors run from metadata from extractors.
