Skip to content

Commit 6294d17

Browse files
committed
For now this sends a message to the queue, but it doesn't actually check listeners; that still needs to be added.
1 parent 898bcd2 commit 6294d17

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

backend/app/routers/files.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
import time
33
from datetime import datetime, timedelta
44
from typing import List, Optional, Union
5   -
5   + import json
6   + from json import JSONEncoder
7   + from aio_pika import Message
68
from app import dependencies
79
from app.config import settings
810
from app.db.file.download import _increment_file_downloads
@@ -37,6 +39,13 @@
3739
router = APIRouter()
3840
security = HTTPBearer()
3941

42+
class CustomJSONEncoder(JSONEncoder):
43+
def default(self, obj):
44+
if isinstance(obj, PydanticObjectId):
45+
return str(obj)
46+
# Handle other non-serializable types if needed
47+
return super().default(obj)
48+
4049

4150
async def _resubmit_file_extractors(
4251
file: FileOut,
@@ -135,6 +144,20 @@ async def add_file_entry(
135144
# Add entry to the file index
136145
await index_file(es, FileOut(**new_file.dict()))
137146

147+
# Publish a message when indexing is complete
148+
149+
message_body = {
150+
"event_type": "file_indexed",
151+
"file_data": json.loads(new_file.json()), # This handles ObjectID serialization
152+
"timestamp": datetime.now().isoformat()
153+
}
154+
155+
rabbitmq_client.basic_publish(
156+
exchange='clowder',
157+
routing_key='file_indexed_events',
158+
body=json.dumps(message_body).encode('utf-8')
159+
)
160+
138161
# TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
139162
time.sleep(1)
140163

@@ -163,6 +186,18 @@ async def add_local_file_entry(
163186

164187
# Add entry to the file index
165188
await index_file(es, FileOut(**new_file.dict()))
189+
# Publish a message when indexing is complete
190+
message_body = {
191+
"event_type": "file_indexed",
192+
"file_data": json.loads(new_file.json()), # This handles ObjectID serialization
193+
"timestamp": datetime.now().isoformat()
194+
}
195+
196+
rabbitmq_client.basic_publish(
197+
exchange='clowder',
198+
routing_key='file_indexed_events',
199+
body=json.dumps(message_body).encode('utf-8')
200+
)
166201

167202
# TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
168203
time.sleep(1)

backend/message_listener.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ async def callback(message: AbstractIncomingMessage):
9393
async with message.process():
9494
msg = json.loads(message.body.decode("utf-8"))
9595

96+
if "event_type" in msg and msg["event_type"] == "file_indexed":
97+
print(f"This is an event type file indexed!")
98+
9699
job_id = msg["job_id"]
97100
message_str = msg["status"]
98101
timestamp = datetime.strptime(
@@ -222,6 +225,7 @@ async def listen_for_messages():
222225

223226

224227
if __name__ == "__main__":
228+
logger.info(" Message listener starting...")
225229
start = datetime.now()
226230
while time_ran < timeout:
227231
try:

docker-compose.dev.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ services:
157157
- rabbitmq
158158

159159
extractors-messages:
160-
image: "clowder/clowder2-messages:latest"
160+
image: "clowder2-messages:test"
161161
build:
162162
dockerfile: backend/messages.Dockerfile
163163
environment:

0 commit comments

Comments
 (0)