|
25 | 25 | from .utils.error_schema import ErrorResponse |
26 | 26 | from .utils.handle_httpx_exception import handle_httpx_exception |
27 | 27 |
|
| 28 | +from .services.scheduler.schemas import SchedulerStatus, SchedulerControl |
| 29 | +from .services.scheduler import start_scheduler, stop_scheduler, get_scheduler_status |
| 30 | + |
| 31 | + |
28 | 32 |
|
29 | 33 | # --- Configuration --- |
30 | 34 | load_dotenv() |
|
44 | 48 | {"name": "Crawler", "description": "Crawl and clean website content."}, |
45 | 49 | {"name": "Embedder", "description": "Chunk and embed text to vector DB."}, |
46 | 50 | {"name": "LLM", "description": "Language Model completions and chat."}, |
| 51 | + {"name": "Scheduler", "description": "Scheduled background jobs and automation."}, |
47 | 52 | ] |
48 | 53 |
|
49 | 54 | # ------------------------------- |
50 | 55 | # --- Lifespan Manager (for startup) --- |
51 | 56 | # ------------------------------- |
52 | 57 | @asynccontextmanager |
53 | 58 | async def lifespan(app: FastAPI): |
54 | | - logger.info("Service starting up...") |
55 | | - try: |
56 | | - # Example: Create client connections here |
57 | | - client = get_weaviate_client() |
58 | | - logger.info("Testing Weaviate connection...") |
59 | | - test_weaviate_connection() |
60 | | - logger.info("Ensuring Weaviate schema exists...") |
61 | | - ensure_schema_exists(client) |
62 | | - logger.info("Everything is set up successfully and ready to go!") |
63 | | - except Exception as e: |
64 | | - logger.error(f"Error during startup: {e}", exc_info=True) |
65 | | - raise RuntimeError("Failed during application startup.") from e |
66 | | - yield |
67 | | - logger.info("Service shutting down...") |
| 59 | + logger.info("Service starting up...") |
| 60 | + try: |
| 61 | + # Example: Create client connections here |
| 62 | + client = get_weaviate_client() |
| 63 | + logger.info("Testing Weaviate connection...") |
| 64 | + test_weaviate_connection() |
| 65 | + logger.info("Ensuring Weaviate schema exists...") |
| 66 | + ensure_schema_exists(client) |
| 67 | + logger.info("Everything is set up successfully and ready to go!") |
| 68 | + |
| 69 | + # Start the blog embedder scheduler |
| 70 | + logger.info("Starting blog embedder scheduler...") |
| 71 | + start_scheduler() |
| 72 | + |
| 73 | + except Exception as e: |
| 74 | + logger.error(f"Error during startup: {e}", exc_info=True) |
| 75 | + raise RuntimeError("Failed during application startup.") from e |
| 76 | + yield |
| 77 | + logger.info("Service shutting down...") |
| 78 | + |
| 79 | + # Stop the scheduler on shutdown |
| 80 | + logger.info("Stopping blog embedder scheduler...") |
| 81 | + stop_scheduler() |
68 | 82 |
|
69 | 83 | # --- App Initialization --- |
70 | 84 | app = FastAPI( |
@@ -263,6 +277,53 @@ async def generate_course(req: CourseGenerationRequest): |
263 | 277 | except Exception as e: |
264 | 278 | raise HTTPException(500, str(e)) from e |
265 | 279 |
|
| 280 | +# ------------------------------- |
| 281 | +# --- Scheduler Endpoints ------- |
| 282 | +# ------------------------------- |
| 283 | +@app.get(f"{API_PREFIX}/scheduler/status", response_model=SchedulerStatus, tags=["Scheduler"]) |
| 284 | +async def get_scheduler_status_endpoint(): |
| 285 | + """Get the current status of the blog embedder scheduler""" |
| 286 | + return SchedulerStatus(**get_scheduler_status()) |
| 287 | + |
| 288 | + |
| 289 | +@app.post(f"{API_PREFIX}/scheduler/control", tags=["Scheduler"]) |
| 290 | +async def control_scheduler(request: SchedulerControl): |
| 291 | + """Control the blog embedder scheduler (start/stop)""" |
| 292 | + if request.action == "start": |
| 293 | + start_scheduler() |
| 294 | + return {"message": "Scheduler started successfully"} |
| 295 | + elif request.action == "stop": |
| 296 | + stop_scheduler() |
| 297 | + return {"message": "Scheduler stopped successfully"} |
| 298 | + else: |
| 299 | + raise HTTPException(status_code=400, detail="Invalid action. Use 'start' or 'stop'") |
| 300 | + |
| 301 | + |
| 302 | +@app.post(f"{API_PREFIX}/scheduler/run-now", tags=["Scheduler"]) |
| 303 | +async def run_scheduler_now(): |
| 304 | + """Manually trigger the blog embedder job immediately""" |
| 305 | + try: |
| 306 | + # Import here to avoid circular imports |
| 307 | + from .services.scheduler.scheduler_service import _scheduler |
| 308 | + |
| 309 | + # Run the job in a separate thread to avoid blocking |
| 310 | + import threading |
| 311 | + def run_job(): |
| 312 | + import asyncio |
| 313 | + async def async_job(): |
| 314 | + urls = await _scheduler.fetch_freecodecamp_articles() |
| 315 | + await _scheduler.embed_articles(urls) |
| 316 | + asyncio.run(async_job()) |
| 317 | + |
| 318 | + thread = threading.Thread(target=run_job, daemon=True) |
| 319 | + thread.start() |
| 320 | + |
| 321 | + return {"message": "Blog embedder job triggered successfully"} |
| 322 | + except Exception as e: |
| 323 | + logger.error(f"Error triggering scheduler job: {e}") |
| 324 | + raise HTTPException(status_code=500, detail=f"Failed to trigger job: {str(e)}") |
| 325 | + |
| 326 | + |
266 | 327 |
|
267 | 328 | # ------------------------------- |
268 | 329 | # --------- MAIN ---------------- |
|
0 commit comments