|
6 | 6 | import string |
7 | 7 | import time |
8 | 8 | from datetime import datetime |
9 | | - |
| 9 | +from app.models.files import ( |
| 10 | + FileDB, |
| 11 | + FileOut, |
| 12 | +) |
| 13 | +from app.models.users import ( |
| 14 | + UserOut, |
| 15 | +) |
| 16 | +from app.routers.feeds import check_feed_listeners |
10 | 17 | from aio_pika import connect_robust |
11 | 18 | from aio_pika.abc import AbstractIncomingMessage |
12 | 19 | from app.main import startup_beanie |
|
16 | 23 | EventListenerJobStatus, |
17 | 24 | EventListenerJobUpdateDB, |
18 | 25 | ) |
| 26 | +import os |
| 27 | +from app.config import settings |
19 | 28 | from beanie import PydanticObjectId |
20 | 29 |
|
21 | 30 | logging.basicConfig(level=logging.INFO) |
|
24 | 33 |
|
25 | 34 | timeout = 5 * 60 # five minute timeout |
26 | 35 | time_ran = 0 |
| 36 | +from app.dependencies import get_elasticsearchclient, get_rabbitmq |
| 37 | + |
27 | 38 |
|
28 | 39 |
|
29 | 40 | def parse_message_status(msg): |
@@ -99,12 +110,27 @@ async def callback(message: AbstractIncomingMessage): |
99 | 110 |
|
100 | 111 | # Convert string IDs back to PydanticObjectId if needed |
101 | 112 | file_data = msg.get("file_data", {}) |
| 113 | + user = msg.get("user", {}) |
102 | 114 | if "id" in file_data and isinstance(file_data["id"], str): |
103 | 115 | file_data["id"] = PydanticObjectId(file_data["id"]) |
104 | 116 |
|
105 | | - # Now you can create your FileOut object |
106 | | - # file_out = FileOut(**file_data) |
107 | | - # TODO - process file indexed event here |
| 117 | + if "id" in file_data and isinstance(file_data["id"], str): |
| 118 | + file_data["id"] = PydanticObjectId(file_data["id"]) |
| 119 | + |
| 120 | + # Create FileOut object |
| 121 | + file_out = FileOut(**file_data) |
| 122 | + |
| 123 | + # Create UserOut object from the user data in the message |
| 124 | + user = UserOut(**user_data) |
| 125 | + |
| 126 | + # Now call check_feed_listeners with the injected dependencies |
| 127 | + await check_feed_listeners( |
| 128 | + es, # Elasticsearch client |
| 129 | + file_out, |
| 130 | + user, |
| 131 | + rabbitmq_client, # RabbitMQ client |
| 132 | + ) |
| 133 | + |
108 | 134 | else: |
109 | 135 | job_id = msg["job_id"] |
110 | 136 | message_str = msg["status"] |
@@ -180,6 +206,9 @@ async def callback(message: AbstractIncomingMessage): |
180 | 206 | async def listen_for_messages(): |
181 | 207 | await startup_beanie() |
182 | 208 |
|
| 209 | + # Initialize dependencies using your existing functions |
| 210 | + es = await get_elasticsearchclient() |
| 211 | + |
183 | 212 | # For some reason, Pydantic Settings environment variable overrides aren't being applied, so get them here. |
184 | 213 | RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest") |
185 | 214 | RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest") |
|
0 commit comments